118 lines
3.8 KiB
Python
118 lines
3.8 KiB
Python
"""Generate importable canonical migration acceptance cases.
|
|
|
|
The generated module keeps the canonical-name migration matrix available to
|
|
tests and MCP payload generation without rescanning repository declarations on
|
|
each command. Regenerate after changing repository targets, identifier fields,
|
|
or the canonical naming policy.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from pathlib import Path
|
|
from typing import Iterable
|
|
|
|
|
|
ROOT = Path(__file__).resolve().parents[1]
|
|
SRC = ROOT / "src"
|
|
OUTPUT = SRC / "mais_humana" / "generated_canonical_migration_plan.py"
|
|
|
|
|
|
def ensure_import_path() -> None:
|
|
import sys
|
|
|
|
src = str(SRC)
|
|
if src not in sys.path:
|
|
sys.path.insert(0, src)
|
|
|
|
|
|
def q(value: object) -> str:
|
|
return repr(str(value))
|
|
|
|
|
|
def bool_literal(value: bool) -> str:
|
|
return "True" if value else "False"
|
|
|
|
|
|
def tuple_literal(values: Iterable[object], *, indent: int = 8) -> str:
|
|
cleaned = [str(value) for value in values if str(value).strip()]
|
|
if not cleaned:
|
|
return "()"
|
|
if len(cleaned) <= 8:
|
|
return "(" + ", ".join(q(value) for value in cleaned) + ("," if len(cleaned) == 1 else "") + ")"
|
|
pad = " " * indent
|
|
lines = ["("]
|
|
for value in cleaned:
|
|
lines.append(f"{pad}{q(value)},")
|
|
lines.append(" " * (indent - 4) + ")")
|
|
return "\n".join(lines)
|
|
|
|
|
|
def case_block(case: object) -> str:
|
|
return "\n".join(
|
|
[
|
|
" CanonicalMigrationAcceptanceCase(",
|
|
f" case_id={q(getattr(case, 'case_id'))},",
|
|
f" target_name={q(getattr(case, 'target_name'))},",
|
|
f" field_name={q(getattr(case, 'field_name'))},",
|
|
f" candidate_value={q(getattr(case, 'candidate_value'))},",
|
|
f" canonical_value={q(getattr(case, 'canonical_value'))},",
|
|
f" operation={q(getattr(case, 'operation'))},",
|
|
f" permission_scope={q(getattr(case, 'permission_scope'))},",
|
|
f" accepted={bool_literal(getattr(case, 'accepted'))},",
|
|
f" status=AcceptanceStatus.{getattr(case, 'status').name},",
|
|
f" required_action={q(getattr(case, 'required_action'))},",
|
|
f" reason={q(getattr(case, 'reason'))},",
|
|
f" mcp_transit_required={bool_literal(getattr(case, 'mcp_transit_required'))},",
|
|
f" direct_repository_write_blocked={bool_literal(getattr(case, 'direct_repository_write_blocked'))},",
|
|
" ),",
|
|
]
|
|
)
|
|
|
|
|
|
def main() -> int:
|
|
ensure_import_path()
|
|
from mais_humana.canonical_migration import build_migration_acceptance_cases
|
|
|
|
cases = build_migration_acceptance_cases()
|
|
lines = [
|
|
'"""Generated canonical migration plan for Mais Humana.',
|
|
"",
|
|
"Do not edit this file by hand. Regenerate it with:",
|
|
"",
|
|
" python tools/generate_canonical_migration_plan.py",
|
|
"",
|
|
"The registry is source code so it can be imported, tested, packaged,",
|
|
"hashed, and exposed through MCP without rescanning repository targets.",
|
|
'"""',
|
|
"",
|
|
"from __future__ import annotations",
|
|
"",
|
|
"from .canonical_migration import (",
|
|
" AcceptanceStatus,",
|
|
" CanonicalMigrationAcceptanceCase,",
|
|
")",
|
|
"",
|
|
f"GENERATED_ACCEPTANCE_CASES_COUNT = {len(cases)}",
|
|
"",
|
|
"ACCEPTANCE_CASES = (",
|
|
]
|
|
lines.extend(case_block(case) for case in cases)
|
|
lines.extend(
|
|
[
|
|
")",
|
|
"",
|
|
"",
|
|
"def iter_acceptance_cases():",
|
|
" return ACCEPTANCE_CASES",
|
|
"",
|
|
]
|
|
)
|
|
OUTPUT.write_text("\n".join(lines), encoding="utf-8")
|
|
print(f"generated {OUTPUT} cases={len(cases)} lines={len(lines)}")
|
|
return 0
|
|
|
|
|
|
if __name__ == "__main__":
|
|
raise SystemExit(main())
|
|
|