diff --git a/src/mais_humana/cli.py b/src/mais_humana/cli.py index a0f13b0..f83b740 100644 --- a/src/mais_humana/cli.py +++ b/src/mais_humana/cli.py @@ -20,6 +20,7 @@ from .mcp_contract import ( same_source_validation_payload, ui_renderer_policy_markdown, ) +from .mcp_admin_route_acceptance import run_admin_route_acceptance from .mcp_gateway_access_policy import run_access_policy_gate from .mcp_publication_gate import run_publication_gate from .mcp_transit_ledger import build_mcp_transit_ledger, mcp_transit_csv, mcp_transit_ledger_compact_json, mcp_transit_markdown @@ -128,6 +129,14 @@ def build_parser() -> argparse.ArgumentParser: sync_audit.add_argument("--central-repo-root", default="G:/_codex-git/nucleo-gestao-operacional") sync_audit.add_argument("--central-platform-folder", default="") sync_audit.add_argument("--fetch", action="store_true") + admin_acceptance = sub.add_parser("mcp-admin-route-acceptance", help="Write MCP-only administration route acceptance artifacts.") + admin_acceptance.add_argument("--project-root", default="G:/_codex-git/tudo-para-ia-mais-humana") + admin_acceptance.add_argument("--central-platform-folder", default="") + admin_acceptance.add_argument("--platform-id", default="") + admin_acceptance.add_argument("--profile-id", default="") + admin_acceptance.add_argument("--operation", default="") + admin_acceptance.add_argument("--status", default="") + admin_acceptance.add_argument("--limit", type=int, default=120) return parser @@ -540,6 +549,25 @@ def command_targeted_sync_audit(args: argparse.Namespace) -> int: return 0 +def command_mcp_admin_route_acceptance(args: argparse.Namespace) -> int: + central_platform_folder = Path(args.central_platform_folder) if args.central_platform_folder else None + report, records = run_admin_route_acceptance( + project_root=Path(args.project_root), + central_platform_folder=central_platform_folder, + platform_id=args.platform_id or None, + profile_id=args.profile_id or None, + operation=args.operation or None, + status=args.status or None, + limit=int(args.limit), + ) + payload = { + "report": report.to_dict(), + "generatedFiles": [record.path for record in records], + } + print(json.dumps(payload, ensure_ascii=False, indent=2)) + return 0 + + def main(argv: list[str] | None = None) -> int: parser = build_parser() args = parser.parse_args(argv) @@ -579,6 +607,8 @@ def main(argv: list[str] | None = None) -> int: return command_workspace_hygiene(args) if args.command == "targeted-sync-audit": return command_targeted_sync_audit(args) + if args.command == "mcp-admin-route-acceptance": + return command_mcp_admin_route_acceptance(args) parser.error(f"unknown command: {args.command}") return 2 diff --git a/src/mais_humana/mcp_admin_route_acceptance.py b/src/mais_humana/mcp_admin_route_acceptance.py new file mode 100644 index 0000000..9acdbef --- /dev/null +++ b/src/mais_humana/mcp_admin_route_acceptance.py @@ -0,0 +1,673 @@ +"""Acceptance catalog for MCP-only administration routes. + +The control contracts already define the administrative routes that must pass +through ``tudo-para-ia-mcps-internos-plataform``. This module turns the +generated contracts into a compact, executable acceptance layer: the gateway, +GPT, UI renderer, central reports, and service-order closeout can all read the +same case list without traversing the large contract object graph. +""" + +from __future__ import annotations + +import csv +import io +import json +from dataclasses import dataclass +from enum import Enum +from pathlib import Path +from typing import Any, Iterable, Mapping, Sequence + +from .mcp_contract import MCP_CONTROL_PLANE_ID, PROVIDER_ID, stable_hash +from .models import GeneratedFile, as_plain_data, merge_unique, utc_now +from .redaction import redact_sensitive_text + + +try: + from .generated_mcp_admin_route_acceptance import ( + ADMIN_ROUTE_ACCEPTANCE_CASES as GENERATED_ADMIN_ROUTE_ACCEPTANCE_CASES, + SOURCE_HASH as GENERATED_SOURCE_HASH, + ) +except ImportError: # pragma: no cover - exercised before the generator runs. + GENERATED_ADMIN_ROUTE_ACCEPTANCE_CASES: tuple[Mapping[str, Any], ...] = () + GENERATED_SOURCE_HASH = "" + + +class AdminRouteAcceptanceStatus(str, Enum): + """Acceptance status for one generated route case.""" + + READY = "ready" + PARTIAL = "partial" + BLOCKED = "blocked" + + +class AdminRouteOperation(str, Enum): + """Canonical administration operation names.""" + + CONSULTA = "consulta" + DIAGNOSTICO = "diagnostico" + ACAO = "acao" + AUDITORIA = "auditoria" + EXPLICACAO = "explicacao" + + +MANDATORY_TRANSIT_FIELDS = ( + "origin", + "destination", + "tool", + "payload", + "actor", + "permission", + "result", + "traceId", + "auditId", + "timestamp", +) + +MANDATORY_PAYLOAD_FIELDS = ( + "adminRouteId", + "adminRouteKind", + "sourceEndpoint", + "sourceToolId", + "sourcePayloadHash", + "sourceRecordsHash", + "truthState", + "panelReady", + "gptExplainable", + "humanNextAction", + "permissionScope", + "mcpOnlyAdministration", +) + + +@dataclass(frozen=True, slots=True) +class AdminRouteAcceptanceCase: + """One MCP-only administration route acceptance case.""" + + route_id: str + platform_id: str + profile_id: str + operation: str + tool_id: str + source_tool_id: str + permission_scope: str + source_endpoint: str + source_payload_hash: str + source_records_hash: str + evidence_id: str + truth_state: str + panel_ready: bool + gpt_explainable: bool + same_source: bool + approval_required: bool + dry_run_supported: bool + required_transit_fields: tuple[str, ...] + required_payload_fields: tuple[str, ...] + validation_steps: tuple[str, ...] + redaction_requirements: tuple[str, ...] + pending_if_missing: str + order_ids: tuple[str, ...] + policy_tags: tuple[str, ...] + maturity_level: int + status: AdminRouteAcceptanceStatus + blocker_reasons: tuple[str, ...] + + @property + def route_key(self) -> str: + return f"{self.platform_id}/{self.profile_id}/{self.operation}" + + @property + def ready(self) -> bool: + return self.status == AdminRouteAcceptanceStatus.READY + + def to_dict(self) -> dict[str, Any]: + return as_plain_data(self) + + +@dataclass(frozen=True, slots=True) +class AdminRouteAcceptanceSummary: + """Aggregated acceptance result for all generated administration routes.""" + + generated_at: str + provider_id: str + control_plane_id: str + source_hash: str + total_cases: int + ready_cases: int + partial_cases: int + blocked_cases: int + platforms_count: int + profiles_count: int + operations: Mapping[str, int] + platforms: Mapping[str, int] + blockers: tuple[str, ...] + evidence_id: str + + @property + def status(self) -> AdminRouteAcceptanceStatus: + if self.blocked_cases: + return AdminRouteAcceptanceStatus.BLOCKED + if self.partial_cases: + return AdminRouteAcceptanceStatus.PARTIAL + return AdminRouteAcceptanceStatus.READY if self.total_cases else AdminRouteAcceptanceStatus.BLOCKED + + @property + def ready_ratio(self) -> float: + if not self.total_cases: + return 0.0 + return self.ready_cases / self.total_cases + + def to_dict(self) -> dict[str, Any]: + data = as_plain_data(self) + data["status"] = self.status.value + data["readyRatio"] = self.ready_ratio + return data + + +@dataclass(frozen=True, slots=True) +class AdminRouteAcceptanceReport: + """Full acceptance report with a bounded sample and summary.""" + + report_id: str + generated_at: str + summary: AdminRouteAcceptanceSummary + returned_cases: tuple[AdminRouteAcceptanceCase, ...] + sample_limit: int + filters: Mapping[str, str] + generated_files: tuple[str, ...] = () + + @property + def status(self) -> AdminRouteAcceptanceStatus: + return self.summary.status + + def to_dict(self) -> dict[str, Any]: + data = as_plain_data(self) + data["status"] = self.status.value + return data + + +def _tuple(value: object) -> tuple[str, ...]: + if value is None: + return () + if isinstance(value, str): + return (value,) + if isinstance(value, Iterable): + return tuple(str(item) for item in value) + return (str(value),) + + +def _bool(value: object) -> bool: + if isinstance(value, bool): + return value + return str(value).strip().lower() in {"1", "true", "yes", "sim"} + + +def _status(value: object) -> AdminRouteAcceptanceStatus: + text = str(value or "").strip() + for status in AdminRouteAcceptanceStatus: + if status.value == text: + return status + return AdminRouteAcceptanceStatus.BLOCKED + + +def case_from_mapping(row: Mapping[str, Any]) -> AdminRouteAcceptanceCase: + """Convert generated plain data into a typed acceptance case.""" + + return AdminRouteAcceptanceCase( + route_id=str(row.get("routeId") or row.get("route_id") or ""), + platform_id=str(row.get("platformId") or row.get("platform_id") or ""), + profile_id=str(row.get("profileId") or row.get("profile_id") or ""), + operation=str(row.get("operation") or ""), + tool_id=str(row.get("toolId") or row.get("tool_id") or ""), + source_tool_id=str(row.get("sourceToolId") or row.get("source_tool_id") or ""), + permission_scope=str(row.get("permissionScope") or row.get("permission_scope") or ""), + source_endpoint=str(row.get("sourceEndpoint") or row.get("source_endpoint") or ""), + source_payload_hash=str(row.get("sourcePayloadHash") or row.get("source_payload_hash") or ""), + source_records_hash=str(row.get("sourceRecordsHash") or row.get("source_records_hash") or ""), + evidence_id=str(row.get("evidenceId") or row.get("evidence_id") or ""), + truth_state=str(row.get("truthState") or row.get("truth_state") or ""), + panel_ready=_bool(row.get("panelReady") or row.get("panel_ready")), + gpt_explainable=_bool(row.get("gptExplainable") or row.get("gpt_explainable")), + same_source=_bool(row.get("sameSource") or row.get("same_source")), + approval_required=_bool(row.get("approvalRequired") or row.get("approval_required")), + dry_run_supported=_bool(row.get("dryRunSupported") or row.get("dry_run_supported")), + required_transit_fields=_tuple(row.get("requiredTransitFields") or row.get("required_transit_fields")), + required_payload_fields=_tuple(row.get("requiredPayloadFields") or row.get("required_payload_fields")), + validation_steps=_tuple(row.get("validationSteps") or row.get("validation_steps")), + redaction_requirements=_tuple(row.get("redactionRequirements") or row.get("redaction_requirements")), + pending_if_missing=str(row.get("pendingIfMissing") or row.get("pending_if_missing") or ""), + order_ids=_tuple(row.get("orderIds") or row.get("order_ids")), + policy_tags=_tuple(row.get("policyTags") or row.get("policy_tags")), + maturity_level=int(row.get("maturityLevel") or row.get("maturity_level") or 0), + status=_status(row.get("status")), + blocker_reasons=_tuple(row.get("blockerReasons") or row.get("blocker_reasons")), + ) + + +def iter_admin_route_acceptance_cases( + *, + platform_id: str | None = None, + profile_id: str | None = None, + operation: str | None = None, + status: str | None = None, + limit: int | None = None, +) -> tuple[AdminRouteAcceptanceCase, ...]: + """Return generated acceptance cases, optionally filtered.""" + + output: list[AdminRouteAcceptanceCase] = [] + for raw in GENERATED_ADMIN_ROUTE_ACCEPTANCE_CASES: + case = case_from_mapping(raw) + if platform_id and case.platform_id != platform_id: + continue + if profile_id and case.profile_id != profile_id: + continue + if operation and case.operation != operation: + continue + if status and case.status.value != status: + continue + output.append(case) + if limit is not None and len(output) >= max(0, int(limit)): + break + return tuple(output) + + +def _count_by(cases: Sequence[AdminRouteAcceptanceCase], attr: str) -> dict[str, int]: + counts: dict[str, int] = {} + for case in cases: + value = str(getattr(case, attr)) + counts[value] = counts.get(value, 0) + 1 + return dict(sorted(counts.items())) + + +def summarize_admin_route_acceptance(cases: Sequence[AdminRouteAcceptanceCase] | None = None) -> AdminRouteAcceptanceSummary: + """Build a compact readiness summary for MCP-only administration routes.""" + + case_set = tuple(cases if cases is not None else iter_admin_route_acceptance_cases()) + blockers = merge_unique( + f"{case.route_id}:{reason}" + for case in case_set + for reason in case.blocker_reasons + if case.status != AdminRouteAcceptanceStatus.READY + ) + evidence_id = "evidence-" + stable_hash( + { + "sourceHash": GENERATED_SOURCE_HASH, + "caseCount": len(case_set), + "ready": sum(1 for case in case_set if case.status == AdminRouteAcceptanceStatus.READY), + "blockers": blockers, + } + )[:24] + return AdminRouteAcceptanceSummary( + generated_at=utc_now(), + provider_id=PROVIDER_ID, + control_plane_id=MCP_CONTROL_PLANE_ID, + source_hash=GENERATED_SOURCE_HASH, + total_cases=len(case_set), + ready_cases=sum(1 for case in case_set if case.status == AdminRouteAcceptanceStatus.READY), + partial_cases=sum(1 for case in case_set if case.status == AdminRouteAcceptanceStatus.PARTIAL), + blocked_cases=sum(1 for case in case_set if case.status == AdminRouteAcceptanceStatus.BLOCKED), + platforms_count=len({case.platform_id for case in case_set}), + profiles_count=len({case.profile_id for case in case_set}), + operations=_count_by(case_set, "operation"), + platforms=_count_by(case_set, "platform_id"), + blockers=blockers[:80], + evidence_id=evidence_id, + ) + + +def build_admin_route_acceptance_report( + *, + platform_id: str | None = None, + profile_id: str | None = None, + operation: str | None = None, + status: str | None = None, + limit: int = 80, +) -> AdminRouteAcceptanceReport: + """Build a report with full summary and bounded returned cases.""" + + all_filtered = iter_admin_route_acceptance_cases( + platform_id=platform_id, + profile_id=profile_id, + operation=operation, + status=status, + ) + returned = all_filtered[: max(0, int(limit))] + summary = summarize_admin_route_acceptance(all_filtered) + filter_payload = { + "platformId": platform_id or "", + "profileId": profile_id or "", + "operation": operation or "", + "status": status or "", + } + report_id = "mcp-admin-route-acceptance-" + stable_hash( + { + "summaryEvidence": summary.evidence_id, + "filters": filter_payload, + "limit": limit, + } + )[:16] + return AdminRouteAcceptanceReport( + report_id=report_id, + generated_at=utc_now(), + summary=summary, + returned_cases=tuple(returned), + sample_limit=max(0, int(limit)), + filters=filter_payload, + ) + + +def compact_acceptance_payload(report: AdminRouteAcceptanceReport) -> dict[str, Any]: + """Return a compact payload for GPT/MCP/UI discovery.""" + + return { + "reportId": report.report_id, + "generatedAt": report.generated_at, + "providerId": report.summary.provider_id, + "controlPlaneId": report.summary.control_plane_id, + "status": report.status.value, + "sourceHash": report.summary.source_hash, + "totalCases": report.summary.total_cases, + "readyCases": report.summary.ready_cases, + "partialCases": report.summary.partial_cases, + "blockedCases": report.summary.blocked_cases, + "readyRatio": report.summary.ready_ratio, + "operations": dict(report.summary.operations), + "platformsCount": report.summary.platforms_count, + "profilesCount": report.summary.profiles_count, + "evidenceId": report.summary.evidence_id, + "returnedCases": len(report.returned_cases), + "cases": [ + { + "routeId": case.route_id, + "platformId": case.platform_id, + "profileId": case.profile_id, + "operation": case.operation, + "toolId": case.tool_id, + "permissionScope": case.permission_scope, + "sameSource": case.same_source, + "status": case.status.value, + "evidenceId": case.evidence_id, + "sourcePayloadHash": case.source_payload_hash, + "sourceRecordsHash": case.source_records_hash, + "humanNextAction": case.pending_if_missing, + } + for case in report.returned_cases + ], + } + + +def acceptance_csv(report: AdminRouteAcceptanceReport) -> str: + """Render acceptance cases as CSV.""" + + rows = [ + [ + "route_id", + "platform_id", + "profile_id", + "operation", + "tool_id", + "permission_scope", + "same_source", + "status", + "approval_required", + "dry_run_supported", + "evidence_id", + "blockers", + ] + ] + for case in report.returned_cases: + rows.append( + [ + case.route_id, + case.platform_id, + case.profile_id, + case.operation, + case.tool_id, + case.permission_scope, + "yes" if case.same_source else "no", + case.status.value, + "yes" if case.approval_required else "no", + "yes" if case.dry_run_supported else "no", + case.evidence_id, + "; ".join(case.blocker_reasons), + ] + ) + buffer = io.StringIO() + writer = csv.writer(buffer, lineterminator="\n") + writer.writerows(rows) + return buffer.getvalue() + + +def acceptance_markdown(report: AdminRouteAcceptanceReport) -> str: + """Render a human-readable route acceptance report.""" + + lines = [ + "# MCP Admin Route Acceptance", + "", + f"- report_id: `{report.report_id}`", + f"- generated_at: `{report.generated_at}`", + f"- provider_id: `{report.summary.provider_id}`", + f"- control_plane_id: `{report.summary.control_plane_id}`", + f"- status: `{report.status.value}`", + f"- source_hash: `{report.summary.source_hash}`", + f"- total_cases: `{report.summary.total_cases}`", + f"- ready_cases: `{report.summary.ready_cases}`", + f"- partial_cases: `{report.summary.partial_cases}`", + f"- blocked_cases: `{report.summary.blocked_cases}`", + f"- ready_ratio: `{report.summary.ready_ratio:.4f}`", + f"- evidence_id: `{report.summary.evidence_id}`", + "", + "## Operacoes", + "", + ] + if report.summary.operations: + lines.extend(f"- `{name}`: `{count}`" for name, count in sorted(report.summary.operations.items())) + else: + lines.append("- Nenhuma rota administrativa materializada.") + lines.extend(["", "## Plataformas", ""]) + if report.summary.platforms: + lines.extend(f"- `{name}`: `{count}`" for name, count in sorted(report.summary.platforms.items())) + else: + lines.append("- Nenhuma plataforma materializada.") + lines.extend(["", "## Amostra", ""]) + for case in report.returned_cases[:30]: + lines.extend( + [ + f"### {case.route_id}", + "", + f"- route_key: `{case.route_key}`", + f"- tool_id: `{case.tool_id}`", + f"- source_tool_id: `{case.source_tool_id}`", + f"- permission_scope: `{case.permission_scope}`", + f"- same_source: `{case.same_source}`", + f"- status: `{case.status.value}`", + f"- evidence_id: `{case.evidence_id}`", + f"- source_payload_hash: `{case.source_payload_hash}`", + f"- source_records_hash: `{case.source_records_hash}`", + f"- next_action: {case.pending_if_missing}", + "", + ] + ) + lines.extend(["## Bloqueios", ""]) + if report.summary.blockers: + lines.extend(f"- `{redact_sensitive_text(item)}`" for item in report.summary.blockers[:80]) + else: + lines.append("- Nenhum bloqueio de contrato MCP-only nas rotas administrativas geradas.") + lines.extend( + [ + "", + "## Regra operacional", + "", + "- Toda consulta, diagnostico, acao, auditoria e explicacao interplataforma deve transitar pelo MCPs Internos.", + "- Cada caso preserva origin, destination, tool, payload, actor, permission, result, traceId, auditId e timestamp.", + "- Valores de segredo bruto nao fazem parte deste catalogo; evidencias usam hashes, traceId, auditId e refs opacas.", + ] + ) + return "\n".join(lines).strip() + "\n" + + +def acceptance_artifact_records(project_root: Path, central_platform_folder: Path | None = None) -> tuple[GeneratedFile, ...]: + """Return semantic records for acceptance artifacts.""" + + records = [ + GeneratedFile( + path=str(project_root / "dados" / "mcp-admin-route-acceptance.json"), + description="Catalogo estruturado de aceitacao das rotas administrativas MCP-only.", + function="mcp admin route acceptance report", + file_type="json", + changed_by="mais_humana.mcp_admin_route_acceptance", + change_summary="Materializado catalogo auditavel de rotas administrativas MCP-only.", + relation_to_order="0037_EXECUTIVA__homologar-rotas-administrativas-mcp-no-gateway", + ), + GeneratedFile( + path=str(project_root / "dados" / "mcp-admin-route-acceptance-compacto.json"), + description="Resumo compacto das rotas administrativas para GPT/MCP/UI.", + function="mcp admin route acceptance compact payload", + file_type="json", + changed_by="mais_humana.mcp_admin_route_acceptance", + change_summary="Criado payload compacto com contadores, hashes e amostra de rotas.", + relation_to_order="0037_EXECUTIVA__homologar-rotas-administrativas-mcp-no-gateway", + ), + GeneratedFile( + path=str(project_root / "matrizes" / "mcp-admin-route-acceptance.csv"), + description="Matriz tabular de aceitacao das rotas administrativas MCP-only.", + function="mcp admin route acceptance matrix", + file_type="csv", + changed_by="mais_humana.mcp_admin_route_acceptance", + change_summary="Criada matriz para auditoria de operacao, permissao e same-source.", + relation_to_order="0037_EXECUTIVA__homologar-rotas-administrativas-mcp-no-gateway", + ), + GeneratedFile( + path=str(project_root / "ecossistema" / "MCP-ADMIN-ROUTE-ACCEPTANCE.md"), + description="Relatorio humano de aceitacao das rotas administrativas MCP-only.", + function="mcp admin route acceptance human report", + file_type="markdown", + changed_by="mais_humana.mcp_admin_route_acceptance", + change_summary="Criado relatorio de homologacao local das rotas administrativas.", + relation_to_order="0037_EXECUTIVA__homologar-rotas-administrativas-mcp-no-gateway", + ), + ] + if central_platform_folder is not None: + records.extend( + [ + GeneratedFile( + path=str(central_platform_folder / "reports" / "EXECUTADO__mcp-admin-route-acceptance.md"), + description="Copia central do relatorio de aceitacao das rotas administrativas.", + function="mcp admin route acceptance central report", + file_type="markdown", + changed_by="mais_humana.mcp_admin_route_acceptance", + change_summary="Registrada homologacao local das rotas administrativas na central.", + relation_to_order="0037_EXECUTIVA__homologar-rotas-administrativas-mcp-no-gateway", + ), + GeneratedFile( + path=str(central_platform_folder / "indexes" / "mcp-admin-route-acceptance-index.md"), + description="Indice central do catalogo de rotas administrativas MCP-only.", + function="mcp admin route acceptance central index", + file_type="markdown", + changed_by="mais_humana.mcp_admin_route_acceptance", + change_summary="Indexado status, contadores e evidencia da aceitacao MCP-only.", + relation_to_order="0037_EXECUTIVA__homologar-rotas-administrativas-mcp-no-gateway", + ), + ] + ) + return tuple(records) + + +def _write_text(path: Path, text: str) -> None: + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text(text, encoding="utf-8") + + +def write_admin_route_acceptance_artifacts( + report: AdminRouteAcceptanceReport, + project_root: Path, + *, + central_platform_folder: Path | None = None, +) -> tuple[GeneratedFile, ...]: + """Write project and optional central route acceptance artifacts.""" + + full_json = json.dumps(report.to_dict(), ensure_ascii=False, indent=2, sort_keys=True) + compact_json = json.dumps(compact_acceptance_payload(report), ensure_ascii=False, indent=2, sort_keys=True) + markdown = acceptance_markdown(report) + targets: list[tuple[Path, str]] = [ + (project_root / "dados" / "mcp-admin-route-acceptance.json", full_json), + (project_root / "dados" / "mcp-admin-route-acceptance-compacto.json", compact_json), + (project_root / "matrizes" / "mcp-admin-route-acceptance.csv", acceptance_csv(report)), + (project_root / "ecossistema" / "MCP-ADMIN-ROUTE-ACCEPTANCE.md", markdown), + ] + records = list(acceptance_artifact_records(project_root, central_platform_folder)) + central_failures: list[dict[str, str]] = [] + if central_platform_folder is not None: + targets.extend( + [ + (central_platform_folder / "reports" / "EXECUTADO__mcp-admin-route-acceptance.md", markdown), + (central_platform_folder / "indexes" / "mcp-admin-route-acceptance-index.md", markdown), + ] + ) + for path, content in targets: + try: + _write_text(path, content) + except OSError as exc: + if central_platform_folder is not None and central_platform_folder in path.parents: + central_failures.append({"path": str(path), "error": f"{type(exc).__name__}: {exc}"}) + continue + raise + if central_failures: + status_path = project_root / "dados" / "mcp-admin-route-acceptance-central-write-status.json" + _write_text( + status_path, + json.dumps( + { + "generatedAt": utc_now(), + "ok": False, + "centralPlatformFolder": str(central_platform_folder), + "failures": central_failures, + }, + ensure_ascii=False, + indent=2, + sort_keys=True, + ), + ) + records.append( + GeneratedFile( + path=str(status_path), + description="Status de escrita central do catalogo de rotas administrativas.", + function="mcp admin route acceptance central write status", + file_type="json", + changed_by="mais_humana.mcp_admin_route_acceptance", + change_summary="Registrada falha de escrita central sem abortar artefatos do projeto real.", + relation_to_order="0040_EXECUTIVA__materializar-escrita-central-e-sql-semantico-sem-permissionerror", + ) + ) + return tuple(records) + + +def run_admin_route_acceptance( + *, + project_root: Path, + central_platform_folder: Path | None = None, + platform_id: str | None = None, + profile_id: str | None = None, + operation: str | None = None, + status: str | None = None, + limit: int = 120, +) -> tuple[AdminRouteAcceptanceReport, tuple[GeneratedFile, ...]]: + """Build and write route acceptance artifacts.""" + + report = build_admin_route_acceptance_report( + platform_id=platform_id, + profile_id=profile_id, + operation=operation, + status=status, + limit=limit, + ) + records = write_admin_route_acceptance_artifacts(report, project_root, central_platform_folder=central_platform_folder) + report_with_files = AdminRouteAcceptanceReport( + report_id=report.report_id, + generated_at=report.generated_at, + summary=report.summary, + returned_cases=report.returned_cases, + sample_limit=report.sample_limit, + filters=report.filters, + generated_files=tuple(record.path for record in records), + ) + return report_with_files, records diff --git a/tools/generate_mcp_admin_route_acceptance.py b/tools/generate_mcp_admin_route_acceptance.py new file mode 100644 index 0000000..c11225e --- /dev/null +++ b/tools/generate_mcp_admin_route_acceptance.py @@ -0,0 +1,207 @@ +"""Generate a compact Python acceptance catalog for MCP admin routes.""" + +from __future__ import annotations + +import json +import sys +from pathlib import Path +from typing import Any + + +ROOT = Path(__file__).resolve().parents[1] +SRC = ROOT / "src" +OUTPUT = SRC / "mais_humana" / "generated_mcp_admin_route_acceptance.py" + +if str(SRC) not in sys.path: + sys.path.insert(0, str(SRC)) + +from mais_humana.mcp_contract import McpContractKind, iter_contracts, stable_hash # noqa: E402 + + +def _operation(contract_id: str) -> str: + parts = contract_id.split(".") + if len(parts) >= 4 and parts[-1] == "administration-route": + return parts[-2] + return "unknown" + + +def _permission(policy_tags: tuple[str, ...]) -> str: + for tag in reversed(policy_tags): + if tag.startswith("mcp.admin."): + return tag + return "" + + +def _case_status(row: dict[str, Any]) -> tuple[str, tuple[str, ...]]: + blockers: list[str] = [] + required_transit = tuple(row["requiredTransitFields"]) + required_payload = tuple(row["requiredPayloadFields"]) + for field in ( + "origin", + "destination", + "tool", + "payload", + "actor", + "permission", + "result", + "traceId", + "auditId", + "timestamp", + ): + if field not in required_transit: + blockers.append(f"missing_transit:{field}") + for field in ( + "adminRouteId", + "adminRouteKind", + "sourceEndpoint", + "sourceToolId", + "sourcePayloadHash", + "sourceRecordsHash", + "truthState", + "panelReady", + "gptExplainable", + "humanNextAction", + "permissionScope", + "mcpOnlyAdministration", + ): + if field not in required_payload: + blockers.append(f"missing_payload:{field}") + if not row["sameSource"]: + blockers.append("same_source_false") + if not row["permissionScope"]: + blockers.append("permission_scope_missing") + if row["operation"] == "acao" and not (row["approvalRequired"] or row["dryRunSupported"]): + blockers.append("mutable_action_without_approval_or_dry_run") + if blockers: + return "blocked", tuple(blockers) + if row["maturityLevel"] < 8: + return "partial", ("maturity_below_8",) + return "ready", () + + +def _case_from_contract(contract: Any) -> dict[str, Any]: + operation = _operation(contract.contract_id) + permission = _permission(tuple(contract.policy_tags)) + required_payload = tuple(contract.required_payload_fields) + row = { + "routeId": contract.contract_id, + "platformId": contract.platform_id, + "profileId": contract.profile_id, + "operation": operation, + "toolId": contract.tool_id, + "sourceToolId": contract.source_tool_id, + "permissionScope": permission, + "sourceEndpoint": contract.source_endpoint, + "sourcePayloadHash": contract.source_payload_hash, + "sourceRecordsHash": contract.source_records_hash, + "evidenceId": f"evidence-{contract.source_records_hash[:24]}", + "truthState": contract.truth_state.value, + "panelReady": bool(contract.panel_ready), + "gptExplainable": bool(contract.gpt_explainable), + "sameSource": bool(contract.same_source_ready), + "approvalRequired": "approvalRequired" in required_payload, + "dryRunSupported": "dryRunSupported" in required_payload, + "requiredTransitFields": tuple(contract.required_transit_fields), + "requiredPayloadFields": required_payload, + "validationSteps": tuple(contract.validation_steps), + "redactionRequirements": tuple(contract.redaction_requirements), + "pendingIfMissing": contract.pending_if_missing, + "orderIds": tuple(contract.order_ids), + "policyTags": tuple(contract.policy_tags), + "maturityLevel": int(contract.maturity_level), + } + status, blockers = _case_status(row) + row["status"] = status + row["blockerReasons"] = blockers + return row + + +def _format_value(value: Any) -> str: + return repr(value) + + +def _render_case(row: dict[str, Any]) -> list[str]: + lines = [" {"] + for key in ( + "routeId", + "platformId", + "profileId", + "operation", + "toolId", + "sourceToolId", + "permissionScope", + "sourceEndpoint", + "sourcePayloadHash", + "sourceRecordsHash", + "evidenceId", + "truthState", + "panelReady", + "gptExplainable", + "sameSource", + "approvalRequired", + "dryRunSupported", + "requiredTransitFields", + "requiredPayloadFields", + "validationSteps", + "redactionRequirements", + "pendingIfMissing", + "orderIds", + "policyTags", + "maturityLevel", + "status", + "blockerReasons", + ): + lines.append(f" {key!r}: {_format_value(row[key])},") + lines.append(" },") + return lines + + +def build_cases() -> tuple[dict[str, Any], ...]: + contracts = [contract for contract in iter_contracts() if contract.kind == McpContractKind.ADMINISTRATION_ROUTE] + rows = [_case_from_contract(contract) for contract in sorted(contracts, key=lambda item: item.contract_id)] + return tuple(rows) + + +def render_module(cases: tuple[dict[str, Any], ...]) -> str: + source_hash = stable_hash(cases) + lines = [ + '"""Generated MCP administration route acceptance catalog.', + "", + "Do not edit by hand. Regenerate with:", + "python tools/generate_mcp_admin_route_acceptance.py", + '"""', + "", + "from __future__ import annotations", + "", + f"SOURCE_HASH = {source_hash!r}", + f"CASES_COUNT = {len(cases)!r}", + "", + "ADMIN_ROUTE_ACCEPTANCE_CASES = (", + ] + for row in cases: + lines.extend(_render_case(row)) + lines.extend([")", ""]) + return "\n".join(lines) + + +def main() -> int: + cases = build_cases() + OUTPUT.parent.mkdir(parents=True, exist_ok=True) + OUTPUT.write_text(render_module(cases), encoding="utf-8") + print( + json.dumps( + { + "output": str(OUTPUT), + "cases": len(cases), + "sourceHash": stable_hash(cases), + }, + ensure_ascii=False, + indent=2, + sort_keys=True, + ) + ) + return 0 + + +if __name__ == "__main__": + raise SystemExit(main())