# Source: tudo-para-ia-mais-humana-pl…/src/mais_humana/cli.py
# (export metadata: 705 lines, 34 KiB, Python)
"""Command line interface for the Mais Humana platform."""
from __future__ import annotations
import argparse
import json
from pathlib import Path
from .models import as_plain_data
from .central_consolidation import run_consolidated_report
from .central_materialization import run_central_materialization
from .canonical_identity import identity_graph_payload, run_canonical_identity_graph
from .canonical_migration import migration_plan_payload, run_canonical_migration_plan
from .matrix import build_global_recommendations, build_matrix, build_platform_reports
from .mcp_contract import build_mcp_contract_report, build_mcp_execute_probe, mcp_provider_compact_json, mcp_provider_payload
from .mcp_contract import (
administration_route_readiness_markdown,
administration_route_readiness_payload,
mcp_contract_csv,
mcp_contract_markdown,
mcp_execute_probe_markdown,
official_report_models_markdown,
same_source_validation_payload,
ui_renderer_policy_markdown,
)
from .mcp_admin_route_acceptance import run_admin_route_acceptance
from .mcp_gateway_access_policy import run_access_policy_gate
from .mcp_publication_gate import run_publication_gate
from .mcp_transit_ledger import build_mcp_transit_ledger, mcp_transit_csv, mcp_transit_ledger_compact_json, mcp_transit_markdown
from .operational_dossier import build_execution_round_dossier
from .governance_engine import build_governance_portfolio, compact_governance_payload
from .human_rulebook import evaluate_rulebook, rulebook_compact_json
from .human_readiness_registry import build_readiness_registry
from .runtime_budget import build_round_line_budget
from .orders import build_exit_orders
from .reports import generate
from .repository_mesh import mesh_summary_payload, run_repository_mesh
from .repository_mesh_reconciliation import apply_reconciliation_to_report, reconciliation_payload
from .repository_mesh_runtime import (
acquire_lock,
build_runtime_cycle,
cron_scheduler_spec,
release_lock,
scheduler_payload,
windows_scheduler_spec,
write_runtime_artifacts,
)
from .repository_mesh_semantic import write_repository_mesh_semantic_state
from .repository_mesh_readiness import build_mesh_readiness_report, write_readiness_artifacts
from .repository_mesh_gitea import build_gitea_mesh_plan, write_gitea_plan_artifacts
from .scanner import environment_summary, scan_ecosystem
from .storage import table_counts
from .targeted_sync_audit import run_targeted_sync_audit
from .workspace_hygiene import run_workspace_hygiene
from .workspace_hygiene_policy import policy_payload, run_hygiene_policy
def build_parser() -> argparse.ArgumentParser:
    """Build the ``mais-humana`` argument parser with all subcommands.

    Each subcommand maps 1:1 to a ``command_*`` handler dispatched by
    :func:`main`. Most subcommands share ``--ecosystem-root`` /
    ``--project-root`` options whose defaults point at the local
    ``G:/_codex-git`` workspace layout.
    """
    parser = argparse.ArgumentParser(prog="mais-humana", description="Generate human-centered ecosystem reports.")
    # required=True makes argparse reject a missing subcommand before main() runs.
    sub = parser.add_subparsers(dest="command", required=True)
    scan = sub.add_parser("scan", help="Scan ecosystem repositories and print compact JSON summary.")
    scan.add_argument("--ecosystem-root", default="G:/_codex-git")
    gen = sub.add_parser("generate", help="Generate DOCX, SVG, JSON, matrices, and service orders.")
    gen.add_argument("--ecosystem-root", default="G:/_codex-git")
    gen.add_argument("--project-root", default="G:/_codex-git/tudo-para-ia-mais-humana")
    gen.add_argument("--central-platform-folder", default="")
    gen.add_argument("--push-status", default="")
    sql = sub.add_parser("sql-counts", help="Print semantic SQLite table counts.")
    sql.add_argument("--sqlite", required=True)
    env = sub.add_parser("env", help="Print local environment summary.")
    env.add_argument("--ecosystem-root", default="G:/_codex-git")
    dossier = sub.add_parser("dossier", help="Print operational dossier JSON without writing artifacts.")
    dossier.add_argument("--ecosystem-root", default="G:/_codex-git")
    dossier.add_argument("--project-root", default="G:/_codex-git/tudo-para-ia-mais-humana")
    governance = sub.add_parser("governance", help="Print compact governance portfolio JSON.")
    governance.add_argument("--ecosystem-root", default="G:/_codex-git")
    governance.add_argument("--project-root", default="G:/_codex-git/tudo-para-ia-mais-humana")
    budget = sub.add_parser("line-budget", help="Print round line-budget JSON.")
    budget.add_argument("--ecosystem-root", default="G:/_codex-git")
    budget.add_argument("--project-root", default="G:/_codex-git/tudo-para-ia-mais-humana")
    rulebook = sub.add_parser("rulebook", help="Print compact human-operational rulebook JSON.")
    rulebook.add_argument("--ecosystem-root", default="G:/_codex-git")
    # limit=0 means "no limit" (handlers convert 0 to None).
    rulebook.add_argument("--limit", type=int, default=0)
    mcp_provider = sub.add_parser("mcp-provider", help="Print the compact Mais Humana MCP provider payload.")
    mcp_provider.add_argument("--ecosystem-root", default="G:/_codex-git")
    mcp_provider.add_argument("--limit", type=int, default=80)
    mcp_provider.add_argument("--envelope", action="store_true")
    mcp_probe = sub.add_parser("mcp-execute-probe", help="Print a safe /v1/execute probe for the Mais Humana provider.")
    mcp_probe.add_argument("--ecosystem-root", default="G:/_codex-git")
    mcp_probe.add_argument("--limit", type=int, default=20)
    mcp_probe.add_argument("--observed-status", default="not_executed")
    mcp_probe.add_argument("--observed-note", default="request prepared; live execution must record the HTTP status separately")
    mcp_transit = sub.add_parser("mcp-transit-ledger", help="Print compact MCP transit ledger for Mais Humana contracts.")
    mcp_transit.add_argument("--ecosystem-root", default="G:/_codex-git")
    mcp_transit.add_argument("--limit", type=int, default=80)
    mcp_artifacts = sub.add_parser("mcp-contract-artifacts", help="Write focused MCP contract artifacts without regenerating DOCX reports.")
    mcp_artifacts.add_argument("--ecosystem-root", default="G:/_codex-git")
    mcp_artifacts.add_argument("--project-root", default="G:/_codex-git/tudo-para-ia-mais-humana")
    mcp_artifacts.add_argument("--central-platform-folder", default="")
    mcp_artifacts.add_argument("--no-scan", action="store_true", help="Use the generated MCP catalog directly, without rescanning repositories.")
    repo_mesh = sub.add_parser("repo-mesh", help="Inventory repository mirrors and write safe synchronization artifacts.")
    repo_mesh.add_argument("--ecosystem-root", default="G:/_codex-git")
    repo_mesh.add_argument("--project-root", default="G:/_codex-git/tudo-para-ia-mais-humana")
    repo_mesh.add_argument("--central-platform-folder", default="")
    repo_mesh.add_argument("--fetch", action="store_true")
    repo_mesh.add_argument("--plugin-auth-attempt", default="")
    consolidated = sub.add_parser("consolidated-report", help="Write consolidated administrative pending report across central projects.")
    consolidated.add_argument("--central-projects-root", default="G:/_codex-git/nucleo-gestao-operacional/central-de-ordem-de-servico/projects")
    consolidated.add_argument("--project-root", default="G:/_codex-git/tudo-para-ia-mais-humana")
    consolidated.add_argument("--central-platform-folder", default="")
    consolidated.add_argument("--plugin-auth-attempt", default="")
    consolidated.add_argument("--git-sync-status", default="")
    publication = sub.add_parser("mcp-publication-gate", help="Write the Mais Humana MCP publication gate artifacts.")
    publication.add_argument("--project-root", default="G:/_codex-git/tudo-para-ia-mais-humana")
    publication.add_argument("--mcp-repo-root", default="G:/_codex-git/tudo-para-ia-mcps-internos-plataform")
    publication.add_argument("--central-platform-folder", default="")
    publication.add_argument("--wrangler-summary", default="")
    publication.add_argument("--git-sync-status", default="")
    publication.add_argument("--repo-remote", default="")
    publication.add_argument("--bearer", default="")
    publication.add_argument("--live-probe", action="store_true")
    access_policy = sub.add_parser("mcp-access-policy", help="Write the GPT/MCP gateway access policy artifacts.")
    access_policy.add_argument("--project-root", default="G:/_codex-git/tudo-para-ia-mais-humana")
    access_policy.add_argument("--central-platform-folder", default="")
    access_policy.add_argument("--publication-gate-json", default="")
    hygiene = sub.add_parser("workspace-hygiene", help="Inspect or clean approved local artifacts for closeout.")
    hygiene.add_argument("--project-root", default="G:/_codex-git/tudo-para-ia-mais-humana")
    hygiene.add_argument("--central-platform-folder", default="")
    # --apply switches from dry-run inspection to actual cleanup.
    hygiene.add_argument("--apply", action="store_true")
    hygiene_policy = sub.add_parser("workspace-hygiene-policy", help="Write executable workspace cleanup and ACL retention policy.")
    hygiene_policy.add_argument("--project-root", default="G:/_codex-git/tudo-para-ia-mais-humana")
    hygiene_policy.add_argument("--no-generated", action="store_true")
    hygiene_policy.add_argument("--limit", type=int, default=40)
    sync_audit = sub.add_parser("targeted-sync-audit", help="Write safe Git synchronization audit for the active round repos.")
    sync_audit.add_argument("--project-root", default="G:/_codex-git/tudo-para-ia-mais-humana")
    sync_audit.add_argument("--mcp-repo-root", default="G:/_codex-git/tudo-para-ia-mcps-internos-plataform")
    sync_audit.add_argument("--central-repo-root", default="G:/_codex-git/nucleo-gestao-operacional")
    sync_audit.add_argument("--central-platform-folder", default="")
    sync_audit.add_argument("--fetch", action="store_true")
    admin_acceptance = sub.add_parser("mcp-admin-route-acceptance", help="Write MCP-only administration route acceptance artifacts.")
    admin_acceptance.add_argument("--project-root", default="G:/_codex-git/tudo-para-ia-mais-humana")
    admin_acceptance.add_argument("--central-platform-folder", default="")
    admin_acceptance.add_argument("--platform-id", default="")
    admin_acceptance.add_argument("--profile-id", default="")
    admin_acceptance.add_argument("--operation", default="")
    admin_acceptance.add_argument("--status", default="")
    admin_acceptance.add_argument("--limit", type=int, default=120)
    central_materialization = sub.add_parser("central-materialization", help="Materialize central active/output orders and semantic records.")
    central_materialization.add_argument("--project-root", default="G:/_codex-git/tudo-para-ia-mais-humana")
    central_materialization.add_argument(
        "--central-platform-folder",
        default="G:/_codex-git/nucleo-gestao-operacional/central-de-ordem-de-servico/projects/15_repo_tudo-para-ia-mais-humana-platform",
    )
    central_materialization.add_argument("--overwrite", action="store_true")
    canonical_identity = sub.add_parser("canonical-identity", help="Write canonical identity graph and MCP alias acceptance artifacts.")
    canonical_identity.add_argument("--project-root", default="G:/_codex-git/tudo-para-ia-mais-humana")
    canonical_identity.add_argument(
        "--central-platform-folder",
        default="G:/_codex-git/nucleo-gestao-operacional/central-de-ordem-de-servico/projects/15_repo_tudo-para-ia-mais-humana-platform",
    )
    canonical_identity.add_argument("--no-generated", action="store_true", help="Build graph from runtime targets instead of generated registry.")
    canonical_migration = sub.add_parser("canonical-migration-plan", help="Write canonical-name migration controls and MCP acceptance cases.")
    canonical_migration.add_argument("--project-root", default="G:/_codex-git/tudo-para-ia-mais-humana")
    canonical_migration.add_argument(
        "--central-platform-folder",
        default="G:/_codex-git/nucleo-gestao-operacional/central-de-ordem-de-servico/projects/15_repo_tudo-para-ia-mais-humana-platform",
    )
    canonical_migration.add_argument("--no-generated", action="store_true", help="Build acceptance cases from runtime repository targets.")
    canonical_migration.add_argument("--limit", type=int, default=40)
    return parser
