auto-sync: tudo-para-ia-mais-humana 2026-05-02 06:19:15

This commit is contained in:
2026-05-02 06:19:15 -03:00
parent 279685f5bb
commit 0de33e11e2
3 changed files with 1021 additions and 0 deletions

View File

@@ -0,0 +1,842 @@
"""Canonical identity graph for repository, MCP, and central aliases.
The Mais Humana platform now has an institutional canonical name:
``tudo-para-ia-mais-humana-platform``. The physical repository is still
materialized as the historical no-suffix folder in this workspace, and other
platforms also carry ``platform``/``plataform`` history. This module makes
that identity policy executable: it builds a graph of accepted identifiers,
validates MCP payloads, and writes auditable artifacts for the central dossier.
"""
from __future__ import annotations
import csv
import io
import json
from dataclasses import dataclass
from enum import Enum
from pathlib import Path
from typing import Any, Iterable, Mapping, Sequence
from .identity_policy import (
CANONICAL_COMPATIBILITY_RULE,
CANONICAL_DECISION_SOURCE,
CANONICAL_PROJECT_ID,
CENTRAL_FOLDER_NAME,
CURRENT_PROJECT_ID,
LEGACY_PLATAFORM_ALIAS,
MCP_CONTROL_PLANE_ID,
)
from .models import GeneratedFile, as_plain_data, merge_unique, slugify, utc_now
from .repository_mesh import RepositoryTarget, default_repository_targets, stable_digest
from .repository_mesh_naming import plataform_to_platform, platform_to_plataform
class CanonicalAliasKind(str, Enum):
    """Kinds of identifiers accepted by the graph."""

    CANONICAL_PROJECT_ID = "canonical_project_id"  # the institutional canonical id
    CURRENT_PROJECT_ID = "current_project_id"  # name currently materialized/declared locally
    LEGACY_PROJECT_ID = "legacy_project_id"  # historical alias kept for compatibility
    SPELLING_VARIANT = "spelling_variant"  # platform/plataform spelling variant
    CENTRAL_FOLDER = "central_folder"  # managerial folder in the central dossier
    REMOTE_URL = "remote_url"  # expected Git remote URL
    GITEA_REPOSITORY = "gitea_repository"  # expected Gitea repository locator
class IdentityIssueSeverity(str, Enum):
    """Severity for identity validation issues."""

    INFO = "info"  # accepted compatibility alias, traced for audit only
    WARNING = "warning"  # alias in a canonical-only field: rewrite recommended
    BLOCKER = "blocker"  # unknown identifier: validation result is not ok
# Payload fields that must carry the canonical project id, never an alias.
CANONICAL_REQUIRED_FIELDS = {
    "canonicalProjectId",
    "ownerPlatformId",
    "owner_platform_id",
}
# Fields where accepted non-canonical aliases are tolerated for compatibility.
COMPATIBILITY_IDENTIFIER_FIELDS = {
    "projectId",
    "project_id",
    "currentProjectId",
    "current_project_id",
    "platformId",
    "platform_id",
    "origin",
    "destination",
    "targetPlatformId",
    "target_platform_id",
    "repositoryName",
    "repoName",
    "repo_name",
}
# Fields expected to carry Git remote URLs.
REMOTE_IDENTIFIER_FIELDS = {
    "repoRemote",
    "remoteOrigin",
    "remote_origin",
    "originRemote",
}
# Fields expected to carry central-dossier folder locators.
CENTRAL_IDENTIFIER_FIELDS = {
    "centralFolder",
    "central_folder",
    "centralPlatformFolder",
    "central_platform_folder",
}
# (operation name, MCP permission scope) pairs expanded into acceptance cases.
MCP_ADMIN_OPERATIONS: tuple[tuple[str, str], ...] = (
    ("consulta", "mcp.admin.readonly"),
    ("diagnostico", "mcp.admin.diagnostic"),
    ("acao", "mcp.admin.action.request"),
    ("auditoria", "mcp.admin.audit"),
    ("explicacao", "mcp.admin.explain"),
)
# Identifier-bearing fields exercised by the generated MCP transit cases.
IDENTITY_TRANSIT_FIELDS: tuple[str, ...] = (
    "origin",
    "destination",
    "ownerPlatformId",
    "targetPlatformId",
    "projectId",
    "canonicalProjectId",
    "currentProjectId",
    "repositoryName",
)
@dataclass(frozen=True, slots=True)
class CanonicalAlias:
    """One accepted alias or locator for a platform identity."""

    identifier: str  # the alias/locator text itself
    kind: CanonicalAliasKind  # what kind of identifier this is
    accepted: bool  # whether payloads may carry this identifier
    canonical: bool  # whether this identifier IS the canonical project id
    reason: str  # institutional justification (Portuguese)
    required_action: str = ""  # follow-up required when this alias is used

    def to_dict(self) -> dict[str, Any]:
        """Return a plain-data mapping of this alias."""
        return as_plain_data(self)
