"""Generate a compact Python acceptance catalog for MCP admin routes.""" from __future__ import annotations import json import sys from pathlib import Path from typing import Any ROOT = Path(__file__).resolve().parents[1] SRC = ROOT / "src" OUTPUT = SRC / "mais_humana" / "generated_mcp_admin_route_acceptance.py" if str(SRC) not in sys.path: sys.path.insert(0, str(SRC)) from mais_humana.mcp_contract import McpContractKind, iter_contracts, stable_hash # noqa: E402 def _operation(contract_id: str) -> str: parts = contract_id.split(".") if len(parts) >= 4 and parts[-1] == "administration-route": return parts[-2] return "unknown" def _permission(policy_tags: tuple[str, ...]) -> str: for tag in reversed(policy_tags): if tag.startswith("mcp.admin."): return tag return "" def _case_status(row: dict[str, Any]) -> tuple[str, tuple[str, ...]]: blockers: list[str] = [] required_transit = tuple(row["requiredTransitFields"]) required_payload = tuple(row["requiredPayloadFields"]) for field in ( "origin", "destination", "tool", "payload", "actor", "permission", "result", "traceId", "auditId", "timestamp", ): if field not in required_transit: blockers.append(f"missing_transit:{field}") for field in ( "adminRouteId", "adminRouteKind", "sourceEndpoint", "sourceToolId", "sourcePayloadHash", "sourceRecordsHash", "truthState", "panelReady", "gptExplainable", "humanNextAction", "permissionScope", "mcpOnlyAdministration", ): if field not in required_payload: blockers.append(f"missing_payload:{field}") if not row["sameSource"]: blockers.append("same_source_false") if not row["permissionScope"]: blockers.append("permission_scope_missing") if row["operation"] == "acao" and not (row["approvalRequired"] or row["dryRunSupported"]): blockers.append("mutable_action_without_approval_or_dry_run") if blockers: return "blocked", tuple(blockers) if row["maturityLevel"] < 8: return "partial", ("maturity_below_8",) return "ready", () def _case_from_contract(contract: Any) -> dict[str, Any]: operation = _operation(contract.contract_id) permission = _permission(tuple(contract.policy_tags)) required_payload = tuple(contract.required_payload_fields) row = { "routeId": contract.contract_id, "platformId": contract.platform_id, "profileId": contract.profile_id, "operation": operation, "toolId": contract.tool_id, "sourceToolId": contract.source_tool_id, "permissionScope": permission, "sourceEndpoint": contract.source_endpoint, "sourcePayloadHash": contract.source_payload_hash, "sourceRecordsHash": contract.source_records_hash, "evidenceId": f"evidence-{contract.source_records_hash[:24]}", "truthState": contract.truth_state.value, "panelReady": bool(contract.panel_ready), "gptExplainable": bool(contract.gpt_explainable), "sameSource": bool(contract.same_source_ready), "approvalRequired": "approvalRequired" in required_payload, "dryRunSupported": "dryRunSupported" in required_payload, "requiredTransitFields": tuple(contract.required_transit_fields), "requiredPayloadFields": required_payload, "validationSteps": tuple(contract.validation_steps), "redactionRequirements": tuple(contract.redaction_requirements), "pendingIfMissing": contract.pending_if_missing, "orderIds": tuple(contract.order_ids), "policyTags": tuple(contract.policy_tags), "maturityLevel": int(contract.maturity_level), } status, blockers = _case_status(row) row["status"] = status row["blockerReasons"] = blockers return row def _format_value(value: Any) -> str: return repr(value) def _render_case(row: dict[str, Any]) -> list[str]: lines = [" {"] for key in ( "routeId", "platformId", "profileId", "operation", "toolId", "sourceToolId", "permissionScope", "sourceEndpoint", "sourcePayloadHash", "sourceRecordsHash", "evidenceId", "truthState", "panelReady", "gptExplainable", "sameSource", "approvalRequired", "dryRunSupported", "requiredTransitFields", "requiredPayloadFields", "validationSteps", "redactionRequirements", "pendingIfMissing", "orderIds", "policyTags", "maturityLevel", "status", "blockerReasons", ): lines.append(f" {key!r}: {_format_value(row[key])},") lines.append(" },") return lines def build_cases() -> tuple[dict[str, Any], ...]: contracts = [contract for contract in iter_contracts() if contract.kind == McpContractKind.ADMINISTRATION_ROUTE] rows = [_case_from_contract(contract) for contract in sorted(contracts, key=lambda item: item.contract_id)] return tuple(rows) def render_module(cases: tuple[dict[str, Any], ...]) -> str: source_hash = stable_hash(cases) lines = [ '"""Generated MCP administration route acceptance catalog.', "", "Do not edit by hand. Regenerate with:", "python tools/generate_mcp_admin_route_acceptance.py", '"""', "", "from __future__ import annotations", "", f"SOURCE_HASH = {source_hash!r}", f"CASES_COUNT = {len(cases)!r}", "", "ADMIN_ROUTE_ACCEPTANCE_CASES = (", ] for row in cases: lines.extend(_render_case(row)) lines.extend([")", ""]) return "\n".join(lines) def main() -> int: cases = build_cases() OUTPUT.parent.mkdir(parents=True, exist_ok=True) OUTPUT.write_text(render_module(cases), encoding="utf-8") print( json.dumps( { "output": str(OUTPUT), "cases": len(cases), "sourceHash": stable_hash(cases), }, ensure_ascii=False, indent=2, sort_keys=True, ) ) return 0 if __name__ == "__main__": raise SystemExit(main())