def command_scan(args: argparse.Namespace) -> int:
    """Scan the ecosystem root and print a compact per-platform JSON summary."""
    scans = scan_ecosystem(Path(args.ecosystem_root))
    platform_rows = []
    for item in scans:
        platform_rows.append(
            {
                "platform_id": item.platform.platform_id,
                "exists": item.exists,
                "git_present": item.git_present,
                "code_lines": item.code_lines,
                "evidence": len(item.evidence),
                "warnings": item.warnings,
            }
        )
    summary = {
        "platforms": platform_rows,
        "total_code_lines": sum(item.code_lines for item in scans),
        "total_evidence": sum(len(item.evidence) for item in scans),
    }
    print(json.dumps(summary, ensure_ascii=False, indent=2))
    return 0
def command_generate(args: argparse.Namespace) -> int:
    """Run the full report generation pipeline and print the bundle as JSON."""
    central_folder = None
    if args.central_platform_folder:
        central_folder = Path(args.central_platform_folder)
    bundle = generate(
        ecosystem_root=Path(args.ecosystem_root),
        project_root=Path(args.project_root),
        central_platform_folder=central_folder,
        push_status=args.push_status or None,
    )
    print(json.dumps(as_plain_data(bundle), ensure_ascii=False, indent=2))
    return 0
