from __future__ import annotations

import json
import unittest

from mais_humana.cli import main
from mais_humana.mcp_gateway_access_policy import (
    AccessPolicyStatus,
    build_access_policy_report,
    has_secret_shape,
    probes_from_publication_gate_payload,
    run_access_policy_gate,
)
from tests.helpers import make_tmp

# Shared fixture values: every probe in the payload hits the same gateway
# endpoint and carries the same observation timestamp.
_GATEWAY_ENDPOINT = "https://mcps-gateway.ami-app.workers.dev/v1/execute"
_OBSERVED_AT = "2026-05-02T00:00:00+00:00"


def _live_probe(tool_id: str, evidence: str, response_excerpt: dict[str, str]) -> dict[str, object]:
    """Build one successful live-probe record in the shape the publication gate emits.

    Args:
        tool_id: Fully qualified tool id; also embedded in the trace/audit ids.
        evidence: Suffix used for the evidence id and the request/response hash
            placeholders (constructed fixture values, not real digests).
        response_excerpt: The redacted excerpt of the gateway response.

    Returns:
        A probe dict with a 200/ok result and all identifier fields populated.
    """
    return {
        "tool_id": tool_id,
        "endpoint": _GATEWAY_ENDPOINT,
        "http_status": 200,
        "ok": True,
        "trace_id": f"trace:mcps-gateway:actor:{tool_id}",
        "audit_id": f"audit:mcps-gateway:actor:{tool_id}",
        "evidence_id": f"evidence-{evidence}",
        "source_payload_hash": f"hash-request-{evidence}",
        "source_records_hash": f"hash-response-{evidence}",
        "response_excerpt": response_excerpt,
        "observed_at": _OBSERVED_AT,
    }


def publication_gate_payload() -> dict[str, object]:
    """Return a minimal publication-gate payload with three passing live probes.

    A fresh dict is built on every call, so a test may mutate it (for example
    to inject a secret-shaped value) without affecting other tests.
    """
    live_probes = [
        _live_probe(
            "mais_humana.rulebook.compact",
            "rulebook",
            {"ok": "True", "providerId": "mais_humana"},
        ),
        _live_probe(
            "mais_humana.admin_ui.same_source",
            "same-source",
            {"ok": "True", "sameSource": "True"},
        ),
        _live_probe(
            "mais_humana.mcp_transit.ledger",
            "ledger",
            {"ok": "True", "records": "3"},
        ),
    ]
    return {"report": {"live_probes": live_probes}}


def _checks_by_rule(report) -> dict[str, object]:
    """Index a report's checks by their ``rule_id`` for direct lookup."""
    return {check.rule_id: check for check in report.checks}


def _write_gate_fixture(project):
    """Write the fixture payload to the project's publication-gate JSON path.

    Creates the ``dados`` folder as needed and returns the path written.
    """
    gate_json = project / "dados" / "mcp-publication-gate-mais-humana.json"
    gate_json.parent.mkdir(parents=True, exist_ok=True)
    gate_json.write_text(json.dumps(publication_gate_payload()), encoding="utf-8")
    return gate_json


class McpGatewayAccessPolicyTests(unittest.TestCase):
    """Exercises the MCP gateway access-policy gate over the publication-gate fixture."""

    def test_policy_from_publication_gate_payload_passes_without_secret_leak(self) -> None:
        """A clean payload yields a PASSED, live-ready, secret-safe report with no blockers."""
        probes = probes_from_publication_gate_payload(publication_gate_payload())
        report = build_access_policy_report(probes=probes)

        self.assertEqual(len(report.probes), 3)
        self.assertEqual(report.status, AccessPolicyStatus.PASSED)
        self.assertTrue(report.live_ready)
        self.assertTrue(report.secret_safe)
        self.assertFalse(report.blockers)

        checks = _checks_by_rule(report)
        self.assertEqual(checks["auth.bearer.present-redacted"].status, AccessPolicyStatus.PASSED)
        self.assertEqual(checks["redaction.no-secret-shapes"].status, AccessPolicyStatus.PASSED)

    def test_secret_shapes_block_redaction_rule(self) -> None:
        """A raw bearer token or prefixed token-like string in an excerpt blocks the redaction rule."""
        # Credential shapes the detector is expected to recognise as unredacted.
        self.assertTrue(has_secret_shape("Authorization: Bearer rawtoken123456"))
        self.assertTrue(has_secret_shape("cfat_abc123"))
        # NOTE(review): no negative case is asserted here, so a detector that
        # flags every string would still pass; once the redacted placeholder
        # format the gate emits is confirmed, assert it is *not* flagged.

        payload = publication_gate_payload()
        probes = payload["report"]["live_probes"]  # type: ignore[index]
        # Leak a raw token through the response excerpt of the first probe.
        probes[0]["response_excerpt"] = {"authorization": "Bearer rawtoken123456"}  # type: ignore[index]

        report = build_access_policy_report(probes=probes_from_publication_gate_payload(payload))
        checks = _checks_by_rule(report)
        self.assertEqual(checks["redaction.no-secret-shapes"].status, AccessPolicyStatus.BLOCKED)

    def test_run_access_policy_gate_writes_project_and_central_artifacts(self) -> None:
        """Running the gate persists the JSON, CSV and central markdown artifacts."""
        tmp = make_tmp()
        project = tmp / "tudo-para-ia-mais-humana"
        central = tmp / "central" / "projects" / "15_repo_tudo-para-ia-mais-humana-platform"
        gate_json = _write_gate_fixture(project)

        report, records = run_access_policy_gate(
            project_root=project,
            central_platform_folder=central,
            publication_gate_json=gate_json,
        )

        self.assertEqual(report.status, AccessPolicyStatus.PASSED)
        expected_artifacts = (
            project / "dados" / "mcp-gateway-access-policy.json",
            project / "matrizes" / "mcp-gateway-access-policy.csv",
            central / "reports" / "MCP-GATEWAY-ACCESS-POLICY__RODADA015.md",
        )
        for artifact in expected_artifacts:
            with self.subTest(artifact=artifact.name):
                self.assertTrue(artifact.exists())
        self.assertGreaterEqual(len(records), 4)

    def test_cli_access_policy_writes_payload(self) -> None:
        """The ``mcp-access-policy`` subcommand exits 0 and writes the policy JSON."""
        tmp = make_tmp()
        project = tmp / "tudo-para-ia-mais-humana"
        gate_json = _write_gate_fixture(project)

        argv = [
            "mcp-access-policy",
            "--project-root",
            str(project),
            "--publication-gate-json",
            str(gate_json),
        ]
        self.assertEqual(main(argv), 0)

        written = project / "dados" / "mcp-gateway-access-policy.json"
        payload = json.loads(written.read_text(encoding="utf-8"))
        self.assertEqual(payload["status"], "passed")
        self.assertTrue(payload["secretSafe"])


if __name__ == "__main__":
    unittest.main()