@dataclass(frozen=True, slots=True)
class CanonicalIdentityRecord:
    """Canonical identity for one managed repository or platform."""

    platform_id: str  # snake_case platform key derived from the repo name
    canonical_project_id: str  # institutional canonical identifier
    current_project_id: str  # identifier currently expected on disk
    central_folder: str  # managerial folder in the central dossier
    gitea_repo: str  # expected Gitea repository
    expected_remote_url: str  # expected HTTPS remote
    owner_platform_id: str  # owner id (set to the canonical id by builders)
    aliases: tuple[CanonicalAlias, ...]  # all identifiers tied to this record
    decision_status: str  # "approved" or "inventory_declared"
    decision_source: str  # document that declared this identity
    compatibility_rule: str  # textual compatibility policy
    migration_safe_now: bool  # True when no nominal reconciliation is pending
    notes: tuple[str, ...] = ()

    @property
    def accepted_identifiers(self) -> tuple[str, ...]:
        """De-duplicated identifiers of the accepted aliases (via merge_unique)."""
        return merge_unique(alias.identifier for alias in self.aliases if alias.accepted)

    @property
    def canonical_aliases(self) -> tuple[CanonicalAlias, ...]:
        """Only the aliases flagged as canonical."""
        return tuple(alias for alias in self.aliases if alias.canonical)

    def alias_for(self, identifier: str) -> CanonicalAlias | None:
        """Return the alias matching *identifier* after normalization, or None."""
        normalized = normalize_identifier(identifier)
        for alias in self.aliases:
            if normalize_identifier(alias.identifier) == normalized:
                return alias
        return None

    def canonicalize(self, identifier: str) -> str:
        """Map an accepted identifier to the canonical project id.

        Unknown or non-accepted identifiers are returned unchanged.
        """
        alias = self.alias_for(identifier)
        if alias and alias.accepted:
            return self.canonical_project_id
        return identifier

    def to_dict(self) -> dict[str, Any]:
        """Return a plain-data mapping of this record."""
        return as_plain_data(self)
@dataclass(frozen=True, slots=True)
class CanonicalIdentityAcceptanceCase:
    """Generated acceptance case for MCP transit and payload identifiers."""

    case_id: str  # stable digest-derived id ("identity-<digest>")
    platform_id: str
    operation: str  # admin operation name (see MCP_ADMIN_OPERATIONS)
    permission_scope: str  # MCP permission scope for that operation
    field_name: str  # transit field carrying the candidate identifier
    candidate_value: str  # identifier under test
    canonical_project_id: str
    accepted: bool  # whether the candidate identifier is accepted at all
    status: str  # "canonical", "compatibility_alias", or "canonical_field_requires_rewrite"
    decision_reason: str
    required_action: str
    mcp_transit_required: bool  # always True for generated cases
    direct_platform_bypass_blocked: bool  # always True for generated cases

    def to_dict(self) -> dict[str, Any]:
        """Return a plain-data mapping of this case."""
        return as_plain_data(self)
@dataclass(frozen=True, slots=True)
class IdentityValidationIssue:
    """One validation issue found in a payload."""

    field_name: str  # dotted path of the offending field (e.g. "a.b[0].c")
    value: str  # the identifier value that triggered the issue
    severity: IdentityIssueSeverity
    message: str
    canonical_project_id: str = ""  # resolved canonical id, when known
    required_action: str = ""

    def to_dict(self) -> dict[str, Any]:
        """Return a plain-data mapping of this issue."""
        return as_plain_data(self)
@dataclass(frozen=True, slots=True)
class IdentityValidationResult:
    """Result of validating identifiers present in one payload."""

    ok: bool  # False when at least one BLOCKER issue was found
    canonical_project_ids: tuple[str, ...]  # canonical ids seen in the payload
    accepted_aliases: tuple[str, ...]  # non-canonical accepted aliases seen
    issues: tuple[IdentityValidationIssue, ...]

    @property
    def blockers(self) -> tuple[IdentityValidationIssue, ...]:
        """Issues with BLOCKER severity."""
        return tuple(issue for issue in self.issues if issue.severity == IdentityIssueSeverity.BLOCKER)

    @property
    def warnings(self) -> tuple[IdentityValidationIssue, ...]:
        """Issues with WARNING severity."""
        return tuple(issue for issue in self.issues if issue.severity == IdentityIssueSeverity.WARNING)

    def to_dict(self) -> dict[str, Any]:
        """Return a plain-data mapping of this result."""
        return as_plain_data(self)
@dataclass(frozen=True, slots=True)
class CanonicalIdentityGraph:
    """Full identity graph and generated acceptance cases."""

    graph_id: str  # "canonical-identity-<digest>"
    generated_at: str  # timestamp string from utc_now()
    records: tuple[CanonicalIdentityRecord, ...]
    acceptance_cases: tuple[CanonicalIdentityAcceptanceCase, ...]
    decision_source: str
    compatibility_rule: str

    @property
    def records_count(self) -> int:
        """Number of identity records in the graph."""
        return len(self.records)

    @property
    def aliases_count(self) -> int:
        """Total number of aliases across all records."""
        return sum(len(record.aliases) for record in self.records)

    @property
    def accepted_cases_count(self) -> int:
        """Number of acceptance cases whose identifier is accepted."""
        return sum(1 for case in self.acceptance_cases if case.accepted)

    @property
    def blocked_cases_count(self) -> int:
        """Number of acceptance cases whose identifier is rejected."""
        return sum(1 for case in self.acceptance_cases if not case.accepted)

    def record_for(self, identifier: str) -> CanonicalIdentityRecord | None:
        """Find the record owning *identifier*.

        Lookup tries, in order: canonical id, current id, central folder,
        remote URL, Gitea repo, then the record's alias list — all compared
        after normalization.  Returns None when no record matches.
        """
        normalized = normalize_identifier(identifier)
        for record in self.records:
            if normalize_identifier(record.canonical_project_id) == normalized:
                return record
            if normalize_identifier(record.current_project_id) == normalized:
                return record
            if normalize_identifier(record.central_folder) == normalized:
                return record
            if normalize_identifier(record.expected_remote_url) == normalized:
                return record
            if normalize_identifier(record.gitea_repo) == normalized:
                return record
            if record.alias_for(identifier):
                return record
        return None

    def canonicalize(self, identifier: str) -> str:
        """Map any accepted identifier to its canonical project id.

        Unknown identifiers are returned unchanged.
        """
        record = self.record_for(identifier)
        if record is None:
            return identifier
        return record.canonicalize(identifier)

    def to_dict(self) -> dict[str, Any]:
        """Return a plain-data mapping of this graph."""
        return as_plain_data(self)