def command_sql_counts(args: argparse.Namespace) -> int:
    """Print table counts for the given semantic SQLite database as JSON."""
    counts = table_counts(Path(args.sqlite))
    print(json.dumps(counts, ensure_ascii=False, indent=2))
    return 0
def command_env(args: argparse.Namespace) -> int:
    """Print the local environment summary for the ecosystem root as JSON."""
    summary = environment_summary(Path(args.ecosystem_root))
    print(json.dumps(summary, ensure_ascii=False, indent=2))
    return 0
def command_dossier(args: argparse.Namespace) -> int:
    """Build and print the operational dossier JSON without writing artifacts."""
    scans = scan_ecosystem(Path(args.ecosystem_root))
    reports = build_platform_reports(scans, build_matrix(scans))
    recommendations = build_global_recommendations(reports)
    dossier = build_execution_round_dossier(
        project_root=Path(args.project_root),
        platform_reports=reports,
        recommendations=recommendations,
        output_orders=build_exit_orders(recommendations),
        total_code_lines_analyzed=sum(item.code_lines for item in scans),
    )
    print(json.dumps(as_plain_data(dossier), ensure_ascii=False, indent=2))
    return 0
def command_governance(args: argparse.Namespace) -> int:
    """Print the compact governance portfolio JSON, including readiness counts."""
    scans = scan_ecosystem(Path(args.ecosystem_root))
    reports = build_platform_reports(scans, build_matrix(scans))
    recommendations = build_global_recommendations(reports)
    dossier = build_execution_round_dossier(
        project_root=Path(args.project_root),
        platform_reports=reports,
        recommendations=recommendations,
        output_orders=build_exit_orders(recommendations),
        total_code_lines_analyzed=sum(item.code_lines for item in scans),
    )
    portfolio = build_governance_portfolio(reports, recommendations=recommendations, round_dossier=dossier)
    registry = build_readiness_registry(reports, portfolio)
    payload = compact_governance_payload(portfolio)
    # Augment the compact payload with registry entry counts.
    payload["readiness_entries"] = len(registry.entries)
    payload["weak_readiness_entries"] = len(registry.weak_entries)
    print(json.dumps(payload, ensure_ascii=False, indent=2))
    return 0
