161 lines
6.5 KiB
Python
161 lines
6.5 KiB
Python
from __future__ import annotations
|
|
|
|
import json
|
|
import unittest
|
|
from pathlib import Path
|
|
|
|
from mais_humana.central_consolidation import (
|
|
PendingClass,
|
|
build_consolidated_report,
|
|
classify_pending_text,
|
|
consolidated_pending_csv,
|
|
consolidated_report_markdown,
|
|
run_consolidated_report,
|
|
)
|
|
from mais_humana.cli import main
|
|
from tests.helpers import make_tmp
|
|
|
|
|
|
class CentralConsolidationTests(unittest.TestCase):
|
|
def make_platform(
|
|
self,
|
|
root: Path,
|
|
folder: str,
|
|
*,
|
|
project_id: str = "alpha",
|
|
pending: str = "",
|
|
order_reason: str = "",
|
|
) -> Path:
|
|
platform = root / folder
|
|
(platform / "indexes").mkdir(parents=True)
|
|
(platform / "current").mkdir()
|
|
(platform / "status").mkdir()
|
|
(platform / "orders" / "executivas").mkdir(parents=True)
|
|
(platform / "orders" / "gerenciais").mkdir(parents=True)
|
|
(platform / "README.md").write_text(
|
|
"\n".join(
|
|
[
|
|
f"# {project_id}",
|
|
"",
|
|
"Projeto real:",
|
|
f"`G:/_codex-git/{project_id}`",
|
|
"",
|
|
"## Identificacao",
|
|
"",
|
|
f"- project_id: `{project_id}`",
|
|
f"- repo_name: `{project_id}`",
|
|
"- status_gerencial: `ativo`",
|
|
"",
|
|
"## Papel no ecossistema",
|
|
"",
|
|
"A plataforma traduz estado tecnico em decisao administrativa e evidencia rastreavel.",
|
|
]
|
|
)
|
|
+ "\n",
|
|
encoding="utf-8",
|
|
)
|
|
(platform / "indexes" / "pending-index.md").write_text(pending, encoding="utf-8")
|
|
(platform / "current" / "current-project-state.md").write_text(
|
|
"# Estado\n\n## Avanco desta rodada\n\n- Provider MCP conectado com traceId e auditId.\n",
|
|
encoding="utf-8",
|
|
)
|
|
if order_reason:
|
|
(platform / "orders" / "executivas" / "0001_EXECUTIVA__validar.md").write_text(
|
|
"\n".join(
|
|
[
|
|
"# ORDEM DE SERVICO: 0001_EXECUTIVA__validar",
|
|
"",
|
|
"## Finalidade da ordem de servico",
|
|
"",
|
|
"Validar pendencia.",
|
|
"",
|
|
"## Motivo da criacao da ordem de servico",
|
|
"",
|
|
order_reason,
|
|
"",
|
|
"## Identificacao",
|
|
"",
|
|
"- order_id: `0001_EXECUTIVA__validar`",
|
|
"- tipo: `executiva`",
|
|
f"- project_id: `{project_id}`",
|
|
"- status: `planejada`",
|
|
]
|
|
)
|
|
+ "\n",
|
|
encoding="utf-8",
|
|
)
|
|
return platform
|
|
|
|
def test_classify_pending_text_splits_external_internal_and_plugin_observation(self) -> None:
|
|
self.assertEqual(classify_pending_text("HTTP 403 code 1010 WAF owner")[0], PendingClass.EXTERNAL)
|
|
self.assertEqual(classify_pending_text("atualizar README e SQLite semantico")[0], PendingClass.INTERNAL)
|
|
self.assertEqual(classify_pending_text("user rejected MCP tool call do plugin Cloudflare")[0], PendingClass.OBSERVATION)
|
|
|
|
def test_build_consolidated_report_redacts_and_counts_pending_items(self) -> None:
|
|
tmp = make_tmp()
|
|
central_root = tmp / "central" / "projects"
|
|
central_root.mkdir(parents=True)
|
|
self.make_platform(
|
|
central_root,
|
|
"01_repo_alpha",
|
|
project_id="alpha",
|
|
pending="- HTTP 403 code 1010 no WAF\n- atualizar README e SQLite semantico\n- cloudflare_token=abcdefghijklmnopqrstuvwxyz1234567890\n",
|
|
order_reason="Executar suite de teste e publicar relatorio.",
|
|
)
|
|
report = build_consolidated_report(
|
|
central_root,
|
|
tmp / "human",
|
|
plugin_auth_attempt="user rejected MCP tool call",
|
|
git_sync_status="fetch falhou por SEC_E_NO_CREDENTIALS",
|
|
)
|
|
self.assertEqual(report.platform_count, 1)
|
|
self.assertGreaterEqual(report.external_pending_count, 2)
|
|
self.assertGreaterEqual(report.internal_pending_count, 2)
|
|
text = consolidated_report_markdown(report)
|
|
self.assertIn("Relatorio consolidado", text)
|
|
self.assertNotIn("abcdefghijklmnopqrstuvwxyz1234567890", text)
|
|
csv_text = consolidated_pending_csv(report)
|
|
self.assertIn("external_blocker", csv_text)
|
|
self.assertIn("internal_codex_action", csv_text)
|
|
|
|
def test_run_consolidated_report_writes_project_and_central_outputs(self) -> None:
|
|
tmp = make_tmp()
|
|
central_root = tmp / "central" / "projects"
|
|
project = tmp / "human"
|
|
platform = self.make_platform(
|
|
central_root,
|
|
"15_repo_tudo-para-ia-mais-humana-platform",
|
|
project_id="tudo-para-ia-mais-humana",
|
|
pending="- resolver documentacao e testes\n",
|
|
)
|
|
report, records = run_consolidated_report(central_root, project, central_platform_folder=platform)
|
|
self.assertEqual(report.platform_count, 1)
|
|
self.assertTrue((project / "dados" / "relatorio-consolidado-operacional.json").exists())
|
|
self.assertTrue((project / "matrizes" / "pendencias-consolidadas.csv").exists())
|
|
self.assertTrue((platform / "reports" / "RELATORIO-CONSOLIDADO-ADMINISTRATIVO__RODADA015.md").exists())
|
|
self.assertGreaterEqual(len(records), 4)
|
|
|
|
def test_cli_consolidated_report_writes_json_payload(self) -> None:
|
|
tmp = make_tmp()
|
|
central_root = tmp / "central" / "projects"
|
|
project = tmp / "human"
|
|
self.make_platform(central_root, "01_repo_alpha", pending="- atualizar status e indice\n")
|
|
code = main(
|
|
[
|
|
"consolidated-report",
|
|
"--central-projects-root",
|
|
str(central_root),
|
|
"--project-root",
|
|
str(project),
|
|
"--plugin-auth-attempt",
|
|
"user rejected MCP tool call",
|
|
]
|
|
)
|
|
self.assertEqual(code, 0)
|
|
payload = json.loads((project / "dados" / "relatorio-consolidado-operacional.json").read_text(encoding="utf-8"))
|
|
self.assertEqual(payload["platform_count"] if "platform_count" in payload else len(payload["platforms"]), 1)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
unittest.main()
|