from __future__ import annotations import json import unittest from pathlib import Path from mais_humana.central_consolidation import ( PendingClass, build_consolidated_report, classify_pending_text, consolidated_pending_csv, consolidated_report_markdown, run_consolidated_report, ) from mais_humana.cli import main from tests.helpers import make_tmp class CentralConsolidationTests(unittest.TestCase): def make_platform( self, root: Path, folder: str, *, project_id: str = "alpha", pending: str = "", order_reason: str = "", ) -> Path: platform = root / folder (platform / "indexes").mkdir(parents=True) (platform / "current").mkdir() (platform / "status").mkdir() (platform / "orders" / "executivas").mkdir(parents=True) (platform / "orders" / "gerenciais").mkdir(parents=True) (platform / "README.md").write_text( "\n".join( [ f"# {project_id}", "", "Projeto real:", f"`G:/_codex-git/{project_id}`", "", "## Identificacao", "", f"- project_id: `{project_id}`", f"- repo_name: `{project_id}`", "- status_gerencial: `ativo`", "", "## Papel no ecossistema", "", "A plataforma traduz estado tecnico em decisao administrativa e evidencia rastreavel.", ] ) + "\n", encoding="utf-8", ) (platform / "indexes" / "pending-index.md").write_text(pending, encoding="utf-8") (platform / "current" / "current-project-state.md").write_text( "# Estado\n\n## Avanco desta rodada\n\n- Provider MCP conectado com traceId e auditId.\n", encoding="utf-8", ) if order_reason: (platform / "orders" / "executivas" / "0001_EXECUTIVA__validar.md").write_text( "\n".join( [ "# ORDEM DE SERVICO: 0001_EXECUTIVA__validar", "", "## Finalidade da ordem de servico", "", "Validar pendencia.", "", "## Motivo da criacao da ordem de servico", "", order_reason, "", "## Identificacao", "", "- order_id: `0001_EXECUTIVA__validar`", "- tipo: `executiva`", f"- project_id: `{project_id}`", "- status: `planejada`", ] ) + "\n", encoding="utf-8", ) return platform def test_classify_pending_text_splits_external_internal_and_plugin_observation(self) -> None: self.assertEqual(classify_pending_text("HTTP 403 code 1010 WAF owner")[0], PendingClass.EXTERNAL) self.assertEqual(classify_pending_text("atualizar README e SQLite semantico")[0], PendingClass.INTERNAL) self.assertEqual(classify_pending_text("user rejected MCP tool call do plugin Cloudflare")[0], PendingClass.OBSERVATION) def test_build_consolidated_report_redacts_and_counts_pending_items(self) -> None: tmp = make_tmp() central_root = tmp / "central" / "projects" central_root.mkdir(parents=True) self.make_platform( central_root, "01_repo_alpha", project_id="alpha", pending="- HTTP 403 code 1010 no WAF\n- atualizar README e SQLite semantico\n- cloudflare_token=abcdefghijklmnopqrstuvwxyz1234567890\n", order_reason="Executar suite de teste e publicar relatorio.", ) report = build_consolidated_report( central_root, tmp / "human", plugin_auth_attempt="user rejected MCP tool call", git_sync_status="fetch falhou por SEC_E_NO_CREDENTIALS", ) self.assertEqual(report.platform_count, 1) self.assertGreaterEqual(report.external_pending_count, 2) self.assertGreaterEqual(report.internal_pending_count, 2) text = consolidated_report_markdown(report) self.assertIn("Relatorio consolidado", text) self.assertNotIn("abcdefghijklmnopqrstuvwxyz1234567890", text) csv_text = consolidated_pending_csv(report) self.assertIn("external_blocker", csv_text) self.assertIn("internal_codex_action", csv_text) def test_run_consolidated_report_writes_project_and_central_outputs(self) -> None: tmp = make_tmp() central_root = tmp / "central" / "projects" project = tmp / "human" platform = self.make_platform( central_root, "15_repo_tudo-para-ia-mais-humana-platform", project_id="tudo-para-ia-mais-humana", pending="- resolver documentacao e testes\n", ) report, records = run_consolidated_report(central_root, project, central_platform_folder=platform) self.assertEqual(report.platform_count, 1) self.assertTrue((project / "dados" / "relatorio-consolidado-operacional.json").exists()) self.assertTrue((project / "matrizes" / "pendencias-consolidadas.csv").exists()) self.assertTrue((platform / "reports" / "RELATORIO-CONSOLIDADO-ADMINISTRATIVO__RODADA015.md").exists()) self.assertGreaterEqual(len(records), 4) def test_cli_consolidated_report_writes_json_payload(self) -> None: tmp = make_tmp() central_root = tmp / "central" / "projects" project = tmp / "human" self.make_platform(central_root, "01_repo_alpha", pending="- atualizar status e indice\n") code = main( [ "consolidated-report", "--central-projects-root", str(central_root), "--project-root", str(project), "--plugin-auth-attempt", "user rejected MCP tool call", ] ) self.assertEqual(code, 0) payload = json.loads((project / "dados" / "relatorio-consolidado-operacional.json").read_text(encoding="utf-8")) self.assertEqual(payload["platform_count"] if "platform_count" in payload else len(payload["platforms"]), 1) if __name__ == "__main__": unittest.main()