def command_line_budget(args: argparse.Namespace) -> int:
    """Print the round line-budget as JSON."""
    line_budget = build_round_line_budget(Path(args.ecosystem_root), Path(args.project_root))
    print(json.dumps(as_plain_data(line_budget), ensure_ascii=False, indent=2))
    return 0
def command_rulebook(args: argparse.Namespace) -> int:
    """Evaluate the human-operational rulebook and print its compact JSON."""
    scans = scan_ecosystem(Path(args.ecosystem_root))
    reports = build_platform_reports(scans, build_matrix(scans))
    # A --limit of 0 means "unlimited" and is normalized to None here.
    evaluated = evaluate_rulebook(reports, limit=args.limit or None)
    print(json.dumps(rulebook_compact_json(evaluated), ensure_ascii=False, indent=2))
    return 0
def command_mcp_provider(args: argparse.Namespace) -> int:
    """Print the Mais Humana MCP provider payload (full envelope or compact)."""
    scans = scan_ecosystem(Path(args.ecosystem_root))
    reports = build_platform_reports(scans, build_matrix(scans))
    contracts = build_mcp_contract_report(evaluate_rulebook(reports))
    if args.envelope:
        payload = mcp_provider_payload(contracts, limit=args.limit)
    else:
        payload = mcp_provider_compact_json(contracts, limit=args.limit)
    print(json.dumps(payload, ensure_ascii=False, indent=2))
    return 0
def command_mcp_execute_probe(args: argparse.Namespace) -> int:
    """Build and print a safe /v1/execute probe (no live request is made)."""
    scans = scan_ecosystem(Path(args.ecosystem_root))
    reports = build_platform_reports(scans, build_matrix(scans))
    contracts = build_mcp_contract_report(evaluate_rulebook(reports))
    probe = build_mcp_execute_probe(
        contracts,
        limit=args.limit,
        observed_status=args.observed_status,
        observed_note=args.observed_note,
    )
    print(json.dumps(as_plain_data(probe), ensure_ascii=False, indent=2))
    return 0
def command_mcp_transit_ledger(args: argparse.Namespace) -> int:
    """Print the compact MCP transit ledger derived from the contract report."""
    scans = scan_ecosystem(Path(args.ecosystem_root))
    reports = build_platform_reports(scans, build_matrix(scans))
    contracts = build_mcp_contract_report(evaluate_rulebook(reports))
    ledger = build_mcp_transit_ledger(contracts)
    compact = mcp_transit_ledger_compact_json(ledger, limit=args.limit)
    print(json.dumps(compact, ensure_ascii=False, indent=2))
    return 0
