diff --git a/src/mais_humana/canonical_identity.py b/src/mais_humana/canonical_identity.py new file mode 100644 index 0000000..f584712 --- /dev/null +++ b/src/mais_humana/canonical_identity.py @@ -0,0 +1,842 @@ +"""Canonical identity graph for repository, MCP, and central aliases. + +The Mais Humana platform now has an institutional canonical name: +``tudo-para-ia-mais-humana-platform``. The physical repository is still +materialized as the historical no-suffix folder in this workspace, and other +platforms also carry ``platform``/``plataform`` history. This module makes +that identity policy executable: it builds a graph of accepted identifiers, +validates MCP payloads, and writes auditable artifacts for the central dossier. +""" + +from __future__ import annotations + +import csv +import io +import json +from dataclasses import dataclass +from enum import Enum +from pathlib import Path +from typing import Any, Iterable, Mapping, Sequence + +from .identity_policy import ( + CANONICAL_COMPATIBILITY_RULE, + CANONICAL_DECISION_SOURCE, + CANONICAL_PROJECT_ID, + CENTRAL_FOLDER_NAME, + CURRENT_PROJECT_ID, + LEGACY_PLATAFORM_ALIAS, + MCP_CONTROL_PLANE_ID, +) +from .models import GeneratedFile, as_plain_data, merge_unique, slugify, utc_now +from .repository_mesh import RepositoryTarget, default_repository_targets, stable_digest +from .repository_mesh_naming import plataform_to_platform, platform_to_plataform + + +class CanonicalAliasKind(str, Enum): + """Kinds of identifiers accepted by the graph.""" + + CANONICAL_PROJECT_ID = "canonical_project_id" + CURRENT_PROJECT_ID = "current_project_id" + LEGACY_PROJECT_ID = "legacy_project_id" + SPELLING_VARIANT = "spelling_variant" + CENTRAL_FOLDER = "central_folder" + REMOTE_URL = "remote_url" + GITEA_REPOSITORY = "gitea_repository" + + +class IdentityIssueSeverity(str, Enum): + """Severity for identity validation issues.""" + + INFO = "info" + WARNING = "warning" + BLOCKER = "blocker" + + +CANONICAL_REQUIRED_FIELDS = { + "canonicalProjectId", + "ownerPlatformId", + "owner_platform_id", +} + +COMPATIBILITY_IDENTIFIER_FIELDS = { + "projectId", + "project_id", + "currentProjectId", + "current_project_id", + "platformId", + "platform_id", + "origin", + "destination", + "targetPlatformId", + "target_platform_id", + "repositoryName", + "repoName", + "repo_name", +} + +REMOTE_IDENTIFIER_FIELDS = { + "repoRemote", + "remoteOrigin", + "remote_origin", + "originRemote", +} + +CENTRAL_IDENTIFIER_FIELDS = { + "centralFolder", + "central_folder", + "centralPlatformFolder", + "central_platform_folder", +} + +MCP_ADMIN_OPERATIONS: tuple[tuple[str, str], ...] = ( + ("consulta", "mcp.admin.readonly"), + ("diagnostico", "mcp.admin.diagnostic"), + ("acao", "mcp.admin.action.request"), + ("auditoria", "mcp.admin.audit"), + ("explicacao", "mcp.admin.explain"), +) + +IDENTITY_TRANSIT_FIELDS: tuple[str, ...] = ( + "origin", + "destination", + "ownerPlatformId", + "targetPlatformId", + "projectId", + "canonicalProjectId", + "currentProjectId", + "repositoryName", +) + + +@dataclass(frozen=True, slots=True) +class CanonicalAlias: + """One accepted alias or locator for a platform identity.""" + + identifier: str + kind: CanonicalAliasKind + accepted: bool + canonical: bool + reason: str + required_action: str = "" + + def to_dict(self) -> dict[str, Any]: + return as_plain_data(self) + + +@dataclass(frozen=True, slots=True) +class CanonicalIdentityRecord: + """Canonical identity for one managed repository or platform.""" + + platform_id: str + canonical_project_id: str + current_project_id: str + central_folder: str + gitea_repo: str + expected_remote_url: str + owner_platform_id: str + aliases: tuple[CanonicalAlias, ...] + decision_status: str + decision_source: str + compatibility_rule: str + migration_safe_now: bool + notes: tuple[str, ...] = () + + @property + def accepted_identifiers(self) -> tuple[str, ...]: + return merge_unique(alias.identifier for alias in self.aliases if alias.accepted) + + @property + def canonical_aliases(self) -> tuple[CanonicalAlias, ...]: + return tuple(alias for alias in self.aliases if alias.canonical) + + def alias_for(self, identifier: str) -> CanonicalAlias | None: + normalized = normalize_identifier(identifier) + for alias in self.aliases: + if normalize_identifier(alias.identifier) == normalized: + return alias + return None + + def canonicalize(self, identifier: str) -> str: + alias = self.alias_for(identifier) + if alias and alias.accepted: + return self.canonical_project_id + return identifier + + def to_dict(self) -> dict[str, Any]: + return as_plain_data(self) + + +@dataclass(frozen=True, slots=True) +class CanonicalIdentityAcceptanceCase: + """Generated acceptance case for MCP transit and payload identifiers.""" + + case_id: str + platform_id: str + operation: str + permission_scope: str + field_name: str + candidate_value: str + canonical_project_id: str + accepted: bool + status: str + decision_reason: str + required_action: str + mcp_transit_required: bool + direct_platform_bypass_blocked: bool + + def to_dict(self) -> dict[str, Any]: + return as_plain_data(self) + + +@dataclass(frozen=True, slots=True) +class IdentityValidationIssue: + """One validation issue found in a payload.""" + + field_name: str + value: str + severity: IdentityIssueSeverity + message: str + canonical_project_id: str = "" + required_action: str = "" + + def to_dict(self) -> dict[str, Any]: + return as_plain_data(self) + + +@dataclass(frozen=True, slots=True) +class IdentityValidationResult: + """Result of validating identifiers present in one payload.""" + + ok: bool + canonical_project_ids: tuple[str, ...] + accepted_aliases: tuple[str, ...] + issues: tuple[IdentityValidationIssue, ...] + + @property + def blockers(self) -> tuple[IdentityValidationIssue, ...]: + return tuple(issue for issue in self.issues if issue.severity == IdentityIssueSeverity.BLOCKER) + + @property + def warnings(self) -> tuple[IdentityValidationIssue, ...]: + return tuple(issue for issue in self.issues if issue.severity == IdentityIssueSeverity.WARNING) + + def to_dict(self) -> dict[str, Any]: + return as_plain_data(self) + + +@dataclass(frozen=True, slots=True) +class CanonicalIdentityGraph: + """Full identity graph and generated acceptance cases.""" + + graph_id: str + generated_at: str + records: tuple[CanonicalIdentityRecord, ...] + acceptance_cases: tuple[CanonicalIdentityAcceptanceCase, ...] + decision_source: str + compatibility_rule: str + + @property + def records_count(self) -> int: + return len(self.records) + + @property + def aliases_count(self) -> int: + return sum(len(record.aliases) for record in self.records) + + @property + def accepted_cases_count(self) -> int: + return sum(1 for case in self.acceptance_cases if case.accepted) + + @property + def blocked_cases_count(self) -> int: + return sum(1 for case in self.acceptance_cases if not case.accepted) + + def record_for(self, identifier: str) -> CanonicalIdentityRecord | None: + normalized = normalize_identifier(identifier) + for record in self.records: + if normalize_identifier(record.canonical_project_id) == normalized: + return record + if normalize_identifier(record.current_project_id) == normalized: + return record + if normalize_identifier(record.central_folder) == normalized: + return record + if normalize_identifier(record.expected_remote_url) == normalized: + return record + if normalize_identifier(record.gitea_repo) == normalized: + return record + if record.alias_for(identifier): + return record + return None + + def canonicalize(self, identifier: str) -> str: + record = self.record_for(identifier) + if record is None: + return identifier + return record.canonicalize(identifier) + + def to_dict(self) -> dict[str, Any]: + return as_plain_data(self) + + +def normalize_identifier(value: str) -> str: + """Normalize path-ish, URL-ish, and slug-ish identifiers for lookup.""" + + text = str(value or "").strip().replace("\\", "/").rstrip("/") + if text.endswith(".git"): + text = text[: -len(".git")] + return text.lower() + + +def platform_id_from_repo_name(name: str) -> str: + text = str(name).strip() + if text.startswith("tudo-para-ia-"): + text = text[len("tudo-para-ia-") :] + for suffix in ("-platform", "-plataform"): + if text.endswith(suffix): + text = text[: -len(suffix)] + break + return text.replace("-", "_") + + +def _alias(identifier: str, kind: CanonicalAliasKind, canonical: bool, reason: str, required_action: str = "") -> CanonicalAlias: + return CanonicalAlias( + identifier=identifier, + kind=kind, + accepted=True, + canonical=canonical, + reason=reason, + required_action=required_action, + ) + + +def aliases_for_target(target: RepositoryTarget) -> tuple[CanonicalAlias, ...]: + """Build accepted aliases for one repository target.""" + + canonical_id = target.canonical_name or target.expected_local_name + raw_values: list[CanonicalAlias] = [ + _alias( + canonical_id, + CanonicalAliasKind.CANONICAL_PROJECT_ID, + True, + "identificador canonico do repositorio/plataforma", + ), + _alias( + target.expected_local_name, + CanonicalAliasKind.CURRENT_PROJECT_ID, + target.expected_local_name == canonical_id, + "nome local esperado pelo inventario de sincronizacao", + "usar canonico em ownerPlatformId quando houver divergencia", + ), + _alias( + target.declared_name, + CanonicalAliasKind.CURRENT_PROJECT_ID, + target.declared_name == canonical_id, + "nome declarado pela ordem permanente de sincronizacao", + "registrar divergencia se diferente do canonico", + ), + _alias( + target.central_folder, + CanonicalAliasKind.CENTRAL_FOLDER, + False, + "pasta gerencial da central de ordem de servico", + "nao usar pasta central como ownerPlatformId", + ), + _alias( + target.gitea_repo, + CanonicalAliasKind.GITEA_REPOSITORY, + False, + "repositorio Gitea esperado", + "normalizar para remote URL antes de publicar evidencia", + ), + _alias( + target.expected_remote_url, + CanonicalAliasKind.REMOTE_URL, + False, + "remote HTTPS esperado", + "validar credencial Git antes de sincronizar", + ), + ] + for alias in target.aliases: + raw_values.append( + _alias( + alias, + CanonicalAliasKind.LEGACY_PROJECT_ID, + alias == canonical_id, + "alias historico autorizado para compatibilidade", + "preservar alias ate migracao Git/MCP coordenada", + ) + ) + for variant in (platform_to_plataform(canonical_id), plataform_to_platform(canonical_id)): + if variant != canonical_id: + raw_values.append( + _alias( + variant, + CanonicalAliasKind.SPELLING_VARIANT, + False, + "variante platform/plataform reconhecida para evitar repositorio duplicado", + "registrar como alias e nao criar repositorio paralelo", + ) + ) + by_identifier: dict[str, CanonicalAlias] = {} + for item in raw_values: + key = normalize_identifier(item.identifier) + if key not in by_identifier or item.canonical: + by_identifier[key] = item + return tuple(by_identifier.values()) + + +def record_for_target(target: RepositoryTarget) -> CanonicalIdentityRecord: + canonical_id = target.canonical_name or target.expected_local_name + platform_id = platform_id_from_repo_name(canonical_id) + if canonical_id == CANONICAL_PROJECT_ID: + platform_id = "mais_humana" + current_id = CURRENT_PROJECT_ID if canonical_id == CANONICAL_PROJECT_ID else target.expected_local_name + decision_source = CANONICAL_DECISION_SOURCE if canonical_id == CANONICAL_PROJECT_ID else "000_sincronizacao-dos-espelhos.md" + compatibility_rule = ( + CANONICAL_COMPATIBILITY_RULE + if canonical_id == CANONICAL_PROJECT_ID + else "Identidade aceita conforme inventario permanente; variantes platform/plataform sao aliases ate reconciliacao segura." + ) + migration_safe_now = bool(canonical_id == target.expected_local_name and not target.requires_nominal_reconciliation) + return CanonicalIdentityRecord( + platform_id=platform_id, + canonical_project_id=canonical_id, + current_project_id=current_id, + central_folder=target.central_folder, + gitea_repo=target.gitea_repo, + expected_remote_url=target.expected_remote_url, + owner_platform_id=canonical_id, + aliases=aliases_for_target(target), + decision_status="approved" if canonical_id == CANONICAL_PROJECT_ID else "inventory_declared", + decision_source=decision_source, + compatibility_rule=compatibility_rule, + migration_safe_now=migration_safe_now, + notes=target.notes, + ) + + +def build_identity_records(targets: Sequence[RepositoryTarget] | None = None) -> tuple[CanonicalIdentityRecord, ...]: + """Build identity records from repository targets.""" + + return tuple(record_for_target(target) for target in (targets or default_repository_targets())) + + +def build_acceptance_cases(records: Sequence[CanonicalIdentityRecord]) -> tuple[CanonicalIdentityAcceptanceCase, ...]: + """Build exhaustive MCP transit acceptance cases for records and aliases.""" + + cases: list[CanonicalIdentityAcceptanceCase] = [] + for record in records: + candidates = tuple(record.aliases) + for operation, permission in MCP_ADMIN_OPERATIONS: + for field in IDENTITY_TRANSIT_FIELDS: + for alias in candidates: + candidate = alias.identifier + accepted = alias.accepted + canonical_field = field in CANONICAL_REQUIRED_FIELDS + status = "canonical" if alias.canonical else "compatibility_alias" + required_action = alias.required_action + if canonical_field and candidate != record.canonical_project_id: + status = "canonical_field_requires_rewrite" + required_action = "reescrever campo canonico para canonicalProjectId/ownerPlatformId antes de persistir" + case_seed = { + "platform": record.platform_id, + "operation": operation, + "field": field, + "candidate": candidate, + "canonical": record.canonical_project_id, + } + cases.append( + CanonicalIdentityAcceptanceCase( + case_id=f"identity-{stable_digest(case_seed, 20)}", + platform_id=record.platform_id, + operation=operation, + permission_scope=permission, + field_name=field, + candidate_value=candidate, + canonical_project_id=record.canonical_project_id, + accepted=accepted, + status=status, + decision_reason=alias.reason, + required_action=required_action, + mcp_transit_required=True, + direct_platform_bypass_blocked=True, + ) + ) + return tuple(cases) + + +def _generated_records_and_cases() -> tuple[tuple[CanonicalIdentityRecord, ...], tuple[CanonicalIdentityAcceptanceCase, ...]] | None: + try: + from .generated_canonical_identity_registry import iter_acceptance_cases, iter_records + except ImportError: + return None + return tuple(iter_records()), tuple(iter_acceptance_cases()) + + +def build_identity_graph(*, use_generated: bool = True) -> CanonicalIdentityGraph: + """Build the graph, preferring the generated registry when present.""" + + generated = _generated_records_and_cases() if use_generated else None + if generated is None: + records = build_identity_records() + cases = build_acceptance_cases(records) + else: + records, cases = generated + seed = { + "records": [record.canonical_project_id for record in records], + "aliases": sum(len(record.aliases) for record in records), + "cases": len(cases), + "decision": CANONICAL_DECISION_SOURCE, + } + return CanonicalIdentityGraph( + graph_id=f"canonical-identity-{stable_digest(seed, 16)}", + generated_at=utc_now(), + records=records, + acceptance_cases=cases, + decision_source=CANONICAL_DECISION_SOURCE, + compatibility_rule=CANONICAL_COMPATIBILITY_RULE, + ) + + +def _iter_payload_items(payload: Mapping[str, Any], prefix: str = "") -> Iterable[tuple[str, str]]: + for key, value in payload.items(): + field = f"{prefix}.{key}" if prefix else str(key) + if isinstance(value, Mapping): + yield from _iter_payload_items(value, field) + elif isinstance(value, (list, tuple)): + for index, item in enumerate(value): + item_field = f"{field}[{index}]" + if isinstance(item, Mapping): + yield from _iter_payload_items(item, item_field) + elif isinstance(item, str): + yield item_field, item + elif isinstance(value, str): + yield field, value + + +def _base_field_name(field_name: str) -> str: + text = field_name.rsplit(".", 1)[-1] + if "[" in text: + text = text.split("[", 1)[0] + return text + + +def _field_is_identity_relevant(field_name: str) -> bool: + base = _base_field_name(field_name) + return ( + base in CANONICAL_REQUIRED_FIELDS + or base in COMPATIBILITY_IDENTIFIER_FIELDS + or base in REMOTE_IDENTIFIER_FIELDS + or base in CENTRAL_IDENTIFIER_FIELDS + ) + + +def validate_identity_payload( + payload: Mapping[str, Any], + *, + graph: CanonicalIdentityGraph | None = None, +) -> IdentityValidationResult: + """Validate identity fields in an MCP or central payload.""" + + identity_graph = graph or build_identity_graph() + issues: list[IdentityValidationIssue] = [] + accepted_aliases: list[str] = [] + canonical_ids: list[str] = [] + for field_name, value in _iter_payload_items(payload): + if not _field_is_identity_relevant(field_name): + continue + base = _base_field_name(field_name) + record = identity_graph.record_for(value) + if record is None: + issues.append( + IdentityValidationIssue( + field_name=field_name, + value=value, + severity=IdentityIssueSeverity.BLOCKER, + message="identificador nao reconhecido no grafo canonico", + required_action="registrar alias institucional ou corrigir payload antes de publicar", + ) + ) + continue + canonical_ids.append(record.canonical_project_id) + alias = record.alias_for(value) + if alias and not alias.canonical: + accepted_aliases.append(alias.identifier) + if base in CANONICAL_REQUIRED_FIELDS and value != record.canonical_project_id: + issues.append( + IdentityValidationIssue( + field_name=field_name, + value=value, + severity=IdentityIssueSeverity.WARNING, + message="campo canonico recebeu alias aceito; reescrita recomendada antes de publicar estado novo", + canonical_project_id=record.canonical_project_id, + required_action="usar canonical_project_id no owner/canonical field e manter alias apenas como compatibilidade", + ) + ) + elif alias and not alias.canonical: + issues.append( + IdentityValidationIssue( + field_name=field_name, + value=value, + severity=IdentityIssueSeverity.INFO, + message="alias aceito por politica de compatibilidade", + canonical_project_id=record.canonical_project_id, + required_action=alias.required_action, + ) + ) + blockers = tuple(issue for issue in issues if issue.severity == IdentityIssueSeverity.BLOCKER) + return IdentityValidationResult( + ok=not blockers, + canonical_project_ids=merge_unique(canonical_ids), + accepted_aliases=merge_unique(accepted_aliases), + issues=tuple(issues), + ) + + +def identity_graph_payload(graph: CanonicalIdentityGraph, *, limit_cases: int = 120) -> dict[str, Any]: + """Return a compact JSON-safe identity graph payload.""" + + return { + "graphId": graph.graph_id, + "generatedAt": graph.generated_at, + "recordsCount": graph.records_count, + "aliasesCount": graph.aliases_count, + "acceptanceCasesCount": len(graph.acceptance_cases), + "acceptedCasesCount": graph.accepted_cases_count, + "blockedCasesCount": graph.blocked_cases_count, + "decisionSource": graph.decision_source, + "compatibilityRule": graph.compatibility_rule, + "controlPlaneId": MCP_CONTROL_PLANE_ID, + "maisHumanaCanonicalProjectId": CANONICAL_PROJECT_ID, + "maisHumanaCurrentProjectId": CURRENT_PROJECT_ID, + "maisHumanaLegacyAlias": LEGACY_PLATAFORM_ALIAS, + "maisHumanaCentralFolder": CENTRAL_FOLDER_NAME, + "records": [record.to_dict() for record in graph.records], + "acceptanceCasesSample": [case.to_dict() for case in graph.acceptance_cases[:limit_cases]], + } + + +def identity_graph_rows(graph: CanonicalIdentityGraph) -> list[list[str]]: + rows = [ + [ + "platform_id", + "canonical_project_id", + "current_project_id", + "central_folder", + "alias", + "alias_kind", + "alias_canonical", + "decision_status", + "migration_safe_now", + "required_action", + ] + ] + for record in graph.records: + for alias in record.aliases: + rows.append( + [ + record.platform_id, + record.canonical_project_id, + record.current_project_id, + record.central_folder, + alias.identifier, + alias.kind.value, + "yes" if alias.canonical else "no", + record.decision_status, + "yes" if record.migration_safe_now else "no", + alias.required_action, + ] + ) + return rows + + +def identity_acceptance_rows(graph: CanonicalIdentityGraph) -> list[list[str]]: + rows = [ + [ + "case_id", + "platform_id", + "operation", + "permission_scope", + "field_name", + "candidate_value", + "canonical_project_id", + "accepted", + "status", + "required_action", + ] + ] + for case in graph.acceptance_cases: + rows.append( + [ + case.case_id, + case.platform_id, + case.operation, + case.permission_scope, + case.field_name, + case.candidate_value, + case.canonical_project_id, + "yes" if case.accepted else "no", + case.status, + case.required_action, + ] + ) + return rows + + +def rows_to_csv(rows: Sequence[Sequence[str]]) -> str: + buffer = io.StringIO() + writer = csv.writer(buffer, lineterminator="\n") + writer.writerows(rows) + return buffer.getvalue() + + +def identity_graph_markdown(graph: CanonicalIdentityGraph) -> str: + lines = [ + "# Canonical Identity Graph", + "", + f"- graph_id: `{graph.graph_id}`", + f"- generated_at: `{graph.generated_at}`", + f"- records: `{graph.records_count}`", + f"- aliases: `{graph.aliases_count}`", + f"- acceptance_cases: `{len(graph.acceptance_cases)}`", + f"- accepted_cases: `{graph.accepted_cases_count}`", + f"- blocked_cases: `{graph.blocked_cases_count}`", + f"- decision_source: `{graph.decision_source}`", + f"- control_plane: `{MCP_CONTROL_PLANE_ID}`", + "", + "## Regra canonica Mais Humana", + "", + f"- canonico: `{CANONICAL_PROJECT_ID}`", + f"- repo_local_historico: `{CURRENT_PROJECT_ID}`", + f"- alias_plataform: `{LEGACY_PLATAFORM_ALIAS}`", + f"- pasta_central: `{CENTRAL_FOLDER_NAME}`", + f"- regra: {CANONICAL_COMPATIBILITY_RULE}", + "", + "## Plataformas", + "", + ] + for record in sorted(graph.records, key=lambda item: item.platform_id): + lines.extend( + [ + f"### {record.platform_id}", + "", + f"- canonical_project_id: `{record.canonical_project_id}`", + f"- current_project_id: `{record.current_project_id}`", + f"- central_folder: `{record.central_folder}`", + f"- remote: `{record.expected_remote_url}`", + f"- migration_safe_now: `{record.migration_safe_now}`", + f"- aliases: `{', '.join(record.accepted_identifiers)}`", + "", + ] + ) + lines.extend( + [ + "## MCP transit", + "", + "- Todo payload interplataforma deve manter origin, destination, tool, payload, actor, permission, result, traceId, auditId e timestamp.", + "- Campos ownerPlatformId/canonicalProjectId devem usar o canonical_project_id; aliases sao aceitos apenas como compatibilidade rastreavel.", + "- Bypass direto da plataforma permanece bloqueado: a administracao passa pelo MCPs Internos.", + ] + ) + return "\n".join(lines).strip() + "\n" + + +def identity_generated_records(project_root: Path, central_platform_folder: Path | None = None) -> tuple[GeneratedFile, ...]: + relation = "0035_EXECUTIVA__reconciliar-nome-canonico-real-alias-platform" + paths = ( + ("dados/canonical-identity-graph.json", "Grafo canonico de identidades e aliases.", "canonical identity graph", "json"), + ("matrizes/canonical-identity-graph.csv", "Matriz de aliases por plataforma.", "canonical identity matrix", "csv"), + ("matrizes/canonical-identity-acceptance-cases.csv", "Casos MCP de aceitacao de aliases.", "canonical identity acceptance", "csv"), + ("ecossistema/CANONICAL-IDENTITY-GRAPH.md", "Relatorio humano do grafo canonico.", "canonical identity report", "markdown"), + ) + records = [ + GeneratedFile( + path=str(project_root / relative), + description=description, + function=function, + file_type=file_type, + changed_by="mais_humana.canonical_identity", + change_summary="Criado ou atualizado grafo canonico de nomes, aliases e casos MCP.", + relation_to_order=relation, + ) + for relative, description, function, file_type in paths + ] + if central_platform_folder is not None: + records.append( + GeneratedFile( + path=str(central_platform_folder / "reports" / "EXECUTADO__canonical-identity-graph.md"), + description="Copia central do grafo canonico de identidade.", + function="canonical identity central report", + file_type="markdown", + changed_by="mais_humana.canonical_identity", + change_summary="Registrada decisao canonica -platform e aliases no dossie central.", + relation_to_order=relation, + ) + ) + return tuple(records) + + +def write_identity_graph_artifacts( + graph: CanonicalIdentityGraph, + project_root: Path, + *, + central_platform_folder: Path | None = None, +) -> tuple[GeneratedFile, ...]: + targets: list[tuple[Path, str]] = [ + (project_root / "dados" / "canonical-identity-graph.json", json.dumps(identity_graph_payload(graph), ensure_ascii=False, indent=2, sort_keys=True)), + (project_root / "matrizes" / "canonical-identity-graph.csv", rows_to_csv(identity_graph_rows(graph))), + (project_root / "matrizes" / "canonical-identity-acceptance-cases.csv", rows_to_csv(identity_acceptance_rows(graph))), + (project_root / "ecossistema" / "CANONICAL-IDENTITY-GRAPH.md", identity_graph_markdown(graph)), + ] + records = list(identity_generated_records(project_root, central_platform_folder)) + central_failures: list[dict[str, str]] = [] + if central_platform_folder is not None: + targets.append((central_platform_folder / "reports" / "EXECUTADO__canonical-identity-graph.md", identity_graph_markdown(graph))) + for path, content in targets: + try: + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text(content, encoding="utf-8") + except OSError as exc: + if central_platform_folder is not None and central_platform_folder in path.parents: + central_failures.append({"path": str(path), "error": f"{type(exc).__name__}: {exc}"}) + continue + raise + if central_failures: + status_path = project_root / "dados" / "canonical-identity-central-write-status.json" + status_path.write_text( + json.dumps( + { + "generatedAt": utc_now(), + "centralPlatformFolder": str(central_platform_folder), + "ok": False, + "failures": central_failures, + }, + ensure_ascii=False, + indent=2, + sort_keys=True, + ), + encoding="utf-8", + ) + records.append( + GeneratedFile( + path=str(status_path), + description="Status da escrita central do grafo canonico.", + function="canonical identity central write status", + file_type="json", + changed_by="mais_humana.canonical_identity", + change_summary="Registrada falha de escrita central sem abortar artefatos do projeto real.", + relation_to_order="0034_EXECUTIVA__corrigir-acl-escrita-central-e-sql-semantico-plataforma-15", + ) + ) + return tuple(records) + + +def run_canonical_identity_graph( + *, + project_root: Path, + central_platform_folder: Path | None = None, + use_generated: bool = True, +) -> tuple[CanonicalIdentityGraph, tuple[GeneratedFile, ...]]: + graph = build_identity_graph(use_generated=use_generated) + records = write_identity_graph_artifacts(graph, project_root, central_platform_folder=central_platform_folder) + return graph, records diff --git a/src/mais_humana/cli.py b/src/mais_humana/cli.py index 763b3be..1014908 100644 --- a/src/mais_humana/cli.py +++ b/src/mais_humana/cli.py @@ -9,6 +9,7 @@ from pathlib import Path from .models import as_plain_data from .central_consolidation import run_consolidated_report from .central_materialization import run_central_materialization +from .canonical_identity import identity_graph_payload, run_canonical_identity_graph from .matrix import build_global_recommendations, build_matrix, build_platform_reports from .mcp_contract import build_mcp_contract_report, build_mcp_execute_probe, mcp_provider_compact_json, mcp_provider_payload from .mcp_contract import ( @@ -145,6 +146,13 @@ def build_parser() -> argparse.ArgumentParser: default="G:/_codex-git/nucleo-gestao-operacional/central-de-ordem-de-servico/projects/15_repo_tudo-para-ia-mais-humana-platform", ) central_materialization.add_argument("--overwrite", action="store_true") + canonical_identity = sub.add_parser("canonical-identity", help="Write canonical identity graph and MCP alias acceptance artifacts.") + canonical_identity.add_argument("--project-root", default="G:/_codex-git/tudo-para-ia-mais-humana") + canonical_identity.add_argument( + "--central-platform-folder", + default="G:/_codex-git/nucleo-gestao-operacional/central-de-ordem-de-servico/projects/15_repo_tudo-para-ia-mais-humana-platform", + ) + canonical_identity.add_argument("--no-generated", action="store_true", help="Build graph from runtime targets instead of generated registry.") return parser diff --git a/tools/generate_canonical_identity_registry.py b/tools/generate_canonical_identity_registry.py new file mode 100644 index 0000000..70f257e --- /dev/null +++ b/tools/generate_canonical_identity_registry.py @@ -0,0 +1,171 @@ +"""Generate the canonical identity registry for Mais Humana. + +The output is importable Python source. It lets tests, CLI commands, MCP +providers, and central reports validate aliases without rescanning the +workspace or parsing Markdown during runtime. +""" + +from __future__ import annotations + +from pathlib import Path +from typing import Iterable + + +ROOT = Path(__file__).resolve().parents[1] +SRC = ROOT / "src" +OUTPUT = SRC / "mais_humana" / "generated_canonical_identity_registry.py" + + +def ensure_import_path() -> None: + import sys + + src = str(SRC) + if src not in sys.path: + sys.path.insert(0, src) + + +def q(value: object) -> str: + return repr(str(value)) + + +def bool_literal(value: bool) -> str: + return "True" if value else "False" + + +def tuple_literal(values: Iterable[object], *, indent: int = 8) -> str: + cleaned = [str(value) for value in values if str(value).strip()] + if not cleaned: + return "()" + if len(cleaned) <= 8: + return "(" + ", ".join(q(value) for value in cleaned) + ("," if len(cleaned) == 1 else "") + ")" + pad = " " * indent + lines = ["("] + for value in cleaned: + lines.append(f"{pad}{q(value)},") + lines.append(" " * (indent - 4) + ")") + return "\n".join(lines) + + +def alias_block(alias: object, *, indent: int = 12) -> str: + return "\n".join( + [ + " " * indent + "CanonicalAlias(", + " " * (indent + 4) + f"identifier={q(getattr(alias, 'identifier'))},", + " " * (indent + 4) + f"kind=CanonicalAliasKind.{getattr(alias, 'kind').name},", + " " * (indent + 4) + f"accepted={bool_literal(getattr(alias, 'accepted'))},", + " " * (indent + 4) + f"canonical={bool_literal(getattr(alias, 'canonical'))},", + " " * (indent + 4) + f"reason={q(getattr(alias, 'reason'))},", + " " * (indent + 4) + f"required_action={q(getattr(alias, 'required_action'))},", + " " * indent + "),", + ] + ) + + +def record_block(record: object) -> str: + alias_lines = [" aliases=("] + alias_lines.extend(alias_block(alias, indent=12) for alias in getattr(record, "aliases")) + alias_lines.append(" ),") + return "\n".join( + [ + " CanonicalIdentityRecord(", + f" platform_id={q(getattr(record, 'platform_id'))},", + f" canonical_project_id={q(getattr(record, 'canonical_project_id'))},", + f" current_project_id={q(getattr(record, 'current_project_id'))},", + f" central_folder={q(getattr(record, 'central_folder'))},", + f" gitea_repo={q(getattr(record, 'gitea_repo'))},", + f" expected_remote_url={q(getattr(record, 'expected_remote_url'))},", + f" owner_platform_id={q(getattr(record, 'owner_platform_id'))},", + *alias_lines, + f" decision_status={q(getattr(record, 'decision_status'))},", + f" decision_source={q(getattr(record, 'decision_source'))},", + f" compatibility_rule={q(getattr(record, 'compatibility_rule'))},", + f" migration_safe_now={bool_literal(getattr(record, 'migration_safe_now'))},", + f" notes={tuple_literal(getattr(record, 'notes'), indent=12)},", + " ),", + ] + ) + + +def case_block(case: object) -> str: + return "\n".join( + [ + " CanonicalIdentityAcceptanceCase(", + f" case_id={q(getattr(case, 'case_id'))},", + f" platform_id={q(getattr(case, 'platform_id'))},", + f" operation={q(getattr(case, 'operation'))},", + f" permission_scope={q(getattr(case, 'permission_scope'))},", + f" field_name={q(getattr(case, 'field_name'))},", + f" candidate_value={q(getattr(case, 'candidate_value'))},", + f" canonical_project_id={q(getattr(case, 'canonical_project_id'))},", + f" accepted={bool_literal(getattr(case, 'accepted'))},", + f" status={q(getattr(case, 'status'))},", + f" decision_reason={q(getattr(case, 'decision_reason'))},", + f" required_action={q(getattr(case, 'required_action'))},", + f" mcp_transit_required={bool_literal(getattr(case, 'mcp_transit_required'))},", + f" direct_platform_bypass_blocked={bool_literal(getattr(case, 'direct_platform_bypass_blocked'))},", + " ),", + ] + ) + + +def main() -> int: + ensure_import_path() + from mais_humana.canonical_identity import build_acceptance_cases, build_identity_records + + records = build_identity_records() + cases = build_acceptance_cases(records) + lines = [ + '"""Generated canonical identity registry for Mais Humana.', + "", + "Do not edit this file by hand. Regenerate it with:", + "", + " python tools/generate_canonical_identity_registry.py", + "", + "The registry is source code so it can be imported, tested, packaged,", + "hashed, and exposed through MCP without parsing Markdown at runtime.", + '"""', + "", + "from __future__ import annotations", + "", + "from .canonical_identity import (", + " CanonicalAlias,", + " CanonicalAliasKind,", + " CanonicalIdentityAcceptanceCase,", + " CanonicalIdentityRecord,", + ")", + "", + f"GENERATED_RECORDS_COUNT = {len(records)}", + f"GENERATED_ACCEPTANCE_CASES_COUNT = {len(cases)}", + "", + "RECORDS = (", + ] + lines.extend(record_block(record) for record in records) + lines.extend( + [ + ")", + "", + "ACCEPTANCE_CASES = (", + ] + ) + lines.extend(case_block(case) for case in cases) + lines.extend( + [ + ")", + "", + "", + "def iter_records():", + " return RECORDS", + "", + "", + "def iter_acceptance_cases():", + " return ACCEPTANCE_CASES", + "", + ] + ) + OUTPUT.write_text("\n".join(lines), encoding="utf-8") + print(f"generated {OUTPUT} records={len(records)} cases={len(cases)} lines={len(lines)}") + return 0 + + +if __name__ == "__main__": + raise SystemExit(main())