def normalize_identifier(value: str) -> str:
    """Normalize path-ish, URL-ish, and slug-ish identifiers for lookup."""
    # Unify separators, drop trailing slashes, strip one ".git" suffix, lowercase.
    cleaned = str(value or "").strip().replace("\\", "/").rstrip("/")
    return cleaned.removesuffix(".git").lower()
def platform_id_from_repo_name(name: str) -> str:
    """Derive a snake_case platform id from a repository name."""
    slug = str(name).strip().removeprefix("tudo-para-ia-")
    # Drop at most one of the recognized suffixes, "-platform" winning ties.
    for ending in ("-platform", "-plataform"):
        if slug.endswith(ending):
            slug = slug.removesuffix(ending)
            break
    return slug.replace("-", "_")
def _alias(identifier: str, kind: CanonicalAliasKind, canonical: bool, reason: str, required_action: str = "") -> CanonicalAlias:
    """Shorthand for an accepted ``CanonicalAlias`` (``accepted`` is always True)."""
    return CanonicalAlias(
        identifier=identifier,
        kind=kind,
        canonical=canonical,
        accepted=True,
        reason=reason,
        required_action=required_action,
    )
def aliases_for_target(target: RepositoryTarget) -> tuple[CanonicalAlias, ...]:
    """Build accepted aliases for one repository target.

    Combines the canonical id, local and declared names, the central folder,
    the Gitea repo, the expected remote URL, declared historical aliases, and
    both platform/plataform spelling variants.  Duplicates (by normalized
    identifier) are collapsed at the end.
    """
    canonical_id = target.canonical_name or target.expected_local_name
    raw_values: list[CanonicalAlias] = [
        _alias(
            canonical_id,
            CanonicalAliasKind.CANONICAL_PROJECT_ID,
            True,
            "identificador canonico do repositorio/plataforma",
        ),
        _alias(
            target.expected_local_name,
            CanonicalAliasKind.CURRENT_PROJECT_ID,
            target.expected_local_name == canonical_id,
            "nome local esperado pelo inventario de sincronizacao",
            "usar canonico em ownerPlatformId quando houver divergencia",
        ),
        _alias(
            target.declared_name,
            CanonicalAliasKind.CURRENT_PROJECT_ID,
            target.declared_name == canonical_id,
            "nome declarado pela ordem permanente de sincronizacao",
            "registrar divergencia se diferente do canonico",
        ),
        _alias(
            target.central_folder,
            CanonicalAliasKind.CENTRAL_FOLDER,
            False,
            "pasta gerencial da central de ordem de servico",
            "nao usar pasta central como ownerPlatformId",
        ),
        _alias(
            target.gitea_repo,
            CanonicalAliasKind.GITEA_REPOSITORY,
            False,
            "repositorio Gitea esperado",
            "normalizar para remote URL antes de publicar evidencia",
        ),
        _alias(
            target.expected_remote_url,
            CanonicalAliasKind.REMOTE_URL,
            False,
            "remote HTTPS esperado",
            "validar credencial Git antes de sincronizar",
        ),
    ]
    # Historical aliases declared by the permanent synchronization inventory.
    for alias in target.aliases:
        raw_values.append(
            _alias(
                alias,
                CanonicalAliasKind.LEGACY_PROJECT_ID,
                alias == canonical_id,
                "alias historico autorizado para compatibilidade",
                "preservar alias ate migracao Git/MCP coordenada",
            )
        )
    # Register both spelling variants so no duplicate repository gets created.
    for variant in (platform_to_plataform(canonical_id), plataform_to_platform(canonical_id)):
        if variant != canonical_id:
            raw_values.append(
                _alias(
                    variant,
                    CanonicalAliasKind.SPELLING_VARIANT,
                    False,
                    "variante platform/plataform reconhecida para evitar repositorio duplicado",
                    "registrar como alias e nao criar repositorio paralelo",
                )
            )
    # Deduplicate by normalized identifier.  A later canonical entry overrides
    # an earlier non-canonical one with the same key (order-sensitive).
    by_identifier: dict[str, CanonicalAlias] = {}
    for item in raw_values:
        key = normalize_identifier(item.identifier)
        if key not in by_identifier or item.canonical:
            by_identifier[key] = item
    return tuple(by_identifier.values())
def record_for_target(target: RepositoryTarget) -> CanonicalIdentityRecord:
    """Build the canonical identity record for one repository target.

    The Mais Humana repository (matching ``CANONICAL_PROJECT_ID``) receives
    the approved institutional identity; every other target is described by
    the permanent synchronization inventory.
    """
    canonical_id = target.canonical_name or target.expected_local_name
    platform_id = platform_id_from_repo_name(canonical_id)
    if canonical_id == CANONICAL_PROJECT_ID:
        platform_id = "mais_humana"  # fixed id for the flagship platform
    current_id = CURRENT_PROJECT_ID if canonical_id == CANONICAL_PROJECT_ID else target.expected_local_name
    decision_source = CANONICAL_DECISION_SOURCE if canonical_id == CANONICAL_PROJECT_ID else "000_sincronizacao-dos-espelhos.md"
    compatibility_rule = (
        CANONICAL_COMPATIBILITY_RULE
        if canonical_id == CANONICAL_PROJECT_ID
        else "Identidade aceita conforme inventario permanente; variantes platform/plataform sao aliases ate reconciliacao segura."
    )
    # Migration is only safe when the canonical name is already the local name
    # and the target has no pending nominal reconciliation.
    migration_safe_now = bool(canonical_id == target.expected_local_name and not target.requires_nominal_reconciliation)
    return CanonicalIdentityRecord(
        platform_id=platform_id,
        canonical_project_id=canonical_id,
        current_project_id=current_id,
        central_folder=target.central_folder,
        gitea_repo=target.gitea_repo,
        expected_remote_url=target.expected_remote_url,
        owner_platform_id=canonical_id,
        aliases=aliases_for_target(target),
        decision_status="approved" if canonical_id == CANONICAL_PROJECT_ID else "inventory_declared",
        decision_source=decision_source,
        compatibility_rule=compatibility_rule,
        migration_safe_now=migration_safe_now,
        notes=target.notes,
    )