def _write_json(path: Path, payload: object) -> str:
    """Serialize *payload* as deterministic UTF-8 JSON at *path*.

    Creates missing parent directories; ``sort_keys=True`` keeps output
    stable across runs. Returns the written path as a string.
    """
    # The redundant function-level re-import of as_plain_data was removed;
    # the module-level import (from .models) already provides it.
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(json.dumps(as_plain_data(payload), ensure_ascii=False, indent=2, sort_keys=True), encoding="utf-8")
    return str(path)
def _write_text(path: Path, text: str) -> str:
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(text, encoding="utf-8")
return str(path)
def command_mcp_contract_artifacts(args: argparse.Namespace) -> int:
    """Write the focused MCP contract artifact set and print a summary payload.

    With --no-scan the ecosystem is not rescanned and ``rulebook`` stays None;
    presumably build_mcp_contract_report then falls back to the generated
    catalog — TODO confirm against mcp_contract module. Central-folder writes
    are best-effort: an OSError is captured into ``centralError`` instead of
    aborting the local artifact writes.
    """
    project_root = Path(args.project_root)
    central = Path(args.central_platform_folder) if args.central_platform_folder else None
    rulebook = None
    if not args.no_scan:
        scans = scan_ecosystem(Path(args.ecosystem_root))
        cells = build_matrix(scans)
        reports = build_platform_reports(scans, cells)
        rulebook = evaluate_rulebook(reports)
    contracts = build_mcp_contract_report(rulebook)
    ledger = build_mcp_transit_ledger(contracts)
    admin_routes = administration_route_readiness_payload()
    probe = build_mcp_execute_probe(contracts)
    # JSON data under dados/, human-readable docs under ecossistema/, CSV
    # matrices under matrizes/ — all inside the project tree.
    written = [
        _write_json(project_root / "dados" / "mcp-contratos-humanos.json", contracts),
        _write_json(project_root / "dados" / "mcp-provider-mais-humana.json", mcp_provider_payload(contracts)),
        _write_json(project_root / "dados" / "mcp-provider-mais-humana-compacto.json", mcp_provider_compact_json(contracts)),
        _write_json(project_root / "dados" / "mcp-execute-probe-mais-humana.json", probe),
        _write_json(project_root / "dados" / "mcp-admin-ui-same-source-validation.json", same_source_validation_payload(contracts)),
        _write_json(project_root / "dados" / "mcp-transit-ledger.json", ledger),
        _write_json(project_root / "dados" / "mcp-transit-ledger-compacto.json", mcp_transit_ledger_compact_json(ledger)),
        _write_json(project_root / "dados" / "mcp-administration-routes-readiness.json", admin_routes),
        _write_text(project_root / "ecossistema" / "MCP-PROVIDER-MAIS-HUMANA.md", mcp_contract_markdown(contracts)),
        _write_text(project_root / "ecossistema" / "MCP-EXECUTE-PROBE-MAIS-HUMANA.md", mcp_execute_probe_markdown(probe)),
        _write_text(project_root / "ecossistema" / "MCP-TRANSIT-LEDGER.md", mcp_transit_markdown(ledger)),
        _write_text(project_root / "ecossistema" / "MCP-ADMINISTRATION-ROUTES.md", administration_route_readiness_markdown(admin_routes)),
        _write_text(project_root / "ecossistema" / "MODELOS-OFICIAIS-RELATORIO-HUMANO.md", official_report_models_markdown(contracts)),
        _write_text(project_root / "ecossistema" / "UI-RENDERER-SAME-SOURCE-POLICY.md", ui_renderer_policy_markdown(contracts)),
        _write_text(project_root / "matrizes" / "mcp-contratos-humanos.csv", mcp_contract_csv(contracts)),
        _write_text(project_root / "matrizes" / "mcp-transit-ledger.csv", mcp_transit_csv(ledger)),
    ]
    central_error = ""
    if central is not None:
        try:
            written.append(_write_text(central / "reports" / "EXECUTADO__mcp-contract-artifacts.md", mcp_contract_markdown(contracts)))
            written.append(_write_text(central / "indexes" / "mcp-administration-routes-index.md", administration_route_readiness_markdown(admin_routes)))
        except OSError as exc:
            # Best-effort central mirror; surface the failure in the payload.
            central_error = f"{type(exc).__name__}: {exc}"
    payload = {
        "contractsCount": contracts.contracts_count,
        "coverageCount": len(contracts.coverage),
        "transitRecords": ledger.records_count,
        "administrationRoutes": admin_routes["contractsCount"],
        "scanMode": "generated_catalog_only" if args.no_scan else "ecosystem_scan",
        "centralError": central_error,
        "generatedFiles": written,
    }
    print(json.dumps(payload, ensure_ascii=False, indent=2))
    return 0
