"""Command line interface for the Mais Humana platform.""" from __future__ import annotations import argparse import json from pathlib import Path from .models import as_plain_data from .central_consolidation import run_consolidated_report from .central_materialization import run_central_materialization from .canonical_identity import identity_graph_payload, run_canonical_identity_graph from .canonical_migration import migration_plan_payload, run_canonical_migration_plan from .matrix import build_global_recommendations, build_matrix, build_platform_reports from .mcp_contract import build_mcp_contract_report, build_mcp_execute_probe, mcp_provider_compact_json, mcp_provider_payload from .mcp_contract import ( administration_route_readiness_markdown, administration_route_readiness_payload, mcp_contract_csv, mcp_contract_markdown, mcp_execute_probe_markdown, official_report_models_markdown, same_source_validation_payload, ui_renderer_policy_markdown, ) from .mcp_admin_route_acceptance import run_admin_route_acceptance from .mcp_gateway_access_policy import run_access_policy_gate from .mcp_publication_gate import run_publication_gate from .mcp_transit_ledger import build_mcp_transit_ledger, mcp_transit_csv, mcp_transit_ledger_compact_json, mcp_transit_markdown from .operational_dossier import build_execution_round_dossier from .governance_engine import build_governance_portfolio, compact_governance_payload from .human_rulebook import evaluate_rulebook, rulebook_compact_json from .human_readiness_registry import build_readiness_registry from .institutional_assurance import compact_assurance_payload, run_institutional_assurance from .runtime_budget import build_round_line_budget from .orders import build_exit_orders from .reports import generate from .repository_mesh import mesh_summary_payload, run_repository_mesh from .repository_mesh_reconciliation import apply_reconciliation_to_report, reconciliation_payload from .repository_mesh_runtime import ( acquire_lock, build_runtime_cycle, cron_scheduler_spec, release_lock, scheduler_payload, windows_scheduler_spec, write_runtime_artifacts, ) from .repository_mesh_semantic import write_repository_mesh_semantic_state from .repository_mesh_readiness import build_mesh_readiness_report, write_readiness_artifacts from .repository_mesh_gitea import build_gitea_mesh_plan, write_gitea_plan_artifacts from .router000_exit_orders import run_router000_exit_orders from .scanner import environment_summary, scan_ecosystem from .storage import table_counts from .targeted_sync_audit import run_targeted_sync_audit from .workspace_hygiene import run_workspace_hygiene from .workspace_hygiene_policy import policy_payload, run_hygiene_policy def build_parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser(prog="mais-humana", description="Generate human-centered ecosystem reports.") sub = parser.add_subparsers(dest="command", required=True) scan = sub.add_parser("scan", help="Scan ecosystem repositories and print compact JSON summary.") scan.add_argument("--ecosystem-root", default="G:/_codex-git") gen = sub.add_parser("generate", help="Generate DOCX, SVG, JSON, matrices, and service orders.") gen.add_argument("--ecosystem-root", default="G:/_codex-git") gen.add_argument("--project-root", default="G:/_codex-git/tudo-para-ia-mais-humana") gen.add_argument("--central-platform-folder", default="") gen.add_argument("--push-status", default="") sql = sub.add_parser("sql-counts", help="Print semantic SQLite table counts.") sql.add_argument("--sqlite", required=True) env = sub.add_parser("env", help="Print local environment summary.") env.add_argument("--ecosystem-root", default="G:/_codex-git") dossier = sub.add_parser("dossier", help="Print operational dossier JSON without writing artifacts.") dossier.add_argument("--ecosystem-root", default="G:/_codex-git") dossier.add_argument("--project-root", default="G:/_codex-git/tudo-para-ia-mais-humana") governance = sub.add_parser("governance", help="Print compact governance portfolio JSON.") governance.add_argument("--ecosystem-root", default="G:/_codex-git") governance.add_argument("--project-root", default="G:/_codex-git/tudo-para-ia-mais-humana") budget = sub.add_parser("line-budget", help="Print round line-budget JSON.") budget.add_argument("--ecosystem-root", default="G:/_codex-git") budget.add_argument("--project-root", default="G:/_codex-git/tudo-para-ia-mais-humana") rulebook = sub.add_parser("rulebook", help="Print compact human-operational rulebook JSON.") rulebook.add_argument("--ecosystem-root", default="G:/_codex-git") rulebook.add_argument("--limit", type=int, default=0) mcp_provider = sub.add_parser("mcp-provider", help="Print the compact Mais Humana MCP provider payload.") mcp_provider.add_argument("--ecosystem-root", default="G:/_codex-git") mcp_provider.add_argument("--limit", type=int, default=80) mcp_provider.add_argument("--envelope", action="store_true") mcp_probe = sub.add_parser("mcp-execute-probe", help="Print a safe /v1/execute probe for the Mais Humana provider.") mcp_probe.add_argument("--ecosystem-root", default="G:/_codex-git") mcp_probe.add_argument("--limit", type=int, default=20) mcp_probe.add_argument("--observed-status", default="not_executed") mcp_probe.add_argument("--observed-note", default="request prepared; live execution must record the HTTP status separately") mcp_transit = sub.add_parser("mcp-transit-ledger", help="Print compact MCP transit ledger for Mais Humana contracts.") mcp_transit.add_argument("--ecosystem-root", default="G:/_codex-git") mcp_transit.add_argument("--limit", type=int, default=80) mcp_artifacts = sub.add_parser("mcp-contract-artifacts", help="Write focused MCP contract artifacts without regenerating DOCX reports.") mcp_artifacts.add_argument("--ecosystem-root", default="G:/_codex-git") mcp_artifacts.add_argument("--project-root", default="G:/_codex-git/tudo-para-ia-mais-humana") mcp_artifacts.add_argument("--central-platform-folder", default="") mcp_artifacts.add_argument("--no-scan", action="store_true", help="Use the generated MCP catalog directly, without rescanning repositories.") repo_mesh = sub.add_parser("repo-mesh", help="Inventory repository mirrors and write safe synchronization artifacts.") repo_mesh.add_argument("--ecosystem-root", default="G:/_codex-git") repo_mesh.add_argument("--project-root", default="G:/_codex-git/tudo-para-ia-mais-humana") repo_mesh.add_argument("--central-platform-folder", default="") repo_mesh.add_argument("--fetch", action="store_true") repo_mesh.add_argument("--plugin-auth-attempt", default="") consolidated = sub.add_parser("consolidated-report", help="Write consolidated administrative pending report across central projects.") consolidated.add_argument("--central-projects-root", default="G:/_codex-git/nucleo-gestao-operacional/central-de-ordem-de-servico/projects") consolidated.add_argument("--project-root", default="G:/_codex-git/tudo-para-ia-mais-humana") consolidated.add_argument("--central-platform-folder", default="") consolidated.add_argument("--plugin-auth-attempt", default="") consolidated.add_argument("--git-sync-status", default="") publication = sub.add_parser("mcp-publication-gate", help="Write the Mais Humana MCP publication gate artifacts.") publication.add_argument("--project-root", default="G:/_codex-git/tudo-para-ia-mais-humana") publication.add_argument("--mcp-repo-root", default="G:/_codex-git/tudo-para-ia-mcps-internos-plataform") publication.add_argument("--central-platform-folder", default="") publication.add_argument("--wrangler-summary", default="") publication.add_argument("--git-sync-status", default="") publication.add_argument("--repo-remote", default="") publication.add_argument("--bearer", default="") publication.add_argument("--live-probe", action="store_true") access_policy = sub.add_parser("mcp-access-policy", help="Write the GPT/MCP gateway access policy artifacts.") access_policy.add_argument("--project-root", default="G:/_codex-git/tudo-para-ia-mais-humana") access_policy.add_argument("--central-platform-folder", default="") access_policy.add_argument("--publication-gate-json", default="") hygiene = sub.add_parser("workspace-hygiene", help="Inspect or clean approved local artifacts for closeout.") hygiene.add_argument("--project-root", default="G:/_codex-git/tudo-para-ia-mais-humana") hygiene.add_argument("--central-platform-folder", default="") hygiene.add_argument("--apply", action="store_true") hygiene_policy = sub.add_parser("workspace-hygiene-policy", help="Write executable workspace cleanup and ACL retention policy.") hygiene_policy.add_argument("--project-root", default="G:/_codex-git/tudo-para-ia-mais-humana") hygiene_policy.add_argument("--no-generated", action="store_true") hygiene_policy.add_argument("--limit", type=int, default=40) sync_audit = sub.add_parser("targeted-sync-audit", help="Write safe Git synchronization audit for the active round repos.") sync_audit.add_argument("--project-root", default="G:/_codex-git/tudo-para-ia-mais-humana") sync_audit.add_argument("--mcp-repo-root", default="G:/_codex-git/tudo-para-ia-mcps-internos-plataform") sync_audit.add_argument("--central-repo-root", default="G:/_codex-git/nucleo-gestao-operacional") sync_audit.add_argument("--central-platform-folder", default="") sync_audit.add_argument("--fetch", action="store_true") admin_acceptance = sub.add_parser("mcp-admin-route-acceptance", help="Write MCP-only administration route acceptance artifacts.") admin_acceptance.add_argument("--project-root", default="G:/_codex-git/tudo-para-ia-mais-humana") admin_acceptance.add_argument("--central-platform-folder", default="") admin_acceptance.add_argument("--platform-id", default="") admin_acceptance.add_argument("--profile-id", default="") admin_acceptance.add_argument("--operation", default="") admin_acceptance.add_argument("--status", default="") admin_acceptance.add_argument("--limit", type=int, default=120) central_materialization = sub.add_parser("central-materialization", help="Materialize central active/output orders and semantic records.") central_materialization.add_argument("--project-root", default="G:/_codex-git/tudo-para-ia-mais-humana") central_materialization.add_argument( "--central-platform-folder", default="G:/_codex-git/nucleo-gestao-operacional/central-de-ordem-de-servico/projects/15_repo_tudo-para-ia-mais-humana-platform", ) central_materialization.add_argument("--overwrite", action="store_true") canonical_identity = sub.add_parser("canonical-identity", help="Write canonical identity graph and MCP alias acceptance artifacts.") canonical_identity.add_argument("--project-root", default="G:/_codex-git/tudo-para-ia-mais-humana") canonical_identity.add_argument( "--central-platform-folder", default="G:/_codex-git/nucleo-gestao-operacional/central-de-ordem-de-servico/projects/15_repo_tudo-para-ia-mais-humana-platform", ) canonical_identity.add_argument("--no-generated", action="store_true", help="Build graph from runtime targets instead of generated registry.") canonical_migration = sub.add_parser("canonical-migration-plan", help="Write canonical-name migration controls and MCP acceptance cases.") canonical_migration.add_argument("--project-root", default="G:/_codex-git/tudo-para-ia-mais-humana") canonical_migration.add_argument( "--central-platform-folder", default="G:/_codex-git/nucleo-gestao-operacional/central-de-ordem-de-servico/projects/15_repo_tudo-para-ia-mais-humana-platform", ) canonical_migration.add_argument("--no-generated", action="store_true", help="Build acceptance cases from runtime repository targets.") canonical_migration.add_argument("--limit", type=int, default=40) institutional_assurance = sub.add_parser( "institutional-assurance", help="Scan router-000 institutional decisions and write assurance artifacts.", ) institutional_assurance.add_argument("--ecosystem-root", default="G:/_codex-git") institutional_assurance.add_argument("--project-root", default="G:/_codex-git/tudo-para-ia-mais-humana") institutional_assurance.add_argument( "--central-projects-root", default="G:/_codex-git/nucleo-gestao-operacional/central-de-ordem-de-servico/projects", ) institutional_assurance.add_argument("--sandbox-mode", default="workspace-write") institutional_assurance.add_argument("--plugin-auth-attempt", default="") institutional_assurance.add_argument("--no-central", action="store_true") institutional_assurance.add_argument("--limit", type=int, default=40) router000_orders = sub.add_parser("router000-exit-orders", help="Write Router 000 output service orders into affected central folders.") router000_orders.add_argument("--ecosystem-root", default="G:/_codex-git") router000_orders.add_argument("--project-root", default="G:/_codex-git/tudo-para-ia-mais-humana") router000_orders.add_argument( "--central-projects-root", default="G:/_codex-git/nucleo-gestao-operacional/central-de-ordem-de-servico/projects", ) router000_orders.add_argument("--executive-limit", type=int, default=5) router000_orders.add_argument("--managerial-limit", type=int, default=5) return parser def command_scan(args: argparse.Namespace) -> int: root = Path(args.ecosystem_root) scans = scan_ecosystem(root) payload = { "platforms": [ { "platform_id": scan.platform.platform_id, "exists": scan.exists, "git_present": scan.git_present, "code_lines": scan.code_lines, "evidence": len(scan.evidence), "warnings": scan.warnings, } for scan in scans ], "total_code_lines": sum(scan.code_lines for scan in scans), "total_evidence": sum(len(scan.evidence) for scan in scans), } print(json.dumps(payload, ensure_ascii=False, indent=2)) return 0 def command_generate(args: argparse.Namespace) -> int: central = Path(args.central_platform_folder) if args.central_platform_folder else None bundle = generate( ecosystem_root=Path(args.ecosystem_root), project_root=Path(args.project_root), central_platform_folder=central, push_status=args.push_status or None, ) print(json.dumps(as_plain_data(bundle), ensure_ascii=False, indent=2)) return 0 def command_sql_counts(args: argparse.Namespace) -> int: print(json.dumps(table_counts(Path(args.sqlite)), ensure_ascii=False, indent=2)) return 0 def command_env(args: argparse.Namespace) -> int: print(json.dumps(environment_summary(Path(args.ecosystem_root)), ensure_ascii=False, indent=2)) return 0 def command_dossier(args: argparse.Namespace) -> int: scans = scan_ecosystem(Path(args.ecosystem_root)) cells = build_matrix(scans) reports = build_platform_reports(scans, cells) recommendations = build_global_recommendations(reports) orders = build_exit_orders(recommendations) dossier = build_execution_round_dossier( project_root=Path(args.project_root), platform_reports=reports, recommendations=recommendations, output_orders=orders, total_code_lines_analyzed=sum(scan.code_lines for scan in scans), ) print(json.dumps(as_plain_data(dossier), ensure_ascii=False, indent=2)) return 0 def command_governance(args: argparse.Namespace) -> int: scans = scan_ecosystem(Path(args.ecosystem_root)) cells = build_matrix(scans) reports = build_platform_reports(scans, cells) recommendations = build_global_recommendations(reports) orders = build_exit_orders(recommendations) dossier = build_execution_round_dossier( project_root=Path(args.project_root), platform_reports=reports, recommendations=recommendations, output_orders=orders, total_code_lines_analyzed=sum(scan.code_lines for scan in scans), ) portfolio = build_governance_portfolio(reports, recommendations=recommendations, round_dossier=dossier) registry = build_readiness_registry(reports, portfolio) payload = compact_governance_payload(portfolio) payload["readiness_entries"] = len(registry.entries) payload["weak_readiness_entries"] = len(registry.weak_entries) print(json.dumps(payload, ensure_ascii=False, indent=2)) return 0 def command_line_budget(args: argparse.Namespace) -> int: budget = build_round_line_budget(Path(args.ecosystem_root), Path(args.project_root)) print(json.dumps(as_plain_data(budget), ensure_ascii=False, indent=2)) return 0 def command_rulebook(args: argparse.Namespace) -> int: scans = scan_ecosystem(Path(args.ecosystem_root)) cells = build_matrix(scans) reports = build_platform_reports(scans, cells) report = evaluate_rulebook(reports, limit=args.limit or None) print(json.dumps(rulebook_compact_json(report), ensure_ascii=False, indent=2)) return 0 def command_mcp_provider(args: argparse.Namespace) -> int: scans = scan_ecosystem(Path(args.ecosystem_root)) cells = build_matrix(scans) reports = build_platform_reports(scans, cells) rulebook = evaluate_rulebook(reports) contracts = build_mcp_contract_report(rulebook) payload = ( mcp_provider_payload(contracts, limit=args.limit) if args.envelope else mcp_provider_compact_json(contracts, limit=args.limit) ) print(json.dumps(payload, ensure_ascii=False, indent=2)) return 0 def command_mcp_execute_probe(args: argparse.Namespace) -> int: scans = scan_ecosystem(Path(args.ecosystem_root)) cells = build_matrix(scans) reports = build_platform_reports(scans, cells) rulebook = evaluate_rulebook(reports) contracts = build_mcp_contract_report(rulebook) probe = build_mcp_execute_probe( contracts, limit=args.limit, observed_status=args.observed_status, observed_note=args.observed_note, ) print(json.dumps(as_plain_data(probe), ensure_ascii=False, indent=2)) return 0 def command_mcp_transit_ledger(args: argparse.Namespace) -> int: scans = scan_ecosystem(Path(args.ecosystem_root)) cells = build_matrix(scans) reports = build_platform_reports(scans, cells) rulebook = evaluate_rulebook(reports) contracts = build_mcp_contract_report(rulebook) ledger = build_mcp_transit_ledger(contracts) print(json.dumps(mcp_transit_ledger_compact_json(ledger, limit=args.limit), ensure_ascii=False, indent=2)) return 0 def _write_json(path: Path, payload: object) -> str: from .models import as_plain_data path.parent.mkdir(parents=True, exist_ok=True) path.write_text(json.dumps(as_plain_data(payload), ensure_ascii=False, indent=2, sort_keys=True), encoding="utf-8") return str(path) def _write_text(path: Path, text: str) -> str: path.parent.mkdir(parents=True, exist_ok=True) path.write_text(text, encoding="utf-8") return str(path) def command_mcp_contract_artifacts(args: argparse.Namespace) -> int: project_root = Path(args.project_root) central = Path(args.central_platform_folder) if args.central_platform_folder else None rulebook = None if not args.no_scan: scans = scan_ecosystem(Path(args.ecosystem_root)) cells = build_matrix(scans) reports = build_platform_reports(scans, cells) rulebook = evaluate_rulebook(reports) contracts = build_mcp_contract_report(rulebook) ledger = build_mcp_transit_ledger(contracts) admin_routes = administration_route_readiness_payload() probe = build_mcp_execute_probe(contracts) written = [ _write_json(project_root / "dados" / "mcp-contratos-humanos.json", contracts), _write_json(project_root / "dados" / "mcp-provider-mais-humana.json", mcp_provider_payload(contracts)), _write_json(project_root / "dados" / "mcp-provider-mais-humana-compacto.json", mcp_provider_compact_json(contracts)), _write_json(project_root / "dados" / "mcp-execute-probe-mais-humana.json", probe), _write_json(project_root / "dados" / "mcp-admin-ui-same-source-validation.json", same_source_validation_payload(contracts)), _write_json(project_root / "dados" / "mcp-transit-ledger.json", ledger), _write_json(project_root / "dados" / "mcp-transit-ledger-compacto.json", mcp_transit_ledger_compact_json(ledger)), _write_json(project_root / "dados" / "mcp-administration-routes-readiness.json", admin_routes), _write_text(project_root / "ecossistema" / "MCP-PROVIDER-MAIS-HUMANA.md", mcp_contract_markdown(contracts)), _write_text(project_root / "ecossistema" / "MCP-EXECUTE-PROBE-MAIS-HUMANA.md", mcp_execute_probe_markdown(probe)), _write_text(project_root / "ecossistema" / "MCP-TRANSIT-LEDGER.md", mcp_transit_markdown(ledger)), _write_text(project_root / "ecossistema" / "MCP-ADMINISTRATION-ROUTES.md", administration_route_readiness_markdown(admin_routes)), _write_text(project_root / "ecossistema" / "MODELOS-OFICIAIS-RELATORIO-HUMANO.md", official_report_models_markdown(contracts)), _write_text(project_root / "ecossistema" / "UI-RENDERER-SAME-SOURCE-POLICY.md", ui_renderer_policy_markdown(contracts)), _write_text(project_root / "matrizes" / "mcp-contratos-humanos.csv", mcp_contract_csv(contracts)), _write_text(project_root / "matrizes" / "mcp-transit-ledger.csv", mcp_transit_csv(ledger)), ] central_error = "" if central is not None: try: written.append(_write_text(central / "reports" / "EXECUTADO__mcp-contract-artifacts.md", mcp_contract_markdown(contracts))) written.append(_write_text(central / "indexes" / "mcp-administration-routes-index.md", administration_route_readiness_markdown(admin_routes))) except OSError as exc: central_error = f"{type(exc).__name__}: {exc}" payload = { "contractsCount": contracts.contracts_count, "coverageCount": len(contracts.coverage), "transitRecords": ledger.records_count, "administrationRoutes": admin_routes["contractsCount"], "scanMode": "generated_catalog_only" if args.no_scan else "ecosystem_scan", "centralError": central_error, "generatedFiles": written, } print(json.dumps(payload, ensure_ascii=False, indent=2)) return 0 def command_repo_mesh(args: argparse.Namespace) -> int: central = Path(args.central_platform_folder) if args.central_platform_folder else None central_write_error = "" central_for_write = central if central_for_write is not None: try: for folder_name in ("reports", "indexes", "audit", "status"): (central_for_write / folder_name).mkdir(parents=True, exist_ok=True) probe_path = central_for_write / "reports" / ".repository_mesh_write_probe.tmp" probe_path.write_text("repository mesh write probe\n", encoding="utf-8") probe_path.unlink(missing_ok=True) except OSError as exc: central_write_error = f"{type(exc).__name__}: {exc}" central_for_write = None report, records = run_repository_mesh( ecosystem_root=Path(args.ecosystem_root), project_root=Path(args.project_root), central_platform_folder=central_for_write, fetch=bool(args.fetch), plugin_auth_attempt=args.plugin_auth_attempt, ) plan, reconciliation_records = apply_reconciliation_to_report( report, Path(args.project_root), central_platform_folder=central_for_write, ) project_root = Path(args.project_root) lock = acquire_lock(project_root / "dados" / "repository-mesh.lock.json", owner="mais_humana.cli.repo-mesh") cycle = build_runtime_cycle(report, plan, lock=lock, execute=False) specs = ( windows_scheduler_spec( python_exe="C:\\Users\\Ami\\.cache\\codex-runtimes\\codex-primary-runtime\\dependencies\\python\\python.exe", project_root=project_root, ecosystem_root=Path(args.ecosystem_root), central_platform_folder=central_for_write, ), cron_scheduler_spec( python_exe="python", project_root=project_root, ecosystem_root=Path(args.ecosystem_root), central_platform_folder=central_for_write, ), ) runtime_records = write_runtime_artifacts(cycle, specs, project_root, central_platform_folder=central_for_write) semantic_write_error = "" semantic_path_used = "" if central is not None: try: central_semantic_path = central / "controle-semantico.sqlite" semantic_counts = write_repository_mesh_semantic_state( central_semantic_path, report=report, plan=plan, cycle=cycle, schedulers=specs, ) semantic_path_used = str(central_semantic_path) except Exception as exc: semantic_write_error = f"{type(exc).__name__}: {exc}" semantic_counts = write_repository_mesh_semantic_state( project_root / "controle-semantico.sqlite", report=report, plan=plan, cycle=cycle, schedulers=specs, ) semantic_path_used = str(project_root / "controle-semantico.sqlite") else: semantic_counts = write_repository_mesh_semantic_state( project_root / "controle-semantico.sqlite", report=report, plan=plan, cycle=cycle, schedulers=specs, ) semantic_path_used = str(project_root / "controle-semantico.sqlite") readiness = build_mesh_readiness_report(report, plan, cycle, specs, semantic_counts) readiness_records = write_readiness_artifacts(readiness, project_root, central_platform_folder=central_for_write) gitea_plan = build_gitea_mesh_plan(report) gitea_records = write_gitea_plan_artifacts(gitea_plan, project_root, central_platform_folder=central_for_write) release_lock(lock) payload = mesh_summary_payload(report) payload["reconciliation"] = reconciliation_payload(plan) payload["runtime"] = { "cycleId": cycle.cycle_id, "allowed": cycle.allowed_count, "blocked": cycle.blocked_count, "skipped": cycle.skipped_count, "schedulers": scheduler_payload(specs), } payload["readiness"] = readiness.to_dict() payload["gitea"] = gitea_plan.to_dict() payload["centralWrite"] = { "requested": str(central) if central is not None else "", "used": str(central_for_write) if central_for_write is not None else "", "error": central_write_error, "semanticPath": semantic_path_used, "semanticError": semantic_write_error, } payload["generatedFiles"] = [ record.path for record in tuple(records) + tuple(reconciliation_records) + tuple(runtime_records) + tuple(readiness_records) + tuple(gitea_records) ] if central_write_error: status_path = project_root / "dados" / "repository-mesh-central-write-status.json" status_path.parent.mkdir(parents=True, exist_ok=True) status_path.write_text( json.dumps(payload["centralWrite"], ensure_ascii=False, indent=2, sort_keys=True), encoding="utf-8", ) payload["generatedFiles"].append(str(status_path)) print(json.dumps(payload, ensure_ascii=False, indent=2)) return 0 def command_consolidated_report(args: argparse.Namespace) -> int: central_platform_folder = Path(args.central_platform_folder) if args.central_platform_folder else None report, records = run_consolidated_report( Path(args.central_projects_root), Path(args.project_root), central_platform_folder=central_platform_folder, plugin_auth_attempt=args.plugin_auth_attempt, git_sync_status=args.git_sync_status, ) payload = { "report": report.to_dict(), "generatedFiles": [record.path for record in records], } print(json.dumps(payload, ensure_ascii=False, indent=2)) return 0 def command_mcp_publication_gate(args: argparse.Namespace) -> int: central_platform_folder = Path(args.central_platform_folder) if args.central_platform_folder else None report, records = run_publication_gate( project_root=Path(args.project_root), mcp_repo_root=Path(args.mcp_repo_root), central_platform_folder=central_platform_folder, wrangler_raw_summary=args.wrangler_summary, git_sync_status=args.git_sync_status, repo_remote=args.repo_remote, bearer=args.bearer, live_probe=bool(args.live_probe), ) payload = { "report": report.to_dict(), "generatedFiles": [record.path for record in records], } print(json.dumps(payload, ensure_ascii=False, indent=2)) return 0 def command_mcp_access_policy(args: argparse.Namespace) -> int: central_platform_folder = Path(args.central_platform_folder) if args.central_platform_folder else None publication_gate_json = Path(args.publication_gate_json) if args.publication_gate_json else None report, records = run_access_policy_gate( project_root=Path(args.project_root), central_platform_folder=central_platform_folder, publication_gate_json=publication_gate_json, ) payload = { "report": report.to_dict(), "generatedFiles": [record.path for record in records], } print(json.dumps(payload, ensure_ascii=False, indent=2)) return 0 def command_workspace_hygiene(args: argparse.Namespace) -> int: central_platform_folder = Path(args.central_platform_folder) if args.central_platform_folder else None report, records = run_workspace_hygiene( project_root=Path(args.project_root), central_platform_folder=central_platform_folder, apply=bool(args.apply), ) payload = { "report": report.to_dict(), "generatedFiles": [record.path for record in records], } print(json.dumps(payload, ensure_ascii=False, indent=2)) return 0 def command_workspace_hygiene_policy(args: argparse.Namespace) -> int: report, records = run_hygiene_policy( project_root=Path(args.project_root), use_generated=not bool(args.no_generated), ) payload = { "report": policy_payload(report, limit_cases=int(args.limit)), "generatedFiles": [record.path for record in records], } print(json.dumps(payload, ensure_ascii=False, indent=2)) return 0 def command_targeted_sync_audit(args: argparse.Namespace) -> int: central_platform_folder = Path(args.central_platform_folder) if args.central_platform_folder else None report, records = run_targeted_sync_audit( project_root=Path(args.project_root), mcp_repo_root=Path(args.mcp_repo_root), central_repo_root=Path(args.central_repo_root), central_platform_folder=central_platform_folder, fetch=bool(args.fetch), ) payload = { "report": report.to_dict(), "generatedFiles": [record.path for record in records], } print(json.dumps(payload, ensure_ascii=False, indent=2)) return 0 def command_mcp_admin_route_acceptance(args: argparse.Namespace) -> int: central_platform_folder = Path(args.central_platform_folder) if args.central_platform_folder else None report, records = run_admin_route_acceptance( project_root=Path(args.project_root), central_platform_folder=central_platform_folder, platform_id=args.platform_id or None, profile_id=args.profile_id or None, operation=args.operation or None, status=args.status or None, limit=int(args.limit), ) payload = { "report": report.to_dict(), "generatedFiles": [record.path for record in records], } print(json.dumps(payload, ensure_ascii=False, indent=2)) return 0 def command_central_materialization(args: argparse.Namespace) -> int: report = run_central_materialization( project_root=Path(args.project_root), central_platform_folder=Path(args.central_platform_folder), overwrite=bool(args.overwrite), ) print(json.dumps(report.to_dict(), ensure_ascii=False, indent=2)) return 0 def command_canonical_identity(args: argparse.Namespace) -> int: central_platform_folder = Path(args.central_platform_folder) if args.central_platform_folder else None graph, records = run_canonical_identity_graph( project_root=Path(args.project_root), central_platform_folder=central_platform_folder, use_generated=not bool(args.no_generated), ) payload = { "graph": identity_graph_payload(graph, limit_cases=20), "generatedFiles": [record.path for record in records], } print(json.dumps(payload, ensure_ascii=False, indent=2)) return 0 def command_canonical_migration_plan(args: argparse.Namespace) -> int: central_platform_folder = Path(args.central_platform_folder) if args.central_platform_folder else None plan = run_canonical_migration_plan( project_root=Path(args.project_root), central_platform_folder=central_platform_folder, use_generated=not bool(args.no_generated), ) print(json.dumps(migration_plan_payload(plan, limit_cases=int(args.limit)), ensure_ascii=False, indent=2)) return 0 def command_institutional_assurance(args: argparse.Namespace) -> int: report, records = run_institutional_assurance( ecosystem_root=Path(args.ecosystem_root), project_root=Path(args.project_root), central_projects_root=Path(args.central_projects_root), sandbox_mode=args.sandbox_mode, plugin_auth_attempt=args.plugin_auth_attempt, write_central=not bool(args.no_central), ) payload = compact_assurance_payload(report, limit=int(args.limit)) payload["generatedFiles"] = [record.path for record in records] print(json.dumps(payload, ensure_ascii=False, indent=2)) return 0 def command_router000_exit_orders(args: argparse.Namespace) -> int: report, records = run_router000_exit_orders( ecosystem_root=Path(args.ecosystem_root), project_root=Path(args.project_root), central_projects_root=Path(args.central_projects_root), executive_limit=int(args.executive_limit), managerial_limit=int(args.managerial_limit), ) payload = { "report": report.to_dict(), "generatedFiles": [record.path for record in records], } print(json.dumps(payload, ensure_ascii=False, indent=2)) return 0 def main(argv: list[str] | None = None) -> int: parser = build_parser() args = parser.parse_args(argv) if args.command == "scan": return command_scan(args) if args.command == "generate": return command_generate(args) if args.command == "sql-counts": return command_sql_counts(args) if args.command == "env": return command_env(args) if args.command == "dossier": return command_dossier(args) if args.command == "governance": return command_governance(args) if args.command == "line-budget": return command_line_budget(args) if args.command == "rulebook": return command_rulebook(args) if args.command == "mcp-provider": return command_mcp_provider(args) if args.command == "mcp-execute-probe": return command_mcp_execute_probe(args) if args.command == "mcp-transit-ledger": return command_mcp_transit_ledger(args) if args.command == "mcp-contract-artifacts": return command_mcp_contract_artifacts(args) if args.command == "repo-mesh": return command_repo_mesh(args) if args.command == "consolidated-report": return command_consolidated_report(args) if args.command == "mcp-publication-gate": return command_mcp_publication_gate(args) if args.command == "mcp-access-policy": return command_mcp_access_policy(args) if args.command == "workspace-hygiene": return command_workspace_hygiene(args) if args.command == "workspace-hygiene-policy": return command_workspace_hygiene_policy(args) if args.command == "targeted-sync-audit": return command_targeted_sync_audit(args) if args.command == "mcp-admin-route-acceptance": return command_mcp_admin_route_acceptance(args) if args.command == "central-materialization": return command_central_materialization(args) if args.command == "canonical-identity": return command_canonical_identity(args) if args.command == "canonical-migration-plan": return command_canonical_migration_plan(args) if args.command == "institutional-assurance": return command_institutional_assurance(args) if args.command == "router000-exit-orders": return command_router000_exit_orders(args) parser.error(f"unknown command: {args.command}") return 2 if __name__ == "__main__": raise SystemExit(main())