auto-sync: tudo-para-ia-mais-humana 2026-05-01 23:21:24
This commit is contained in:
124
tests/test_mcp_gateway_access_policy.py
Normal file
124
tests/test_mcp_gateway_access_policy.py
Normal file
@@ -0,0 +1,124 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import unittest
|
||||
|
||||
from mais_humana.cli import main
|
||||
from mais_humana.mcp_gateway_access_policy import (
|
||||
AccessPolicyStatus,
|
||||
build_access_policy_report,
|
||||
has_secret_shape,
|
||||
probes_from_publication_gate_payload,
|
||||
run_access_policy_gate,
|
||||
)
|
||||
from tests.helpers import make_tmp
|
||||
|
||||
|
||||
def publication_gate_payload() -> dict[str, object]:
|
||||
return {
|
||||
"report": {
|
||||
"live_probes": [
|
||||
{
|
||||
"tool_id": "mais_humana.rulebook.compact",
|
||||
"endpoint": "https://mcps-gateway.ami-app.workers.dev/v1/execute",
|
||||
"http_status": 200,
|
||||
"ok": True,
|
||||
"trace_id": "trace:mcps-gateway:actor:mais_humana.rulebook.compact",
|
||||
"audit_id": "audit:mcps-gateway:actor:mais_humana.rulebook.compact",
|
||||
"evidence_id": "evidence-rulebook",
|
||||
"source_payload_hash": "hash-request-rulebook",
|
||||
"source_records_hash": "hash-response-rulebook",
|
||||
"response_excerpt": {"ok": "True", "providerId": "mais_humana"},
|
||||
"observed_at": "2026-05-02T00:00:00+00:00",
|
||||
},
|
||||
{
|
||||
"tool_id": "mais_humana.admin_ui.same_source",
|
||||
"endpoint": "https://mcps-gateway.ami-app.workers.dev/v1/execute",
|
||||
"http_status": 200,
|
||||
"ok": True,
|
||||
"trace_id": "trace:mcps-gateway:actor:mais_humana.admin_ui.same_source",
|
||||
"audit_id": "audit:mcps-gateway:actor:mais_humana.admin_ui.same_source",
|
||||
"evidence_id": "evidence-same-source",
|
||||
"source_payload_hash": "hash-request-same-source",
|
||||
"source_records_hash": "hash-response-same-source",
|
||||
"response_excerpt": {"ok": "True", "sameSource": "True"},
|
||||
"observed_at": "2026-05-02T00:00:00+00:00",
|
||||
},
|
||||
{
|
||||
"tool_id": "mais_humana.mcp_transit.ledger",
|
||||
"endpoint": "https://mcps-gateway.ami-app.workers.dev/v1/execute",
|
||||
"http_status": 200,
|
||||
"ok": True,
|
||||
"trace_id": "trace:mcps-gateway:actor:mais_humana.mcp_transit.ledger",
|
||||
"audit_id": "audit:mcps-gateway:actor:mais_humana.mcp_transit.ledger",
|
||||
"evidence_id": "evidence-ledger",
|
||||
"source_payload_hash": "hash-request-ledger",
|
||||
"source_records_hash": "hash-response-ledger",
|
||||
"response_excerpt": {"ok": "True", "records": "3"},
|
||||
"observed_at": "2026-05-02T00:00:00+00:00",
|
||||
},
|
||||
]
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
class McpGatewayAccessPolicyTests(unittest.TestCase):
|
||||
def test_policy_from_publication_gate_payload_passes_without_secret_leak(self) -> None:
|
||||
probes = probes_from_publication_gate_payload(publication_gate_payload())
|
||||
report = build_access_policy_report(probes=probes)
|
||||
self.assertEqual(len(report.probes), 3)
|
||||
self.assertEqual(report.status, AccessPolicyStatus.PASSED)
|
||||
self.assertTrue(report.live_ready)
|
||||
self.assertTrue(report.secret_safe)
|
||||
self.assertFalse(report.blockers)
|
||||
by_rule = {check.rule_id: check for check in report.checks}
|
||||
self.assertEqual(by_rule["auth.bearer.present-redacted"].status, AccessPolicyStatus.PASSED)
|
||||
self.assertEqual(by_rule["redaction.no-secret-shapes"].status, AccessPolicyStatus.PASSED)
|
||||
|
||||
def test_secret_shapes_block_redaction_rule(self) -> None:
|
||||
self.assertTrue(has_secret_shape("Authorization: Bearer rawtoken123456"))
|
||||
self.assertTrue(has_secret_shape("cfat_abc123"))
|
||||
payload = publication_gate_payload()
|
||||
live_probes = payload["report"]["live_probes"] # type: ignore[index]
|
||||
live_probes[0]["response_excerpt"] = {"authorization": "Bearer rawtoken123456"} # type: ignore[index]
|
||||
report = build_access_policy_report(probes=probes_from_publication_gate_payload(payload))
|
||||
by_rule = {check.rule_id: check for check in report.checks}
|
||||
self.assertEqual(by_rule["redaction.no-secret-shapes"].status, AccessPolicyStatus.BLOCKED)
|
||||
|
||||
def test_run_access_policy_gate_writes_project_and_central_artifacts(self) -> None:
|
||||
tmp = make_tmp()
|
||||
project = tmp / "tudo-para-ia-mais-humana"
|
||||
central = tmp / "central" / "projects" / "15_repo_tudo-para-ia-mais-humana-platform"
|
||||
gate_json = project / "dados" / "mcp-publication-gate-mais-humana.json"
|
||||
gate_json.parent.mkdir(parents=True, exist_ok=True)
|
||||
gate_json.write_text(json.dumps(publication_gate_payload()), encoding="utf-8")
|
||||
report, records = run_access_policy_gate(project_root=project, central_platform_folder=central, publication_gate_json=gate_json)
|
||||
self.assertEqual(report.status, AccessPolicyStatus.PASSED)
|
||||
self.assertTrue((project / "dados" / "mcp-gateway-access-policy.json").exists())
|
||||
self.assertTrue((project / "matrizes" / "mcp-gateway-access-policy.csv").exists())
|
||||
self.assertTrue((central / "reports" / "MCP-GATEWAY-ACCESS-POLICY__RODADA015.md").exists())
|
||||
self.assertGreaterEqual(len(records), 4)
|
||||
|
||||
def test_cli_access_policy_writes_payload(self) -> None:
|
||||
tmp = make_tmp()
|
||||
project = tmp / "tudo-para-ia-mais-humana"
|
||||
gate_json = project / "dados" / "mcp-publication-gate-mais-humana.json"
|
||||
gate_json.parent.mkdir(parents=True, exist_ok=True)
|
||||
gate_json.write_text(json.dumps(publication_gate_payload()), encoding="utf-8")
|
||||
code = main(
|
||||
[
|
||||
"mcp-access-policy",
|
||||
"--project-root",
|
||||
str(project),
|
||||
"--publication-gate-json",
|
||||
str(gate_json),
|
||||
]
|
||||
)
|
||||
self.assertEqual(code, 0)
|
||||
payload = json.loads((project / "dados" / "mcp-gateway-access-policy.json").read_text(encoding="utf-8"))
|
||||
self.assertEqual(payload["status"], "passed")
|
||||
self.assertTrue(payload["secretSafe"])
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
@@ -113,6 +113,15 @@ class McpProviderContractTests(unittest.TestCase):
|
||||
self.assertGreater(len(ui_screens), 20)
|
||||
self.assertTrue(all(contract.report_model_id for contract in report_models))
|
||||
|
||||
def test_access_policy_contracts_cover_profiles_and_gateway_rules(self) -> None:
|
||||
access_contracts = contracts_for_kind(McpContractKind.ACCESS_POLICY)
|
||||
self.assertGreater(len(access_contracts), 500)
|
||||
sample = access_contracts[0]
|
||||
self.assertIn("authorizationCredentialRef", sample.required_payload_fields)
|
||||
self.assertIn("pluginCloudflareDiagnosticIgnored", sample.required_payload_fields)
|
||||
self.assertTrue(any("WAF" in step or "waf" in step for step in sample.validation_steps))
|
||||
self.assertTrue(all("0045_GERENCIAL__pactuar-politica-acesso-waf-gpt-mcp-gateway" in item.order_ids for item in access_contracts))
|
||||
|
||||
def test_cli_mcp_provider_returns_json(self) -> None:
|
||||
root = make_tmp()
|
||||
self.make_repo(
|
||||
|
||||
Reference in New Issue
Block a user