def command_repo_mesh(args: argparse.Namespace) -> int:
    """Inventory repository mirrors and write synchronization artifacts.

    The central platform folder is probed for writability first; when the
    probe fails, central writes are dropped (local artifacts are still
    produced) and the error is surfaced both in the printed payload and in a
    local status JSON file. Returns 0 on success.
    """
    project_root = Path(args.project_root)
    central = Path(args.central_platform_folder) if args.central_platform_folder else None
    central_write_error = ""
    central_for_write = central
    if central_for_write is not None:
        try:
            # Probe writability up front so downstream writers never fail midway.
            for folder_name in ("reports", "indexes", "audit", "status"):
                (central_for_write / folder_name).mkdir(parents=True, exist_ok=True)
            probe_path = central_for_write / "reports" / ".repository_mesh_write_probe.tmp"
            probe_path.write_text("repository mesh write probe\n", encoding="utf-8")
            probe_path.unlink(missing_ok=True)
        except OSError as exc:
            central_write_error = f"{type(exc).__name__}: {exc}"
            central_for_write = None
    report, records = run_repository_mesh(
        ecosystem_root=Path(args.ecosystem_root),
        project_root=project_root,
        central_platform_folder=central_for_write,
        fetch=bool(args.fetch),
        plugin_auth_attempt=args.plugin_auth_attempt,
    )
    plan, reconciliation_records = apply_reconciliation_to_report(
        report,
        project_root,
        central_platform_folder=central_for_write,
    )
    lock = acquire_lock(project_root / "dados" / "repository-mesh.lock.json", owner="mais_humana.cli.repo-mesh")
    try:
        cycle = build_runtime_cycle(report, plan, lock=lock, execute=False)
        specs = (
            windows_scheduler_spec(
                python_exe="C:\\Users\\Ami\\.cache\\codex-runtimes\\codex-primary-runtime\\dependencies\\python\\python.exe",
                project_root=project_root,
                ecosystem_root=Path(args.ecosystem_root),
                central_platform_folder=central_for_write,
            ),
            cron_scheduler_spec(
                python_exe="python",
                project_root=project_root,
                ecosystem_root=Path(args.ecosystem_root),
                central_platform_folder=central_for_write,
            ),
        )
        runtime_records = write_runtime_artifacts(cycle, specs, project_root, central_platform_folder=central_for_write)

        def _write_local_semantic() -> tuple:
            # Local semantic store inside the project tree; used as the primary
            # target when no central folder was requested, and as the fallback
            # when the central write fails.
            local_path = project_root / "controle-semantico.sqlite"
            counts = write_repository_mesh_semantic_state(
                local_path,
                report=report,
                plan=plan,
                cycle=cycle,
                schedulers=specs,
            )
            return counts, str(local_path)

        semantic_write_error = ""
        if central is not None:
            # NOTE(review): the semantic write targets the *requested* central
            # folder even when the writability probe above failed — the broad
            # except then routes to the local fallback. Preserved as-is.
            try:
                central_semantic_path = central / "controle-semantico.sqlite"
                semantic_counts = write_repository_mesh_semantic_state(
                    central_semantic_path,
                    report=report,
                    plan=plan,
                    cycle=cycle,
                    schedulers=specs,
                )
                semantic_path_used = str(central_semantic_path)
            except Exception as exc:
                semantic_write_error = f"{type(exc).__name__}: {exc}"
                semantic_counts, semantic_path_used = _write_local_semantic()
        else:
            semantic_counts, semantic_path_used = _write_local_semantic()
        readiness = build_mesh_readiness_report(report, plan, cycle, specs, semantic_counts)
        readiness_records = write_readiness_artifacts(readiness, project_root, central_platform_folder=central_for_write)
        gitea_plan = build_gitea_mesh_plan(report)
        gitea_records = write_gitea_plan_artifacts(gitea_plan, project_root, central_platform_folder=central_for_write)
    finally:
        # Fix: release the mesh lock even when artifact generation raises, so a
        # crashed run cannot leave a stale lock file behind (previously the
        # lock was released only on the success path).
        release_lock(lock)
    payload = mesh_summary_payload(report)
    payload["reconciliation"] = reconciliation_payload(plan)
    payload["runtime"] = {
        "cycleId": cycle.cycle_id,
        "allowed": cycle.allowed_count,
        "blocked": cycle.blocked_count,
        "skipped": cycle.skipped_count,
        "schedulers": scheduler_payload(specs),
    }
    payload["readiness"] = readiness.to_dict()
    payload["gitea"] = gitea_plan.to_dict()
    payload["centralWrite"] = {
        "requested": str(central) if central is not None else "",
        "used": str(central_for_write) if central_for_write is not None else "",
        "error": central_write_error,
        "semanticPath": semantic_path_used,
        "semanticError": semantic_write_error,
    }
    payload["generatedFiles"] = [
        record.path
        for record in tuple(records)
        + tuple(reconciliation_records)
        + tuple(runtime_records)
        + tuple(readiness_records)
        + tuple(gitea_records)
    ]
    if central_write_error:
        # Persist the central-write failure locally so later rounds can see it.
        status_path = project_root / "dados" / "repository-mesh-central-write-status.json"
        status_path.parent.mkdir(parents=True, exist_ok=True)
        status_path.write_text(
            json.dumps(payload["centralWrite"], ensure_ascii=False, indent=2, sort_keys=True),
            encoding="utf-8",
        )
        payload["generatedFiles"].append(str(status_path))
    print(json.dumps(payload, ensure_ascii=False, indent=2))
    return 0
