"""Generate the canonical identity registry for Mais Humana.

The output is importable Python source. It lets tests, CLI commands, MCP
providers, and central reports validate aliases without rescanning the
workspace or parsing Markdown during runtime.
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from pathlib import Path
|
|
from typing import Iterable
|
|
|
|
|
|
# Parent of the directory that holds this script (parents[0] would be the
# script's own directory).
ROOT = Path(__file__).resolve().parents[1]
# Source tree that ensure_import_path() places on sys.path.
SRC = ROOT.joinpath("src")
# File that main() writes the generated registry module to.
OUTPUT = SRC.joinpath("mais_humana", "generated_canonical_identity_registry.py")
|
|
|
|
|
|
def ensure_import_path() -> None:
    """Put the ``SRC`` directory at the front of ``sys.path`` unless it is already listed.

    Calling this more than once is harmless: the path is inserted only when
    its string form is not yet present in ``sys.path``.
    """
    import sys

    src_dir = str(SRC)
    if src_dir in sys.path:
        return
    # Insert at position 0 so this source tree is searched first.
    sys.path.insert(0, src_dir)
|
|
|
|
|
|
def q(value: object) -> str:
    """Return ``str(value)`` rendered as a Python string literal.

    The value is first converted with ``str`` and the result is passed through
    ``repr``, so the returned text includes the surrounding quotes and any
    escapes ``repr`` produces.
    """
    text = str(value)
    return f"{text!r}"
|
|
|
|
|
|
def bool_literal(value: bool) -> str:
    """Return the source text ``"True"`` for a truthy *value*, otherwise ``"False"``."""
    if value:
        return "True"
    return "False"
|
|
|
|
|
|
def tuple_literal(values: Iterable[object], *, indent: int = 8) -> str:
    """Render *values* as the source text of a tuple of string literals.

    Each value is converted with ``str``; values whose string form is empty or
    whitespace-only are dropped.

    Args:
        values: Items to render.
        indent: Number of spaces placed before each item in the multi-line
            form; the closing parenthesis is indented by ``indent - 4``.

    Returns:
        ``"()"`` when nothing remains, a single-line tuple for up to eight
        items, otherwise a multi-line tuple with one item per line.
    """
    kept = []
    for value in values:
        if str(value).strip():
            kept.append(str(value))
    if not kept:
        return "()"

    quoted = [q(item) for item in kept]
    if len(quoted) <= 8:
        # A one-element tuple needs a trailing comma to remain a tuple.
        trailing = "," if len(quoted) == 1 else ""
        return f"({', '.join(quoted)}{trailing})"

    item_pad = " " * indent
    closing_pad = " " * (indent - 4)
    body = [f"{item_pad}{text}," for text in quoted]
    return "\n".join(["(", *body, f"{closing_pad})"])
|
|
|
|
|
|
def alias_block(alias: object, *, indent: int = 12) -> str:
    """Render *alias* as a ``CanonicalAlias(...)`` call followed by a comma.

    Args:
        alias: Object exposing ``identifier``, ``kind`` (whose ``name`` is
            emitted as ``CanonicalAliasKind.<name>``), ``accepted``,
            ``canonical``, ``reason`` and ``required_action``.
        indent: Spaces before the opening and closing lines; the keyword
            arguments are indented four spaces further.

    Returns:
        The rendered lines joined with newlines.
    """
    outer_pad = " " * indent
    field_pad = " " * (indent + 4)
    # Attributes are read in the same order as they are emitted.
    fields = [
        f"identifier={q(getattr(alias, 'identifier'))},",
        f"kind=CanonicalAliasKind.{getattr(alias, 'kind').name},",
        f"accepted={bool_literal(getattr(alias, 'accepted'))},",
        f"canonical={bool_literal(getattr(alias, 'canonical'))},",
        f"reason={q(getattr(alias, 'reason'))},",
        f"required_action={q(getattr(alias, 'required_action'))},",
    ]
    rendered = [f"{outer_pad}CanonicalAlias("]
    rendered.extend(field_pad + field for field in fields)
    rendered.append(f"{outer_pad}),")
    return "\n".join(rendered)
|
|
|
|
|
|
def record_block(record: object) -> str:
    """Render *record* as one ``CanonicalIdentityRecord(...)`` tuple entry.

    The entry opens and closes at column 4, its keyword arguments sit at
    column 8, and nested alias entries at column 12.  These fixed prefixes are
    chosen to agree with the ``indent=12`` passed to ``alias_block`` and
    ``tuple_literal`` below (the latter closes its multi-line form at
    ``indent - 4``, i.e. column 8), so the emitted source is consistently
    indented in four-space steps.

    Args:
        record: Object exposing the attributes read below (``platform_id``
            through ``notes``); ``aliases`` and ``notes`` must be iterable.

    Returns:
        The rendered lines joined with newlines, ending with ``),``.
    """
    alias_lines = ["        aliases=("]
    alias_lines.extend(alias_block(alias, indent=12) for alias in getattr(record, "aliases"))
    alias_lines.append("        ),")
    return "\n".join(
        [
            "    CanonicalIdentityRecord(",
            f"        platform_id={q(getattr(record, 'platform_id'))},",
            f"        canonical_project_id={q(getattr(record, 'canonical_project_id'))},",
            f"        current_project_id={q(getattr(record, 'current_project_id'))},",
            f"        central_folder={q(getattr(record, 'central_folder'))},",
            f"        gitea_repo={q(getattr(record, 'gitea_repo'))},",
            f"        expected_remote_url={q(getattr(record, 'expected_remote_url'))},",
            f"        owner_platform_id={q(getattr(record, 'owner_platform_id'))},",
            *alias_lines,
            f"        decision_status={q(getattr(record, 'decision_status'))},",
            f"        decision_source={q(getattr(record, 'decision_source'))},",
            f"        compatibility_rule={q(getattr(record, 'compatibility_rule'))},",
            f"        migration_safe_now={bool_literal(getattr(record, 'migration_safe_now'))},",
            f"        notes={tuple_literal(getattr(record, 'notes'), indent=12)},",
            "    ),",
        ]
    )
|
|
|
|
|
|
def case_block(case: object) -> str:
    """Render *case* as one ``CanonicalIdentityAcceptanceCase(...)`` tuple entry.

    The entry opens and closes at column 4 and its keyword arguments sit at
    column 8, the same four-space layout used for record entries, so the
    emitted source is consistently indented.

    Args:
        case: Object exposing the attributes read below (``case_id`` through
            ``direct_platform_bypass_blocked``).  ``accepted``,
            ``mcp_transit_required`` and ``direct_platform_bypass_blocked``
            are emitted as ``True``/``False``; every other field is emitted
            as a quoted string.

    Returns:
        The rendered lines joined with newlines, ending with ``),``.
    """
    return "\n".join(
        [
            "    CanonicalIdentityAcceptanceCase(",
            f"        case_id={q(getattr(case, 'case_id'))},",
            f"        platform_id={q(getattr(case, 'platform_id'))},",
            f"        operation={q(getattr(case, 'operation'))},",
            f"        permission_scope={q(getattr(case, 'permission_scope'))},",
            f"        field_name={q(getattr(case, 'field_name'))},",
            f"        candidate_value={q(getattr(case, 'candidate_value'))},",
            f"        canonical_project_id={q(getattr(case, 'canonical_project_id'))},",
            f"        accepted={bool_literal(getattr(case, 'accepted'))},",
            f"        status={q(getattr(case, 'status'))},",
            f"        decision_reason={q(getattr(case, 'decision_reason'))},",
            f"        required_action={q(getattr(case, 'required_action'))},",
            f"        mcp_transit_required={bool_literal(getattr(case, 'mcp_transit_required'))},",
            f"        direct_platform_bypass_blocked={bool_literal(getattr(case, 'direct_platform_bypass_blocked'))},",
            "    ),",
        ]
    )
|
|
|
|
|
|
def main() -> int:
    """Regenerate ``OUTPUT`` from the records and acceptance cases built by the package.

    The generated module contains a docstring, the names it imports from
    ``.canonical_identity``, two count constants, the ``RECORDS`` and
    ``ACCEPTANCE_CASES`` tuples, and two accessor functions.  Emitted code is
    indented in four-space steps.  After the file is written a one-line
    summary is printed.

    Returns:
        Always ``0``.
    """
    ensure_import_path()
    # Imported only after ensure_import_path() has put SRC on sys.path.
    from mais_humana.canonical_identity import build_acceptance_cases, build_identity_records

    records = build_identity_records()
    cases = build_acceptance_cases(records)
    lines = [
        '"""Generated canonical identity registry for Mais Humana.',
        "",
        "Do not edit this file by hand. Regenerate it with:",
        "",
        "    python tools/generate_canonical_identity_registry.py",
        "",
        "The registry is source code so it can be imported, tested, packaged,",
        "hashed, and exposed through MCP without parsing Markdown at runtime.",
        '"""',
        "",
        "from __future__ import annotations",
        "",
        "from .canonical_identity import (",
        "    CanonicalAlias,",
        "    CanonicalAliasKind,",
        "    CanonicalIdentityAcceptanceCase,",
        "    CanonicalIdentityRecord,",
        ")",
        "",
        f"GENERATED_RECORDS_COUNT = {len(records)}",
        f"GENERATED_ACCEPTANCE_CASES_COUNT = {len(cases)}",
        "",
        "RECORDS = (",
    ]
    lines.extend(record_block(record) for record in records)
    lines.extend(
        [
            ")",
            "",
            "ACCEPTANCE_CASES = (",
        ]
    )
    lines.extend(case_block(case) for case in cases)
    lines.extend(
        [
            ")",
            "",
            "",
            "def iter_records():",
            "    return RECORDS",
            "",
            "",
            "def iter_acceptance_cases():",
            "    return ACCEPTANCE_CASES",
            # The final empty entry makes the joined text end with a newline.
            "",
        ]
    )
    text = "\n".join(lines)
    OUTPUT.write_text(text, encoding="utf-8")
    # Each record/case entry in `lines` is itself a multi-line string, so the
    # line total is taken from the rendered text rather than len(lines).
    print(f"generated {OUTPUT} records={len(records)} cases={len(cases)} lines={len(text.splitlines())}")
    return 0
|
|
|
|
|
|
# Script entry point: run the generator and use main()'s return value as the
# process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
|