from __future__ import annotations
|
|
|
|
import json
|
|
import unittest
|
|
|
|
from mais_humana.cli import main
|
|
from mais_humana.mcp_gateway_access_policy import (
|
|
AccessPolicyStatus,
|
|
build_access_policy_report,
|
|
has_secret_shape,
|
|
probes_from_publication_gate_payload,
|
|
run_access_policy_gate,
|
|
)
|
|
from tests.helpers import make_tmp
|
|
|
|
|
|
def publication_gate_payload() -> dict[str, object]:
    """Build a fresh publication-gate payload describing three healthy live probes.

    Each call returns a brand-new, independent dict, so a test may mutate it
    (for example, replace a ``response_excerpt`` with one that carries a raw
    token) without leaking that change into other tests.

    The shape is the one the access-policy helpers consume:
    ``payload["report"]["live_probes"]`` is a list of probe records, each with
    a 200/ok result, trace/audit/evidence identifiers, request/response
    hashes, a redacted response excerpt and an observation timestamp.  Every
    identifier, hash and endpoint here is a constructed fixture value, not
    real gateway output.
    """
    endpoint = "https://mcps-gateway.ami-app.workers.dev/v1/execute"
    observed_at = "2026-05-02T00:00:00+00:00"

    def probe(tool_id: str, slug: str, excerpt: dict[str, str]) -> dict[str, object]:
        # trace/audit ids derive from the tool id; evidence id and hashes from the slug.
        return {
            "tool_id": tool_id,
            "endpoint": endpoint,
            "http_status": 200,
            "ok": True,
            "trace_id": f"trace:mcps-gateway:actor:{tool_id}",
            "audit_id": f"audit:mcps-gateway:actor:{tool_id}",
            "evidence_id": f"evidence-{slug}",
            "source_payload_hash": f"hash-request-{slug}",
            "source_records_hash": f"hash-response-{slug}",
            "response_excerpt": {"ok": "True", **excerpt},
            "observed_at": observed_at,
        }

    live_probes = [
        probe("mais_humana.rulebook.compact", "rulebook", {"providerId": "mais_humana"}),
        probe("mais_humana.admin_ui.same_source", "same-source", {"sameSource": "True"}),
        probe("mais_humana.mcp_transit.ledger", "ledger", {"records": "3"}),
    ]
    return {"report": {"live_probes": live_probes}}
|
|
|
|
|
|
class McpGatewayAccessPolicyTests(unittest.TestCase):
    """Tests for the MCP gateway access-policy gate.

    Starting from the clean three-probe fixture in ``publication_gate_payload``
    the cases check that (1) a clean payload passes the policy with both the
    bearer-redaction and the secret-shape rules green, (2) a raw token in a
    response excerpt blocks the secret-shape rule, and (3) the gate and its
    CLI entry point persist the expected project/central artifacts.

    NOTE(review): the artifacts written to disk are only checked for
    existence and top-level status here; their contents are not scanned for
    secret shapes by these tests.
    """

    @staticmethod
    def _write_gate_payload(project_root):
        """Persist the clean fixture as the project's publication-gate JSON.

        ``project_root`` is the directory returned by ``make_tmp()`` joined
        with the project name (a path-like supporting ``/``); the gate file
        is created under ``dados/`` and its path returned.
        """
        gate_json = project_root / "dados" / "mcp-publication-gate-mais-humana.json"
        gate_json.parent.mkdir(parents=True, exist_ok=True)
        gate_json.write_text(json.dumps(publication_gate_payload()), encoding="utf-8")
        return gate_json

    def test_policy_from_publication_gate_payload_passes_without_secret_leak(self) -> None:
        """A clean three-probe payload yields a PASSED, live-ready, secret-safe report."""
        live_probes = probes_from_publication_gate_payload(publication_gate_payload())
        report = build_access_policy_report(probes=live_probes)

        self.assertEqual(len(report.probes), 3)
        self.assertEqual(report.status, AccessPolicyStatus.PASSED)
        self.assertTrue(report.live_ready)
        self.assertTrue(report.secret_safe)
        self.assertFalse(report.blockers)

        # The aggregate status is not enough: both redaction-related rules
        # must pass on their own.
        checks_by_rule = {check.rule_id: check for check in report.checks}
        self.assertEqual(checks_by_rule["auth.bearer.present-redacted"].status, AccessPolicyStatus.PASSED)
        self.assertEqual(checks_by_rule["redaction.no-secret-shapes"].status, AccessPolicyStatus.PASSED)

    def test_secret_shapes_block_redaction_rule(self) -> None:
        """A raw token in a response excerpt flips the secret-shape rule to BLOCKED."""
        # The detector must catch both a bearer header and a bare prefixed token.
        self.assertTrue(has_secret_shape("Authorization: Bearer rawtoken123456"))
        self.assertTrue(has_secret_shape("cfat_abc123"))

        tampered = publication_gate_payload()
        first_probe = tampered["report"]["live_probes"][0]  # type: ignore[index]
        first_probe["response_excerpt"] = {"authorization": "Bearer rawtoken123456"}  # type: ignore[index]

        report = build_access_policy_report(probes=probes_from_publication_gate_payload(tampered))
        checks_by_rule = {check.rule_id: check for check in report.checks}
        self.assertEqual(checks_by_rule["redaction.no-secret-shapes"].status, AccessPolicyStatus.BLOCKED)

    def test_run_access_policy_gate_writes_project_and_central_artifacts(self) -> None:
        """Running the gate writes the JSON/CSV/markdown artifacts and returns ledger records."""
        tmp = make_tmp()
        project_root = tmp / "tudo-para-ia-mais-humana"
        central_folder = tmp / "central" / "projects" / "15_repo_tudo-para-ia-mais-humana-platform"
        gate_json = self._write_gate_payload(project_root)

        report, records = run_access_policy_gate(
            project_root=project_root,
            central_platform_folder=central_folder,
            publication_gate_json=gate_json,
        )

        self.assertEqual(report.status, AccessPolicyStatus.PASSED)
        expected_artifacts = (
            project_root / "dados" / "mcp-gateway-access-policy.json",
            project_root / "matrizes" / "mcp-gateway-access-policy.csv",
            central_folder / "reports" / "MCP-GATEWAY-ACCESS-POLICY__RODADA015.md",
        )
        for artifact in expected_artifacts:
            self.assertTrue(artifact.exists(), artifact)
        self.assertGreaterEqual(len(records), 4)

    def test_cli_access_policy_writes_payload(self) -> None:
        """The ``mcp-access-policy`` CLI command exits 0 and writes the policy JSON."""
        tmp = make_tmp()
        project_root = tmp / "tudo-para-ia-mais-humana"
        gate_json = self._write_gate_payload(project_root)

        argv = [
            "mcp-access-policy",
            "--project-root",
            str(project_root),
            "--publication-gate-json",
            str(gate_json),
        ]
        self.assertEqual(main(argv), 0)

        policy_json = project_root / "dados" / "mcp-gateway-access-policy.json"
        payload = json.loads(policy_json.read_text(encoding="utf-8"))
        self.assertEqual(payload["status"], "passed")
        self.assertTrue(payload["secretSafe"])
|
|
|
|
|
|
# Allow running this test module directly (python path/to/this_file.py);
# under a test runner the guard is false and nothing happens here.
if __name__ == "__main__":
    unittest.main()
|