def command_consolidated_report(args: argparse.Namespace) -> int:
    """Write the consolidated administrative pending report and print a summary."""
    central = Path(args.central_platform_folder) if args.central_platform_folder else None
    report, records = run_consolidated_report(
        Path(args.central_projects_root),
        Path(args.project_root),
        central_platform_folder=central,
        plugin_auth_attempt=args.plugin_auth_attempt,
        git_sync_status=args.git_sync_status,
    )
    summary = {
        "report": report.to_dict(),
        "generatedFiles": [item.path for item in records],
    }
    print(json.dumps(summary, ensure_ascii=False, indent=2))
    return 0
def command_mcp_publication_gate(args: argparse.Namespace) -> int:
    """Write the MCP publication gate artifacts and print the resulting report."""
    central = Path(args.central_platform_folder) if args.central_platform_folder else None
    report, records = run_publication_gate(
        project_root=Path(args.project_root),
        mcp_repo_root=Path(args.mcp_repo_root),
        central_platform_folder=central,
        wrangler_raw_summary=args.wrangler_summary,
        git_sync_status=args.git_sync_status,
        repo_remote=args.repo_remote,
        bearer=args.bearer,
        live_probe=bool(args.live_probe),
    )
    summary = {
        "report": report.to_dict(),
        "generatedFiles": [item.path for item in records],
    }
    print(json.dumps(summary, ensure_ascii=False, indent=2))
    return 0
def command_mcp_access_policy(args: argparse.Namespace) -> int:
    """Write the GPT/MCP gateway access-policy artifacts and print the report."""
    central = Path(args.central_platform_folder) if args.central_platform_folder else None
    gate_json = Path(args.publication_gate_json) if args.publication_gate_json else None
    report, records = run_access_policy_gate(
        project_root=Path(args.project_root),
        central_platform_folder=central,
        publication_gate_json=gate_json,
    )
    summary = {
        "report": report.to_dict(),
        "generatedFiles": [item.path for item in records],
    }
    print(json.dumps(summary, ensure_ascii=False, indent=2))
    return 0
def command_workspace_hygiene(args: argparse.Namespace) -> int:
    """Inspect (or, with --apply, clean) approved local artifacts and print the report."""
    central = Path(args.central_platform_folder) if args.central_platform_folder else None
    report, records = run_workspace_hygiene(
        project_root=Path(args.project_root),
        central_platform_folder=central,
        apply=bool(args.apply),
    )
    summary = {
        "report": report.to_dict(),
        "generatedFiles": [item.path for item in records],
    }
    print(json.dumps(summary, ensure_ascii=False, indent=2))
    return 0
def command_workspace_hygiene_policy(args: argparse.Namespace) -> int:
    """Write the workspace cleanup/ACL retention policy and print its payload."""
    report, records = run_hygiene_policy(
        project_root=Path(args.project_root),
        use_generated=not bool(args.no_generated),
    )
    summary = {
        "report": policy_payload(report, limit_cases=int(args.limit)),
        "generatedFiles": [item.path for item in records],
    }
    print(json.dumps(summary, ensure_ascii=False, indent=2))
    return 0
def command_targeted_sync_audit(args: argparse.Namespace) -> int:
    """Run the safe Git synchronization audit across the round repos and print it."""
    central = Path(args.central_platform_folder) if args.central_platform_folder else None
    report, records = run_targeted_sync_audit(
        project_root=Path(args.project_root),
        mcp_repo_root=Path(args.mcp_repo_root),
        central_repo_root=Path(args.central_repo_root),
        central_platform_folder=central,
        fetch=bool(args.fetch),
    )
    summary = {
        "report": report.to_dict(),
        "generatedFiles": [item.path for item in records],
    }
    print(json.dumps(summary, ensure_ascii=False, indent=2))
    return 0