def build_identity_records(targets: Sequence[RepositoryTarget] | None = None) -> tuple[CanonicalIdentityRecord, ...]:
    """Build identity records from repository targets.

    An empty or ``None`` *targets* falls back to the default target set.
    """
    selected = targets or default_repository_targets()
    return tuple(record_for_target(item) for item in selected)
def build_acceptance_cases(records: Sequence[CanonicalIdentityRecord]) -> tuple[CanonicalIdentityAcceptanceCase, ...]:
    """Build exhaustive MCP transit acceptance cases for records and aliases.

    One case per (alias x admin operation x transit field) combination:
    5 operations and 8 transit fields per alias.
    """
    cases: list[CanonicalIdentityAcceptanceCase] = []
    for record in records:
        candidates = tuple(record.aliases)
        for operation, permission in MCP_ADMIN_OPERATIONS:
            for field in IDENTITY_TRANSIT_FIELDS:
                for alias in candidates:
                    candidate = alias.identifier
                    accepted = alias.accepted
                    canonical_field = field in CANONICAL_REQUIRED_FIELDS
                    status = "canonical" if alias.canonical else "compatibility_alias"
                    required_action = alias.required_action
                    # Canonical-only fields must carry the canonical id verbatim
                    # (exact string comparison, not normalized); anything else is
                    # marked for rewrite.
                    if canonical_field and candidate != record.canonical_project_id:
                        status = "canonical_field_requires_rewrite"
                        required_action = "reescrever campo canonico para canonicalProjectId/ownerPlatformId antes de persistir"
                    # Seed keeps the case id stable across regenerations.
                    case_seed = {
                        "platform": record.platform_id,
                        "operation": operation,
                        "field": field,
                        "candidate": candidate,
                        "canonical": record.canonical_project_id,
                    }
                    cases.append(
                        CanonicalIdentityAcceptanceCase(
                            case_id=f"identity-{stable_digest(case_seed, 20)}",
                            platform_id=record.platform_id,
                            operation=operation,
                            permission_scope=permission,
                            field_name=field,
                            candidate_value=candidate,
                            canonical_project_id=record.canonical_project_id,
                            accepted=accepted,
                            status=status,
                            decision_reason=alias.reason,
                            required_action=required_action,
                            mcp_transit_required=True,
                            direct_platform_bypass_blocked=True,
                        )
                    )
    return tuple(cases)
def _generated_records_and_cases() -> tuple[tuple[CanonicalIdentityRecord, ...], tuple[CanonicalIdentityAcceptanceCase, ...]] | None:
try:
from .generated_canonical_identity_registry import iter_acceptance_cases, iter_records
except ImportError:
return None
return tuple(iter_records()), tuple(iter_acceptance_cases())
def build_identity_graph(*, use_generated: bool = True) -> CanonicalIdentityGraph:
    """Build the graph, preferring the generated registry when present.

    Falls back to runtime construction from repository targets when the
    generated module is absent or *use_generated* is False.
    """
    generated = _generated_records_and_cases() if use_generated else None
    if generated is None:
        records = build_identity_records()
        cases = build_acceptance_cases(records)
    else:
        records, cases = generated
    # Digest seed keeps graph_id stable for an identical record/alias/case shape.
    seed = {
        "records": [record.canonical_project_id for record in records],
        "aliases": sum(len(record.aliases) for record in records),
        "cases": len(cases),
        "decision": CANONICAL_DECISION_SOURCE,
    }
    return CanonicalIdentityGraph(
        graph_id=f"canonical-identity-{stable_digest(seed, 16)}",
        generated_at=utc_now(),
        records=records,
        acceptance_cases=cases,
        decision_source=CANONICAL_DECISION_SOURCE,
        compatibility_rule=CANONICAL_COMPATIBILITY_RULE,
    )
def _iter_payload_items(payload: Mapping[str, Any], prefix: str = "") -> Iterable[tuple[str, str]]:
for key, value in payload.items():
field = f"{prefix}.{key}" if prefix else str(key)
if isinstance(value, Mapping):
yield from _iter_payload_items(value, field)
elif isinstance(value, (list, tuple)):
for index, item in enumerate(value):
item_field = f"{field}[{index}]"
if isinstance(item, Mapping):
yield from _iter_payload_items(item, item_field)
elif isinstance(item, str):
yield item_field, item
elif isinstance(value, str):
yield field, value
def _base_field_name(field_name: str) -> str:
text = field_name.rsplit(".", 1)[-1]
if "[" in text:
text = text.split("[", 1)[0]
return text
def _field_is_identity_relevant(field_name: str) -> bool:
    """True when the field's base name belongs to any identity field group."""
    base = _base_field_name(field_name)
    groups = (
        CANONICAL_REQUIRED_FIELDS,
        COMPATIBILITY_IDENTIFIER_FIELDS,
        REMOTE_IDENTIFIER_FIELDS,
        CENTRAL_IDENTIFIER_FIELDS,
    )
    return any(base in group for group in groups)
