208 lines
6.3 KiB
Python
208 lines
6.3 KiB
Python
"""Generate a compact Python acceptance catalog for MCP admin routes."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import sys
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
|
|
# Second ancestor of this file's resolved path (presumably the repository
# root, with this script living one directory below it -- verify layout).
ROOT = Path(__file__).resolve().parents[1]
SRC = ROOT.joinpath("src")
OUTPUT = SRC.joinpath("mais_humana", "generated_mcp_admin_route_acceptance.py")

# Put the in-repo ``src`` directory first on the import path (once) so the
# package import that follows resolves when this file is run as a script.
if str(SRC) not in sys.path:
    sys.path.insert(0, str(SRC))
|
|
|
|
from mais_humana.mcp_contract import McpContractKind, iter_contracts, stable_hash # noqa: E402
|
|
|
|
|
|
def _operation(contract_id: str) -> str:
|
|
parts = contract_id.split(".")
|
|
if len(parts) >= 4 and parts[-1] == "administration-route":
|
|
return parts[-2]
|
|
return "unknown"
|
|
|
|
|
|
def _permission(policy_tags: tuple[str, ...]) -> str:
|
|
for tag in reversed(policy_tags):
|
|
if tag.startswith("mcp.admin."):
|
|
return tag
|
|
return ""
|
|
|
|
|
|
def _case_status(row: dict[str, Any]) -> tuple[str, tuple[str, ...]]:
|
|
blockers: list[str] = []
|
|
required_transit = tuple(row["requiredTransitFields"])
|
|
required_payload = tuple(row["requiredPayloadFields"])
|
|
for field in (
|
|
"origin",
|
|
"destination",
|
|
"tool",
|
|
"payload",
|
|
"actor",
|
|
"permission",
|
|
"result",
|
|
"traceId",
|
|
"auditId",
|
|
"timestamp",
|
|
):
|
|
if field not in required_transit:
|
|
blockers.append(f"missing_transit:{field}")
|
|
for field in (
|
|
"adminRouteId",
|
|
"adminRouteKind",
|
|
"sourceEndpoint",
|
|
"sourceToolId",
|
|
"sourcePayloadHash",
|
|
"sourceRecordsHash",
|
|
"truthState",
|
|
"panelReady",
|
|
"gptExplainable",
|
|
"humanNextAction",
|
|
"permissionScope",
|
|
"mcpOnlyAdministration",
|
|
):
|
|
if field not in required_payload:
|
|
blockers.append(f"missing_payload:{field}")
|
|
if not row["sameSource"]:
|
|
blockers.append("same_source_false")
|
|
if not row["permissionScope"]:
|
|
blockers.append("permission_scope_missing")
|
|
if row["operation"] == "acao" and not (row["approvalRequired"] or row["dryRunSupported"]):
|
|
blockers.append("mutable_action_without_approval_or_dry_run")
|
|
if blockers:
|
|
return "blocked", tuple(blockers)
|
|
if row["maturityLevel"] < 8:
|
|
return "partial", ("maturity_below_8",)
|
|
return "ready", ()
|
|
|
|
|
|
def _case_from_contract(contract: Any) -> dict[str, Any]:
    """Build the acceptance row for one contract and attach its verdict.

    The row copies identifying, source and policy attributes from
    *contract* under camelCase keys, derives ``operation`` from the
    contract id and ``permissionScope`` from the policy tags, and sets
    ``approvalRequired`` / ``dryRunSupported`` according to whether those
    names appear among the contract's required payload fields. The result
    of ``_case_status`` is stored under ``status`` and ``blockerReasons``.
    """
    route_operation = _operation(contract.contract_id)
    scope = _permission(tuple(contract.policy_tags))
    payload_fields = tuple(contract.required_payload_fields)

    row: dict[str, Any] = {
        # Identity of the route.
        "routeId": contract.contract_id,
        "platformId": contract.platform_id,
        "profileId": contract.profile_id,
        "operation": route_operation,
        "toolId": contract.tool_id,
        "sourceToolId": contract.source_tool_id,
        "permissionScope": scope,
        # Source evidence; the evidence id reuses the first 24 characters
        # of the records hash.
        "sourceEndpoint": contract.source_endpoint,
        "sourcePayloadHash": contract.source_payload_hash,
        "sourceRecordsHash": contract.source_records_hash,
        "evidenceId": f"evidence-{contract.source_records_hash[:24]}",
        # Readiness flags.
        "truthState": contract.truth_state.value,
        "panelReady": bool(contract.panel_ready),
        "gptExplainable": bool(contract.gpt_explainable),
        "sameSource": bool(contract.same_source_ready),
        "approvalRequired": "approvalRequired" in payload_fields,
        "dryRunSupported": "dryRunSupported" in payload_fields,
        # Declared requirements and policy metadata.
        "requiredTransitFields": tuple(contract.required_transit_fields),
        "requiredPayloadFields": payload_fields,
        "validationSteps": tuple(contract.validation_steps),
        "redactionRequirements": tuple(contract.redaction_requirements),
        "pendingIfMissing": contract.pending_if_missing,
        "orderIds": tuple(contract.order_ids),
        "policyTags": tuple(contract.policy_tags),
        "maturityLevel": int(contract.maturity_level),
    }

    verdict, reasons = _case_status(row)
    row.update(status=verdict, blockerReasons=reasons)
    return row
|
|
|
|
|
|
def _format_value(value: Any) -> str:
|
|
return repr(value)
|
|
|
|
|
|
def _render_case(row: dict[str, Any]) -> list[str]:
    """Render one acceptance row as the source lines of a dict literal.

    The result starts with an opening-brace line, has one
    ``'key': value,`` line per key in a fixed order (values formatted by
    ``_format_value``), and ends with a ``},`` line. Every listed key must
    be present in *row*; a missing key raises ``KeyError``.
    """
    ordered_keys = (
        "routeId",
        "platformId",
        "profileId",
        "operation",
        "toolId",
        "sourceToolId",
        "permissionScope",
        "sourceEndpoint",
        "sourcePayloadHash",
        "sourceRecordsHash",
        "evidenceId",
        "truthState",
        "panelReady",
        "gptExplainable",
        "sameSource",
        "approvalRequired",
        "dryRunSupported",
        "requiredTransitFields",
        "requiredPayloadFields",
        "validationSteps",
        "redactionRequirements",
        "pendingIfMissing",
        "orderIds",
        "policyTags",
        "maturityLevel",
        "status",
        "blockerReasons",
    )
    entries = [f" {name!r}: {_format_value(row[name])}," for name in ordered_keys]
    return [" {", *entries, " },"]
|
|
|
|
|
|
def build_cases() -> tuple[dict[str, Any], ...]:
    """Return acceptance rows for every administration-route contract.

    Contracts yielded by ``iter_contracts()`` are filtered to those whose
    ``kind`` is ``McpContractKind.ADMINISTRATION_ROUTE``, sorted by
    ``contract_id``, and converted with ``_case_from_contract``.
    """
    admin_routes = sorted(
        (
            entry
            for entry in iter_contracts()
            if entry.kind == McpContractKind.ADMINISTRATION_ROUTE
        ),
        key=lambda entry: entry.contract_id,
    )
    return tuple(_case_from_contract(entry) for entry in admin_routes)
|
|
|
|
|
|
def render_module(cases: tuple[dict[str, Any], ...]) -> str:
    """Return the full source text of the generated catalog module.

    The text consists of a module docstring, a ``from __future__`` import,
    ``SOURCE_HASH`` (``stable_hash(cases)``), ``CASES_COUNT``, and the
    ``ADMIN_ROUTE_ACCEPTANCE_CASES`` tuple with one rendered dict per case.
    Lines are joined with ``"\\n"``; the final empty element makes the text
    end with a newline.
    """
    digest = stable_hash(cases)
    header = [
        '"""Generated MCP administration route acceptance catalog.',
        "",
        "Do not edit by hand. Regenerate with:",
        "python tools/generate_mcp_admin_route_acceptance.py",
        '"""',
        "",
        "from __future__ import annotations",
        "",
        f"SOURCE_HASH = {digest!r}",
        f"CASES_COUNT = {len(cases)!r}",
        "",
        "ADMIN_ROUTE_ACCEPTANCE_CASES = (",
    ]
    body = [text for row in cases for text in _render_case(row)]
    return "\n".join([*header, *body, ")", ""])
|
|
|
|
|
|
def main() -> int:
    """Regenerate the catalog module and print a JSON summary.

    Builds the cases, creates the output directory if needed, writes the
    rendered module to ``OUTPUT`` as UTF-8, then prints a JSON object with
    the output path, the number of cases and their ``stable_hash``.

    Returns:
        Always ``0``.
    """
    cases = build_cases()

    OUTPUT.parent.mkdir(parents=True, exist_ok=True)
    OUTPUT.write_text(render_module(cases), encoding="utf-8")

    summary = {
        "output": str(OUTPUT),
        "cases": len(cases),
        "sourceHash": stable_hash(cases),
    }
    print(json.dumps(summary, ensure_ascii=False, indent=2, sort_keys=True))
    return 0
|
|
|
|
|
|
# Script entry point: the return value of main() becomes the exit status.
if __name__ == "__main__":
    sys.exit(main())
|