def command_mcp_admin_route_acceptance(args: argparse.Namespace) -> int:
    """Write MCP-only administration route acceptance artifacts and print the report."""
    central = Path(args.central_platform_folder) if args.central_platform_folder else None
    # Empty-string CLI filters are normalized to None (meaning "no filter").
    report, records = run_admin_route_acceptance(
        project_root=Path(args.project_root),
        central_platform_folder=central,
        platform_id=args.platform_id or None,
        profile_id=args.profile_id or None,
        operation=args.operation or None,
        status=args.status or None,
        limit=int(args.limit),
    )
    summary = {
        "report": report.to_dict(),
        "generatedFiles": [item.path for item in records],
    }
    print(json.dumps(summary, ensure_ascii=False, indent=2))
    return 0
def command_central_materialization(args: argparse.Namespace) -> int:
    """Materialize central active/output orders and print the materialization report."""
    result = run_central_materialization(
        project_root=Path(args.project_root),
        central_platform_folder=Path(args.central_platform_folder),
        overwrite=bool(args.overwrite),
    )
    print(json.dumps(result.to_dict(), ensure_ascii=False, indent=2))
    return 0
def command_canonical_identity(args: argparse.Namespace) -> int:
    """Write the canonical identity graph artifacts and print a trimmed payload."""
    central = Path(args.central_platform_folder) if args.central_platform_folder else None
    graph, records = run_canonical_identity_graph(
        project_root=Path(args.project_root),
        central_platform_folder=central,
        use_generated=not bool(args.no_generated),
    )
    summary = {
        "graph": identity_graph_payload(graph, limit_cases=20),
        "generatedFiles": [item.path for item in records],
    }
    print(json.dumps(summary, ensure_ascii=False, indent=2))
    return 0
def command_canonical_migration_plan(args: argparse.Namespace) -> int:
    """Write the canonical-name migration plan and print its trimmed payload."""
    central = Path(args.central_platform_folder) if args.central_platform_folder else None
    plan = run_canonical_migration_plan(
        project_root=Path(args.project_root),
        central_platform_folder=central,
        use_generated=not bool(args.no_generated),
    )
    payload = migration_plan_payload(plan, limit_cases=int(args.limit))
    print(json.dumps(payload, ensure_ascii=False, indent=2))
    return 0
def main(argv: list[str] | None = None) -> int:
    """CLI entry point.

    Parses *argv* (defaults to ``sys.argv[1:]``) and dispatches to the handler
    registered for the chosen subcommand, returning its exit status. Unknown
    commands abort via ``parser.error`` (which exits with status 2).
    """
    parser = build_parser()
    args = parser.parse_args(argv)
    # Dispatch table replaces the previous 23-branch if-chain; keys mirror the
    # subparser names declared in build_parser().
    handlers = {
        "scan": command_scan,
        "generate": command_generate,
        "sql-counts": command_sql_counts,
        "env": command_env,
        "dossier": command_dossier,
        "governance": command_governance,
        "line-budget": command_line_budget,
        "rulebook": command_rulebook,
        "mcp-provider": command_mcp_provider,
        "mcp-execute-probe": command_mcp_execute_probe,
        "mcp-transit-ledger": command_mcp_transit_ledger,
        "mcp-contract-artifacts": command_mcp_contract_artifacts,
        "repo-mesh": command_repo_mesh,
        "consolidated-report": command_consolidated_report,
        "mcp-publication-gate": command_mcp_publication_gate,
        "mcp-access-policy": command_mcp_access_policy,
        "workspace-hygiene": command_workspace_hygiene,
        "workspace-hygiene-policy": command_workspace_hygiene_policy,
        "targeted-sync-audit": command_targeted_sync_audit,
        "mcp-admin-route-acceptance": command_mcp_admin_route_acceptance,
        "central-materialization": command_central_materialization,
        "canonical-identity": command_canonical_identity,
        "canonical-migration-plan": command_canonical_migration_plan,
    }
    handler = handlers.get(args.command)
    if handler is None:
        # Unreachable while the subparsers use required=True; kept as a guard.
        parser.error(f"unknown command: {args.command}")
        return 2
    return handler(args)
if __name__ == "__main__":
    # Support direct execution (e.g. `python -m mais_humana.cli`); propagate
    # main()'s exit status to the shell via SystemExit.
    raise SystemExit(main())