def validate_identity_payload(
    payload: Mapping[str, Any],
    *,
    graph: CanonicalIdentityGraph | None = None,
) -> IdentityValidationResult:
    """Validate identity fields in an MCP or central payload.

    Walks every string value in the (possibly nested) payload, keeps only
    identity-relevant fields, and classifies each value:

    * unknown identifier          -> BLOCKER (result not ok)
    * alias in a canonical field  -> WARNING (rewrite recommended)
    * alias in other fields       -> INFO (compatibility trace)
    """
    identity_graph = graph or build_identity_graph()
    issues: list[IdentityValidationIssue] = []
    accepted_aliases: list[str] = []
    canonical_ids: list[str] = []
    for field_name, value in _iter_payload_items(payload):
        if not _field_is_identity_relevant(field_name):
            continue
        base = _base_field_name(field_name)
        record = identity_graph.record_for(value)
        if record is None:
            # Identifier not present anywhere in the graph: hard stop.
            issues.append(
                IdentityValidationIssue(
                    field_name=field_name,
                    value=value,
                    severity=IdentityIssueSeverity.BLOCKER,
                    message="identificador nao reconhecido no grafo canonico",
                    required_action="registrar alias institucional ou corrigir payload antes de publicar",
                )
            )
            continue
        canonical_ids.append(record.canonical_project_id)
        alias = record.alias_for(value)
        if alias and not alias.canonical:
            accepted_aliases.append(alias.identifier)
        if base in CANONICAL_REQUIRED_FIELDS and value != record.canonical_project_id:
            # Canonical-only field carrying a non-canonical value: flag rewrite.
            issues.append(
                IdentityValidationIssue(
                    field_name=field_name,
                    value=value,
                    severity=IdentityIssueSeverity.WARNING,
                    message="campo canonico recebeu alias aceito; reescrita recomendada antes de publicar estado novo",
                    canonical_project_id=record.canonical_project_id,
                    required_action="usar canonical_project_id no owner/canonical field e manter alias apenas como compatibilidade",
                )
            )
        elif alias and not alias.canonical:
            # Alias accepted by compatibility policy: informational trace only.
            issues.append(
                IdentityValidationIssue(
                    field_name=field_name,
                    value=value,
                    severity=IdentityIssueSeverity.INFO,
                    message="alias aceito por politica de compatibilidade",
                    canonical_project_id=record.canonical_project_id,
                    required_action=alias.required_action,
                )
            )
    blockers = tuple(issue for issue in issues if issue.severity == IdentityIssueSeverity.BLOCKER)
    return IdentityValidationResult(
        ok=not blockers,
        canonical_project_ids=merge_unique(canonical_ids),
        accepted_aliases=merge_unique(accepted_aliases),
        issues=tuple(issues),
    )
def identity_graph_payload(graph: CanonicalIdentityGraph, *, limit_cases: int = 120) -> dict[str, Any]:
    """Return a compact JSON-safe identity graph payload.

    Only the first *limit_cases* acceptance cases are embedded to keep the
    artifact small; the counters still cover the full case set.
    """
    return {
        "graphId": graph.graph_id,
        "generatedAt": graph.generated_at,
        "recordsCount": graph.records_count,
        "aliasesCount": graph.aliases_count,
        "acceptanceCasesCount": len(graph.acceptance_cases),
        "acceptedCasesCount": graph.accepted_cases_count,
        "blockedCasesCount": graph.blocked_cases_count,
        "decisionSource": graph.decision_source,
        "compatibilityRule": graph.compatibility_rule,
        "controlPlaneId": MCP_CONTROL_PLANE_ID,
        "maisHumanaCanonicalProjectId": CANONICAL_PROJECT_ID,
        "maisHumanaCurrentProjectId": CURRENT_PROJECT_ID,
        "maisHumanaLegacyAlias": LEGACY_PLATAFORM_ALIAS,
        "maisHumanaCentralFolder": CENTRAL_FOLDER_NAME,
        "records": [record.to_dict() for record in graph.records],
        "acceptanceCasesSample": [case.to_dict() for case in graph.acceptance_cases[:limit_cases]],
    }
def identity_graph_rows(graph: CanonicalIdentityGraph) -> list[list[str]]:
    """Flatten the graph into CSV-ready rows: header first, one row per alias."""
    header = [
        "platform_id",
        "canonical_project_id",
        "current_project_id",
        "central_folder",
        "alias",
        "alias_kind",
        "alias_canonical",
        "decision_status",
        "migration_safe_now",
        "required_action",
    ]
    body = [
        [
            record.platform_id,
            record.canonical_project_id,
            record.current_project_id,
            record.central_folder,
            alias.identifier,
            alias.kind.value,
            "yes" if alias.canonical else "no",
            record.decision_status,
            "yes" if record.migration_safe_now else "no",
            alias.required_action,
        ]
        for record in graph.records
        for alias in record.aliases
    ]
    return [header, *body]
def identity_acceptance_rows(graph: CanonicalIdentityGraph) -> list[list[str]]:
    """Flatten acceptance cases into CSV-ready rows: header first, one per case."""
    header = [
        "case_id",
        "platform_id",
        "operation",
        "permission_scope",
        "field_name",
        "candidate_value",
        "canonical_project_id",
        "accepted",
        "status",
        "required_action",
    ]
    body = [
        [
            case.case_id,
            case.platform_id,
            case.operation,
            case.permission_scope,
            case.field_name,
            case.candidate_value,
            case.canonical_project_id,
            "yes" if case.accepted else "no",
            case.status,
            case.required_action,
        ]
        for case in graph.acceptance_cases
    ]
    return [header, *body]
def rows_to_csv(rows: Sequence[Sequence[str]]) -> str:
    """Render rows as CSV text using LF line endings."""
    with io.StringIO() as sink:
        csv.writer(sink, lineterminator="\n").writerows(rows)
        return sink.getvalue()
