Files
tudo-para-ia-mais-humana-pl…/tests/test_central_materialization.py

85 lines
4.3 KiB
Python

from __future__ import annotations
import json
import unittest
from mais_humana.central_materialization import (
MaterializationStatus,
active_input_order_specs,
active_managerial_order_specs,
next_output_order_specs,
run_central_materialization,
)
from mais_humana.cli import main
from mais_humana.storage import table_counts
from tests.helpers import make_tmp
class CentralMaterializationTests(unittest.TestCase):
    """Exercise the central materialization spec builders, runner and CLI command."""

    def test_specs_cover_active_and_next_output_cycles(self) -> None:
        """Each spec builder yields five orders per group with the expected boundary ids."""
        input_specs = active_input_order_specs()
        managerial_specs = active_managerial_order_specs()
        output_specs = next_output_order_specs()

        self.assertEqual(len(input_specs), 5)
        self.assertEqual(len(managerial_specs), 5)

        # The output specs mix both order types; count each type separately.
        executive_total = sum(1 for spec in output_specs if spec.order_type.value == "executiva")
        self.assertEqual(executive_total, 5)
        managerial_total = sum(1 for spec in output_specs if spec.order_type.value == "gerencial")
        self.assertEqual(managerial_total, 5)

        first_input = input_specs[0]
        self.assertEqual(
            first_input.order_id,
            "0037_EXECUTIVA__homologar-rotas-administrativas-mcp-no-gateway",
        )
        last_managerial = managerial_specs[-1]
        self.assertEqual(
            last_managerial.order_id,
            "0053_GERENCIAL__pactuar-docs-catalogonly-ou-response-ready-como-excecao-global",
        )

    def test_run_materialization_creates_orders_reports_indexes_and_sql(self) -> None:
        """A run against a fresh central folder writes orders, report, indexes and SQLite rows."""
        tmp_root = make_tmp()
        project_dir = tmp_root / "project"
        central_dir = tmp_root / "central" / "projects" / "15_repo_tudo-para-ia-mais-humana-platform"

        outcome = run_central_materialization(
            project_root=project_dir,
            central_platform_folder=central_dir,
        )

        accepted_statuses = {MaterializationStatus.CREATED, MaterializationStatus.EXISTS}
        self.assertIn(outcome.status, accepted_statuses)

        orders_dir = central_dir / "orders"
        executive_order = (
            orders_dir / "executivas" / "0037_EXECUTIVA__homologar-rotas-administrativas-mcp-no-gateway.md"
        )
        self.assertTrue(executive_order.exists())
        managerial_order = (
            orders_dir
            / "gerenciais"
            / "0058_GERENCIAL__fechar-decisao-docs-response-ready-ou-catalogonly-formal.md"
        )
        self.assertTrue(managerial_order.exists())

        round_report = central_dir / "reports" / "EXECUTADO__rodada-015-central-materialization-2026-05-02.md"
        self.assertTrue(round_report.exists())
        orders_index = central_dir / "indexes" / "orders-index.md"
        self.assertTrue(orders_index.exists())
        active_queue = central_dir / "current" / "active-order-queue.md"
        self.assertTrue(active_queue.exists())
        status_overview = central_dir / "status" / "overview.md"
        self.assertTrue(status_overview.exists())

        row_counts = table_counts(central_dir / "controle-semantico.sqlite")
        self.assertGreaterEqual(row_counts.get("service_orders", 0), 20)
        self.assertGreaterEqual(row_counts.get("files", 0), 20)

    def test_cli_materialization_writes_payload(self) -> None:
        """The CLI command exits with 0 and leaves a JSON report with the expected fields."""
        tmp_root = make_tmp()
        project_dir = tmp_root / "project"
        central_dir = tmp_root / "central" / "platform"

        argv = [
            "central-materialization",
            "--project-root",
            str(project_dir),
            "--central-platform-folder",
            str(central_dir),
        ]
        exit_code = main(argv)
        self.assertEqual(exit_code, 0)

        report_path = central_dir / "dados" / "central-materialization-report.json"
        raw_report = report_path.read_text(encoding="utf-8")
        data = json.loads(raw_report)

        self.assertEqual(data["project_id"], "tudo-para-ia-mais-humana-platform")
        self.assertEqual(len(data["active_input_orders"]), 10)
        self.assertEqual(len(data["next_output_orders"]), 10)

    def test_run_materialization_preserves_project_fallback_when_central_is_blocked(self) -> None:
        """With a regular file at the central path, the run is PARTIAL and the project gets the fallback."""
        tmp_root = make_tmp()
        project_dir = tmp_root / "project"
        blocked_central = tmp_root / "central-blocked"
        # Put a plain file where the central folder is expected.
        blocked_central.write_text("not a directory", encoding="utf-8")

        outcome = run_central_materialization(
            project_root=project_dir,
            central_platform_folder=blocked_central,
        )

        self.assertEqual(outcome.status, MaterializationStatus.PARTIAL)
        self.assertGreaterEqual(len(outcome.fallback_order_files), 20)

        local_write = outcome.local_semantic_write
        self.assertIsNotNone(local_write)
        self.assertTrue(local_write and local_write.ok)

        fallback_dir = project_dir / "os-orientadoras" / "central-materialization-fallback"
        mirrored_order = (
            fallback_dir / "executivas" / "0037_EXECUTIVA__homologar-rotas-administrativas-mcp-no-gateway.md"
        )
        self.assertTrue(mirrored_order.exists())
        mirrored_text = mirrored_order.read_text(encoding="utf-8")
        self.assertIn("Espelho operacional de contingencia", mirrored_text)

        row_counts = table_counts(project_dir / "controle-semantico.sqlite")
        self.assertGreaterEqual(row_counts.get("service_orders", 0), 20)
        self.assertGreaterEqual(row_counts.get("files", 0), 20)
# Run the test cases in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()