Files
tudo-para-ia-mais-humana-pl…/tools/generate_mcp_admin_route_acceptance.py

208 lines
6.3 KiB
Python

"""Generate a compact Python acceptance catalog for MCP admin routes."""
from __future__ import annotations
import json
import sys
from pathlib import Path
from typing import Any
ROOT = Path(__file__).resolve().parents[1]
SRC = ROOT / "src"
OUTPUT = SRC / "mais_humana" / "generated_mcp_admin_route_acceptance.py"
if str(SRC) not in sys.path:
sys.path.insert(0, str(SRC))
from mais_humana.mcp_contract import McpContractKind, iter_contracts, stable_hash # noqa: E402
def _operation(contract_id: str) -> str:
parts = contract_id.split(".")
if len(parts) >= 4 and parts[-1] == "administration-route":
return parts[-2]
return "unknown"
def _permission(policy_tags: tuple[str, ...]) -> str:
for tag in reversed(policy_tags):
if tag.startswith("mcp.admin."):
return tag
return ""
def _case_status(row: dict[str, Any]) -> tuple[str, tuple[str, ...]]:
blockers: list[str] = []
required_transit = tuple(row["requiredTransitFields"])
required_payload = tuple(row["requiredPayloadFields"])
for field in (
"origin",
"destination",
"tool",
"payload",
"actor",
"permission",
"result",
"traceId",
"auditId",
"timestamp",
):
if field not in required_transit:
blockers.append(f"missing_transit:{field}")
for field in (
"adminRouteId",
"adminRouteKind",
"sourceEndpoint",
"sourceToolId",
"sourcePayloadHash",
"sourceRecordsHash",
"truthState",
"panelReady",
"gptExplainable",
"humanNextAction",
"permissionScope",
"mcpOnlyAdministration",
):
if field not in required_payload:
blockers.append(f"missing_payload:{field}")
if not row["sameSource"]:
blockers.append("same_source_false")
if not row["permissionScope"]:
blockers.append("permission_scope_missing")
if row["operation"] == "acao" and not (row["approvalRequired"] or row["dryRunSupported"]):
blockers.append("mutable_action_without_approval_or_dry_run")
if blockers:
return "blocked", tuple(blockers)
if row["maturityLevel"] < 8:
return "partial", ("maturity_below_8",)
return "ready", ()
def _case_from_contract(contract: Any) -> dict[str, Any]:
operation = _operation(contract.contract_id)
permission = _permission(tuple(contract.policy_tags))
required_payload = tuple(contract.required_payload_fields)
row = {
"routeId": contract.contract_id,
"platformId": contract.platform_id,
"profileId": contract.profile_id,
"operation": operation,
"toolId": contract.tool_id,
"sourceToolId": contract.source_tool_id,
"permissionScope": permission,
"sourceEndpoint": contract.source_endpoint,
"sourcePayloadHash": contract.source_payload_hash,
"sourceRecordsHash": contract.source_records_hash,
"evidenceId": f"evidence-{contract.source_records_hash[:24]}",
"truthState": contract.truth_state.value,
"panelReady": bool(contract.panel_ready),
"gptExplainable": bool(contract.gpt_explainable),
"sameSource": bool(contract.same_source_ready),
"approvalRequired": "approvalRequired" in required_payload,
"dryRunSupported": "dryRunSupported" in required_payload,
"requiredTransitFields": tuple(contract.required_transit_fields),
"requiredPayloadFields": required_payload,
"validationSteps": tuple(contract.validation_steps),
"redactionRequirements": tuple(contract.redaction_requirements),
"pendingIfMissing": contract.pending_if_missing,
"orderIds": tuple(contract.order_ids),
"policyTags": tuple(contract.policy_tags),
"maturityLevel": int(contract.maturity_level),
}
status, blockers = _case_status(row)
row["status"] = status
row["blockerReasons"] = blockers
return row
def _format_value(value: Any) -> str:
return repr(value)
def _render_case(row: dict[str, Any]) -> list[str]:
lines = [" {"]
for key in (
"routeId",
"platformId",
"profileId",
"operation",
"toolId",
"sourceToolId",
"permissionScope",
"sourceEndpoint",
"sourcePayloadHash",
"sourceRecordsHash",
"evidenceId",
"truthState",
"panelReady",
"gptExplainable",
"sameSource",
"approvalRequired",
"dryRunSupported",
"requiredTransitFields",
"requiredPayloadFields",
"validationSteps",
"redactionRequirements",
"pendingIfMissing",
"orderIds",
"policyTags",
"maturityLevel",
"status",
"blockerReasons",
):
lines.append(f" {key!r}: {_format_value(row[key])},")
lines.append(" },")
return lines
def build_cases() -> tuple[dict[str, Any], ...]:
contracts = [contract for contract in iter_contracts() if contract.kind == McpContractKind.ADMINISTRATION_ROUTE]
rows = [_case_from_contract(contract) for contract in sorted(contracts, key=lambda item: item.contract_id)]
return tuple(rows)
def render_module(cases: tuple[dict[str, Any], ...]) -> str:
source_hash = stable_hash(cases)
lines = [
'"""Generated MCP administration route acceptance catalog.',
"",
"Do not edit by hand. Regenerate with:",
"python tools/generate_mcp_admin_route_acceptance.py",
'"""',
"",
"from __future__ import annotations",
"",
f"SOURCE_HASH = {source_hash!r}",
f"CASES_COUNT = {len(cases)!r}",
"",
"ADMIN_ROUTE_ACCEPTANCE_CASES = (",
]
for row in cases:
lines.extend(_render_case(row))
lines.extend([")", ""])
return "\n".join(lines)
def main() -> int:
cases = build_cases()
OUTPUT.parent.mkdir(parents=True, exist_ok=True)
OUTPUT.write_text(render_module(cases), encoding="utf-8")
print(
json.dumps(
{
"output": str(OUTPUT),
"cases": len(cases),
"sourceHash": stable_hash(cases),
},
ensure_ascii=False,
indent=2,
sort_keys=True,
)
)
return 0
if __name__ == "__main__":
raise SystemExit(main())