def identity_graph_markdown(graph: CanonicalIdentityGraph) -> str:
    """Render the graph as a human-readable Markdown report (Portuguese)."""
    lines = [
        "# Canonical Identity Graph",
        "",
        f"- graph_id: `{graph.graph_id}`",
        f"- generated_at: `{graph.generated_at}`",
        f"- records: `{graph.records_count}`",
        f"- aliases: `{graph.aliases_count}`",
        f"- acceptance_cases: `{len(graph.acceptance_cases)}`",
        f"- accepted_cases: `{graph.accepted_cases_count}`",
        f"- blocked_cases: `{graph.blocked_cases_count}`",
        f"- decision_source: `{graph.decision_source}`",
        f"- control_plane: `{MCP_CONTROL_PLANE_ID}`",
        "",
        "## Regra canonica Mais Humana",
        "",
        f"- canonico: `{CANONICAL_PROJECT_ID}`",
        f"- repo_local_historico: `{CURRENT_PROJECT_ID}`",
        f"- alias_plataform: `{LEGACY_PLATAFORM_ALIAS}`",
        f"- pasta_central: `{CENTRAL_FOLDER_NAME}`",
        f"- regra: {CANONICAL_COMPATIBILITY_RULE}",
        "",
        "## Plataformas",
        "",
    ]
    # One section per platform, sorted for a stable report layout.
    for record in sorted(graph.records, key=lambda item: item.platform_id):
        lines.extend(
            [
                f"### {record.platform_id}",
                "",
                f"- canonical_project_id: `{record.canonical_project_id}`",
                f"- current_project_id: `{record.current_project_id}`",
                f"- central_folder: `{record.central_folder}`",
                f"- remote: `{record.expected_remote_url}`",
                f"- migration_safe_now: `{record.migration_safe_now}`",
                f"- aliases: `{', '.join(record.accepted_identifiers)}`",
                "",
            ]
        )
    lines.extend(
        [
            "## MCP transit",
            "",
            "- Todo payload interplataforma deve manter origin, destination, tool, payload, actor, permission, result, traceId, auditId e timestamp.",
            "- Campos ownerPlatformId/canonicalProjectId devem usar o canonical_project_id; aliases sao aceitos apenas como compatibilidade rastreavel.",
            "- Bypass direto da plataforma permanece bloqueado: a administracao passa pelo MCPs Internos.",
        ]
    )
    return "\n".join(lines).strip() + "\n"
def identity_generated_records(project_root: Path, central_platform_folder: Path | None = None) -> tuple[GeneratedFile, ...]:
    """Describe the artifact files this module writes, for the audit trail."""
    relation = "0035_EXECUTIVA__reconciliar-nome-canonico-real-alias-platform"
    # (relative path, description, function, file type) for each artifact.
    paths = (
        ("dados/canonical-identity-graph.json", "Grafo canonico de identidades e aliases.", "canonical identity graph", "json"),
        ("matrizes/canonical-identity-graph.csv", "Matriz de aliases por plataforma.", "canonical identity matrix", "csv"),
        ("matrizes/canonical-identity-acceptance-cases.csv", "Casos MCP de aceitacao de aliases.", "canonical identity acceptance", "csv"),
        ("ecossistema/CANONICAL-IDENTITY-GRAPH.md", "Relatorio humano do grafo canonico.", "canonical identity report", "markdown"),
    )
    records = [
        GeneratedFile(
            path=str(project_root / relative),
            description=description,
            function=function,
            file_type=file_type,
            changed_by="mais_humana.canonical_identity",
            change_summary="Criado ou atualizado grafo canonico de nomes, aliases e casos MCP.",
            relation_to_order=relation,
        )
        for relative, description, function, file_type in paths
    ]
    # The central dossier copy is only recorded when a central folder is given.
    if central_platform_folder is not None:
        records.append(
            GeneratedFile(
                path=str(central_platform_folder / "reports" / "EXECUTADO__canonical-identity-graph.md"),
                description="Copia central do grafo canonico de identidade.",
                function="canonical identity central report",
                file_type="markdown",
                changed_by="mais_humana.canonical_identity",
                change_summary="Registrada decisao canonica -platform e aliases no dossie central.",
                relation_to_order=relation,
            )
        )
    return tuple(records)
def write_identity_graph_artifacts(
    graph: CanonicalIdentityGraph,
    project_root: Path,
    *,
    central_platform_folder: Path | None = None,
) -> tuple[GeneratedFile, ...]:
    """Write the JSON/CSV/Markdown artifacts for *graph*.

    Write failures under *central_platform_folder* are tolerated: they are
    collected into a status JSON under *project_root* instead of aborting
    the project-root artifacts.  Any other ``OSError`` is re-raised.
    """
    targets: list[tuple[Path, str]] = [
        (project_root / "dados" / "canonical-identity-graph.json", json.dumps(identity_graph_payload(graph), ensure_ascii=False, indent=2, sort_keys=True)),
        (project_root / "matrizes" / "canonical-identity-graph.csv", rows_to_csv(identity_graph_rows(graph))),
        (project_root / "matrizes" / "canonical-identity-acceptance-cases.csv", rows_to_csv(identity_acceptance_rows(graph))),
        (project_root / "ecossistema" / "CANONICAL-IDENTITY-GRAPH.md", identity_graph_markdown(graph)),
    ]
    records = list(identity_generated_records(project_root, central_platform_folder))
    central_failures: list[dict[str, str]] = []
    if central_platform_folder is not None:
        targets.append((central_platform_folder / "reports" / "EXECUTADO__canonical-identity-graph.md", identity_graph_markdown(graph)))
    for path, content in targets:
        try:
            path.parent.mkdir(parents=True, exist_ok=True)
            path.write_text(content, encoding="utf-8")
        except OSError as exc:
            # Only tolerate failures inside the central folder (e.g. ACL denial).
            if central_platform_folder is not None and central_platform_folder in path.parents:
                central_failures.append({"path": str(path), "error": f"{type(exc).__name__}: {exc}"})
                continue
            raise
    if central_failures:
        # Record the failure for auditors without aborting the run.
        status_path = project_root / "dados" / "canonical-identity-central-write-status.json"
        status_path.write_text(
            json.dumps(
                {
                    "generatedAt": utc_now(),
                    "centralPlatformFolder": str(central_platform_folder),
                    "ok": False,
                    "failures": central_failures,
                },
                ensure_ascii=False,
                indent=2,
                sort_keys=True,
            ),
            encoding="utf-8",
        )
        records.append(
            GeneratedFile(
                path=str(status_path),
                description="Status da escrita central do grafo canonico.",
                function="canonical identity central write status",
                file_type="json",
                changed_by="mais_humana.canonical_identity",
                change_summary="Registrada falha de escrita central sem abortar artefatos do projeto real.",
                relation_to_order="0034_EXECUTIVA__corrigir-acl-escrita-central-e-sql-semantico-plataforma-15",
            )
        )
    return tuple(records)
def run_canonical_identity_graph(
    *,
    project_root: Path,
    central_platform_folder: Path | None = None,
    use_generated: bool = True,
) -> tuple[CanonicalIdentityGraph, tuple[GeneratedFile, ...]]:
    """Build the identity graph and write all of its artifacts.

    Returns the graph plus the ``GeneratedFile`` records describing what was
    written (including the central write-status file on partial failure).
    """
    graph = build_identity_graph(use_generated=use_generated)
    records = write_identity_graph_artifacts(graph, project_root, central_platform_folder=central_platform_folder)
    return graph, records

View File

@@ -9,6 +9,7 @@ from pathlib import Path
from .models import as_plain_data
from .central_consolidation import run_consolidated_report
from .central_materialization import run_central_materialization
from .canonical_identity import identity_graph_payload, run_canonical_identity_graph
from .matrix import build_global_recommendations, build_matrix, build_platform_reports
from .mcp_contract import build_mcp_contract_report, build_mcp_execute_probe, mcp_provider_compact_json, mcp_provider_payload
from .mcp_contract import (
@@ -145,6 +146,13 @@ def build_parser() -> argparse.ArgumentParser:
default="G:/_codex-git/nucleo-gestao-operacional/central-de-ordem-de-servico/projects/15_repo_tudo-para-ia-mais-humana-platform",
)
central_materialization.add_argument("--overwrite", action="store_true")
canonical_identity = sub.add_parser("canonical-identity", help="Write canonical identity graph and MCP alias acceptance artifacts.")
canonical_identity.add_argument("--project-root", default="G:/_codex-git/tudo-para-ia-mais-humana")
canonical_identity.add_argument(
"--central-platform-folder",
default="G:/_codex-git/nucleo-gestao-operacional/central-de-ordem-de-servico/projects/15_repo_tudo-para-ia-mais-humana-platform",
)
canonical_identity.add_argument("--no-generated", action="store_true", help="Build graph from runtime targets instead of generated registry.")
return parser

View File

@@ -0,0 +1,171 @@
"""Generate the canonical identity registry for Mais Humana.
The output is importable Python source. It lets tests, CLI commands, MCP
providers, and central reports validate aliases without rescanning the
workspace or parsing Markdown during runtime.
"""
from __future__ import annotations

from pathlib import Path
from typing import Any, Iterable
# Repository root — assumes this script lives one level below it (e.g. tools/).
ROOT = Path(__file__).resolve().parents[1]
# Source tree hosting the importable package.
SRC = ROOT / "src"
# Generated module written by this script.
OUTPUT = SRC / "mais_humana" / "generated_canonical_identity_registry.py"
def ensure_import_path() -> None:
    """Make the package under ``src`` importable for this script run."""
    import sys

    entry = str(SRC)
    if entry not in sys.path:
        sys.path.insert(0, entry)
def q(value: object) -> str:
    """Return a Python string literal for *value* coerced to ``str``."""
    text = str(value)
    return repr(text)
def bool_literal(value: bool) -> str:
    """Render a (truthy) value as Python bool source text."""
    if value:
        return "True"
    return "False"
def tuple_literal(values: Iterable[object], *, indent: int = 8) -> str:
    """Render *values* as a Python tuple literal.

    Blank entries are dropped; up to 8 items render inline (with a trailing
    comma for the one-element case), longer tuples render multi-line.
    """
    kept = [str(item) for item in values if str(item).strip()]
    if not kept:
        return "()"
    if len(kept) <= 8:
        trailing = "," if len(kept) == 1 else ""
        return "(" + ", ".join(q(item) for item in kept) + trailing + ")"
    pad = " " * indent
    body = [f"{pad}{q(item)}," for item in kept]
    return "\n".join(["(", *body, " " * (indent - 4) + ")"])
def alias_block(alias: Any, *, indent: int = 12) -> str:
    """Render one ``CanonicalAlias(...)`` constructor call as indented source.

    ``getattr`` with constant attribute names was replaced by direct
    attribute access (ruff B009); the parameter is typed ``Any`` so static
    checkers accept the attribute access.  Output is unchanged.
    """
    pad = " " * indent
    field_pad = " " * (indent + 4)
    return "\n".join(
        [
            pad + "CanonicalAlias(",
            field_pad + f"identifier={q(alias.identifier)},",
            field_pad + f"kind=CanonicalAliasKind.{alias.kind.name},",
            field_pad + f"accepted={bool_literal(alias.accepted)},",
            field_pad + f"canonical={bool_literal(alias.canonical)},",
            field_pad + f"reason={q(alias.reason)},",
            field_pad + f"required_action={q(alias.required_action)},",
            pad + "),",
        ]
    )
def record_block(record: Any) -> str:
    """Render one ``CanonicalIdentityRecord(...)`` constructor call as source.

    Fixes two issues: ``getattr`` with constant names is replaced by direct
    attribute access (ruff B009), and the emitted indentation is built from
    explicit pads (4 for the constructor, 8 for fields) so the generated
    module nests consistently with ``alias_block``'s ``indent=12`` aliases —
    the literal leading spaces in this copy were inconsistent.
    """
    pad = " " * 4
    field_pad = " " * 8
    alias_lines = [field_pad + "aliases=("]
    alias_lines.extend(alias_block(alias, indent=12) for alias in record.aliases)
    alias_lines.append(field_pad + "),")
    return "\n".join(
        [
            pad + "CanonicalIdentityRecord(",
            field_pad + f"platform_id={q(record.platform_id)},",
            field_pad + f"canonical_project_id={q(record.canonical_project_id)},",
            field_pad + f"current_project_id={q(record.current_project_id)},",
            field_pad + f"central_folder={q(record.central_folder)},",
            field_pad + f"gitea_repo={q(record.gitea_repo)},",
            field_pad + f"expected_remote_url={q(record.expected_remote_url)},",
            field_pad + f"owner_platform_id={q(record.owner_platform_id)},",
            *alias_lines,
            field_pad + f"decision_status={q(record.decision_status)},",
            field_pad + f"decision_source={q(record.decision_source)},",
            field_pad + f"compatibility_rule={q(record.compatibility_rule)},",
            field_pad + f"migration_safe_now={bool_literal(record.migration_safe_now)},",
            field_pad + f"notes={tuple_literal(record.notes, indent=12)},",
            pad + "),",
        ]
    )
def case_block(case: Any) -> str:
    """Render one ``CanonicalIdentityAcceptanceCase(...)`` call as source.

    Same cleanups as ``record_block``: direct attribute access instead of
    constant-name ``getattr`` (ruff B009) and explicit 4/8-space pads for
    consistently indented generated code.
    """
    pad = " " * 4
    field_pad = " " * 8
    return "\n".join(
        [
            pad + "CanonicalIdentityAcceptanceCase(",
            field_pad + f"case_id={q(case.case_id)},",
            field_pad + f"platform_id={q(case.platform_id)},",
            field_pad + f"operation={q(case.operation)},",
            field_pad + f"permission_scope={q(case.permission_scope)},",
            field_pad + f"field_name={q(case.field_name)},",
            field_pad + f"candidate_value={q(case.candidate_value)},",
            field_pad + f"canonical_project_id={q(case.canonical_project_id)},",
            field_pad + f"accepted={bool_literal(case.accepted)},",
            field_pad + f"status={q(case.status)},",
            field_pad + f"decision_reason={q(case.decision_reason)},",
            field_pad + f"required_action={q(case.required_action)},",
            field_pad + f"mcp_transit_required={bool_literal(case.mcp_transit_required)},",
            field_pad + f"direct_platform_bypass_blocked={bool_literal(case.direct_platform_bypass_blocked)},",
            pad + "),",
        ]
    )
def main() -> int:
    """Regenerate the canonical identity registry module; return exit status 0.

    NOTE(review): several emitted literals below appear to have lost leading
    spaces in this copy (e.g. " CanonicalAlias," and " return RECORDS" should
    presumably carry 4-space indents for the generated module to be valid
    Python) — verify against the repository original before relying on them.
    """
    ensure_import_path()
    # Deferred import: only works after src/ is on sys.path.
    from mais_humana.canonical_identity import build_acceptance_cases, build_identity_records
    records = build_identity_records()
    cases = build_acceptance_cases(records)
    # Module header: docstring, imports, counters, and the RECORDS opening.
    lines = [
        '"""Generated canonical identity registry for Mais Humana.',
        "",
        "Do not edit this file by hand. Regenerate it with:",
        "",
        " python tools/generate_canonical_identity_registry.py",
        "",
        "The registry is source code so it can be imported, tested, packaged,",
        "hashed, and exposed through MCP without parsing Markdown at runtime.",
        '"""',
        "",
        "from __future__ import annotations",
        "",
        "from .canonical_identity import (",
        " CanonicalAlias,",
        " CanonicalAliasKind,",
        " CanonicalIdentityAcceptanceCase,",
        " CanonicalIdentityRecord,",
        ")",
        "",
        f"GENERATED_RECORDS_COUNT = {len(records)}",
        f"GENERATED_ACCEPTANCE_CASES_COUNT = {len(cases)}",
        "",
        "RECORDS = (",
    ]
    lines.extend(record_block(record) for record in records)
    lines.extend(
        [
            ")",
            "",
            "ACCEPTANCE_CASES = (",
        ]
    )
    lines.extend(case_block(case) for case in cases)
    # Accessor functions close out the generated module.
    lines.extend(
        [
            ")",
            "",
            "",
            "def iter_records():",
            " return RECORDS",
            "",
            "",
            "def iter_acceptance_cases():",
            " return ACCEPTANCE_CASES",
            "",
        ]
    )
    OUTPUT.write_text("\n".join(lines), encoding="utf-8")
    print(f"generated {OUTPUT} records={len(records)} cases={len(cases)} lines={len(lines)}")
    return 0
# Script entry point: regenerate the registry and exit with main()'s status.
if __name__ == "__main__":
    raise SystemExit(main())