From 860212465728fa392bd06ea49f4bb57d548447ec Mon Sep 17 00:00:00 2001 From: codex-server Date: Mon, 4 May 2026 12:58:36 -0300 Subject: [PATCH] auto-sync: tudo-para-ia-mais-humana 2026-05-04 12:58:35 --- src/mais_humana/institutional_decisions.py | 1397 ++++++++++++++++++++ 1 file changed, 1397 insertions(+) create mode 100644 src/mais_humana/institutional_decisions.py diff --git a/src/mais_humana/institutional_decisions.py b/src/mais_humana/institutional_decisions.py new file mode 100644 index 0000000..b596111 --- /dev/null +++ b/src/mais_humana/institutional_decisions.py @@ -0,0 +1,1397 @@ +"""Executable registry for the six institutional ecosystem decisions. + +The router made six management decisions operational: Docs is a full +knowledge platform, platform-to-platform acceptance goes through MCP, evidence +retention defaults to ten years, live sensitive purge is forbidden, platform +names end in ``platform``, and GPT/Codex may execute broadly in development +when the action is reversible, audited, and does not leak secrets. + +This module turns those decisions into deterministic code, acceptance cases, +validators, Markdown/CSV/JSON artifacts, and semantic file records. It is +intentionally independent from live Cloudflare access; deploy and external +effects remain outside the local policy registry. +""" + +from __future__ import annotations + +import csv +import io +import json +from dataclasses import dataclass +from enum import Enum +from pathlib import Path +from typing import Any, Iterable, Mapping, Sequence + +from .human_rulebook import MCP_CONTROL_PLANE_ID, MCP_TRANSIT_FIELDS +from .models import GeneratedFile, as_plain_data, merge_unique, utc_now +from .redaction import SECRET_PATTERNS, redact_sensitive_text +from .repository_mesh import stable_digest +from .storage import connect, upsert_files + + +class DecisionId(str, Enum): + """Identifiers for the institutional decisions.""" + + DOCS_FULL_KNOWLEDGE = "docs_full_knowledge" + MCP_ACCEPTANCE = "mcp_acceptance" + TEN_YEAR_RETENTION = "ten_year_retention" + LIVE_SENSITIVE_PURGE_FORBIDDEN = "live_sensitive_purge_forbidden" + CANONICAL_PLATFORM_NAMES = "canonical_platform_names" + DEV_EXECUTION_AUTHORITY = "dev_execution_authority" + + +class DecisionStatus(str, Enum): + """Lifecycle status of one decision in this registry.""" + + INSTITUTIONALIZED = "institutionalized" + PARTIAL = "partial" + BLOCKED = "blocked" + + +class DecisionActor(str, Enum): + """Actors that need explicit acceptance cases.""" + + GPT = "gpt" + CODEX = "codex" + MCP_GATEWAY = "mcp_gateway" + ADMIN_UI = "admin_ui" + HUMAN_ADMIN = "human_admin" + SUPPORT = "support" + RELEASE_MANAGER = "release_manager" + SECURITY_AUDITOR = "security_auditor" + + +class DecisionCaseStatus(str, Enum): + """Operational expectation for one acceptance case.""" + + REQUIRED = "required" + ALLOWED_WITH_GATES = "allowed_with_gates" + BLOCKED_IF_MISSING = "blocked_if_missing" + FORBIDDEN = "forbidden" + + +class DecisionFindingSeverity(str, Enum): + """Severity used by validators.""" + + INFO = "info" + WARNING = "warning" + BLOCKER = "blocker" + + +class DocsSufficiencyState(str, Enum): + """Answer state required from Docs as a full knowledge platform.""" + + FOUND = "encontrado" + PARTIAL = "parcial" + NOT_FOUND = "nao_encontrado" + CONFLICTING = "conflitante" + + +class RetentionDecisionKind(str, Enum): + """Retention classification for an artifact or record.""" + + TEN_YEAR_EVIDENCE = "ten_year_evidence" + SHORT_LIVED_OPERATIONAL = "short_lived_operational" + LEGAL_HOLD = "legal_hold" + LIVE_SENSITIVE_RETAINED = "live_sensitive_retained" + + +class CleanupDecisionKind(str, Enum): + """Cleanup/purge vocabulary enforced by the router.""" + + OPERATIONAL_CLEANUP = "operational_cleanup" + PURGE_FORBIDDEN = "purge_forbidden" + RETAIN_WITH_EVIDENCE = "retain_with_evidence" + OWNER_REVIEW = "owner_review" + + +class DevelopmentActionDecision(str, Enum): + """Policy classification for GPT/Codex development actions.""" + + ALLOWED = "allowed" + ALLOWED_DRY_RUN = "allowed_dry_run" + APPROVAL_REQUIRED = "approval_required" + BLOCKED = "blocked" + + +CANONICAL_DECISION_SOURCE = "000-ROTEADOR-PERMANENTE-DE-ORDEM_DE_SERVICO" +CONTROL_PLANE_ID = MCP_CONTROL_PLANE_ID +DOCS_PLATFORM_ID = "tudo-para-ia-docs-plataform" +DOCS_CANONICAL_TARGET_ID = "tudo-para-ia-docs-platform" +BIBLIOTECA_OWNER_PLATFORM_ID = "tudo-para-ia-integracoes-platform" +BIBLIOTECA_LEGACY_IDS = ( + "tudo-para-ia-biblioteca-privada", + "tudo-para-ia-biblioteca-privada-ia-plataform", +) + +AFFECTED_PLATFORMS: tuple[str, ...] = ( + "tudo-para-ia-docs-plataform", + "tudo-para-ia-mcps-internos-plataform", + "tudo-para-ia-ui-platform", + "tudo-para-ia-integracoes-plataform", + "tudo-para-ia-customer-ops-platform", + "tudo-para-ia-mais-humana-platform", + "tudo-para-ia-identity-platform", + "tudo-para-ia-business-platform", +) + +COMMON_EVIDENCE_FIELDS: tuple[str, ...] = ( + "sourceHash", + "sourcePayloadHash", + "sourceRecordsHash", + "evidenceId", + "traceId", + "auditId", + "timestamp", + "truthState", +) + +TEN_YEAR_SUBJECTS: tuple[str, ...] = ( + "documentacao", + "contrato", + "aceite", + "evidencia", + "auditoria", + "execucao", + "pendencia", + "hash", + "decisao_institucional", + "rollback", +) + +SHORT_LIVED_SUBJECTS: tuple[str, ...] = ( + "cache", + "build", + "dist", + "temporario", + "log_bruto_repetido", + "node_modules", + "coverage", + "snapshot_descartavel", +) + +OPERATIONAL_CLEANUP_TERMS: tuple[str, ...] = ( + ".test-tmp", + "cache", + "build", + "dist", + "node_modules", + "coverage", + "logs repetidos", + "artifacts temporarios", + "saidas intermediarias", +) + +SENSITIVE_LIVE_TERMS: tuple[str, ...] = ( + "dado vivo sensivel", + "cliente real", + "segredo", + "evidencia unica", + "historico", + "base live", +) + +DESTRUCTIVE_ACTION_TERMS: tuple[str, ...] = ( + "destroy", + "drop database", + "delete history", + "apagar historico", + "apagar evidencia", + "expurgar dado real", + "cobrar cliente real", + "enviar mensagem real", +) + +DEV_ALLOWED_ACTION_TERMS: tuple[str, ...] = ( + "simular usuario", + "simular organizacao", + "simular tenant", + "simular compra", + "simular credencial", + "simular sessao", + "simular admin", + "simular suporte", + "simular painel", + "rodar teste", + "criar massa de teste", + "validar fluxo", +) + + +@dataclass(frozen=True, slots=True) +class InstitutionalDecision: + """One institutional decision with operational controls.""" + + decision_id: DecisionId + title: str + directive: str + status: DecisionStatus + owner_platform_id: str + affected_platforms: tuple[str, ...] + required_controls: tuple[str, ...] + forbidden_actions: tuple[str, ...] + evidence_fields: tuple[str, ...] + output_obligations: tuple[str, ...] + ready_criteria: tuple[str, ...] + source_order: str = CANONICAL_DECISION_SOURCE + + def to_dict(self) -> dict[str, Any]: + return as_plain_data(self) + + +@dataclass(frozen=True, slots=True) +class DecisionAcceptanceCase: + """One deterministic case for validating a decision across actors.""" + + case_id: str + decision_id: DecisionId + platform_id: str + actor: DecisionActor + operation: str + topic: str + status: DecisionCaseStatus + required_fields: tuple[str, ...] + expected_truth_state: str + mcp_required: bool + retention_years: int + legal_hold_blocks_disposal: bool + dry_run_required: bool + rollback_required: bool + direct_bypass_blocked: bool + reason: str + next_action: str + evidence_id: str + source_hash: str + + def to_dict(self) -> dict[str, Any]: + return as_plain_data(self) + + +@dataclass(frozen=True, slots=True) +class DecisionFinding: + """One validator finding.""" + + finding_id: str + severity: DecisionFindingSeverity + decision_id: DecisionId | str + field: str + message: str + next_action: str + + def to_dict(self) -> dict[str, Any]: + return as_plain_data(self) + + +@dataclass(frozen=True, slots=True) +class DocsSufficiencyResult: + """Response sufficiency produced when Docs is queried.""" + + state: DocsSufficiencyState + answer_allowed: bool + pending_required: bool + reason: str + required_next_action: str + source_hash: str + evidence_id: str + + def to_dict(self) -> dict[str, Any]: + return as_plain_data(self) + + +@dataclass(frozen=True, slots=True) +class RetentionDecision: + """Retention decision for one subject.""" + + subject: str + decision: RetentionDecisionKind + retention_years: int + legal_hold: bool + disposal_allowed: bool + rollback_required: bool + reason: str + required_evidence: tuple[str, ...] + + def to_dict(self) -> dict[str, Any]: + return as_plain_data(self) + + +@dataclass(frozen=True, slots=True) +class CleanupDecision: + """Classification for workspace cleanup versus forbidden purge.""" + + target: str + decision: CleanupDecisionKind + term_to_use: str + allowed: bool + requires_inventory: bool + reason: str + next_action: str + + def to_dict(self) -> dict[str, Any]: + return as_plain_data(self) + + +@dataclass(frozen=True, slots=True) +class PlatformNameDecision: + """Canonical naming decision for a platform/app/module identifier.""" + + identifier: str + canonical_identifier: str + accepted: bool + platform_autonomous: bool + owner_platform_id: str + reason: str + required_action: str + + def to_dict(self) -> dict[str, Any]: + return as_plain_data(self) + + +@dataclass(frozen=True, slots=True) +class DevelopmentActionClassification: + """Classification for GPT/Codex development execution.""" + + action: str + decision: DevelopmentActionDecision + allowed: bool + requires_approval: bool + required_truth_state: str + dry_run_required: bool + rollback_required: bool + reason: str + required_evidence: tuple[str, ...] + + def to_dict(self) -> dict[str, Any]: + return as_plain_data(self) + + +@dataclass(frozen=True, slots=True) +class InstitutionalDecisionRegistry: + """Full registry of decisions and acceptance cases.""" + + registry_id: str + generated_at: str + source_order: str + control_plane_id: str + decisions: tuple[InstitutionalDecision, ...] + acceptance_cases: tuple[DecisionAcceptanceCase, ...] + findings: tuple[DecisionFinding, ...] + summary: tuple[str, ...] + + @property + def decisions_count(self) -> int: + return len(self.decisions) + + @property + def cases_count(self) -> int: + return len(self.acceptance_cases) + + @property + def blocker_count(self) -> int: + return sum(1 for finding in self.findings if finding.severity == DecisionFindingSeverity.BLOCKER) + + @property + def passed(self) -> bool: + return self.blocker_count == 0 + + def to_dict(self) -> dict[str, Any]: + data = as_plain_data(self) + data["decisions_count"] = self.decisions_count + data["cases_count"] = self.cases_count + data["blocker_count"] = self.blocker_count + data["passed"] = self.passed + return data + + +def build_institutional_decisions() -> tuple[InstitutionalDecision, ...]: + """Return the six operational decisions as structured policy.""" + + return ( + InstitutionalDecision( + decision_id=DecisionId.DOCS_FULL_KNOWLEDGE, + title="Docs pleno como plataforma de conhecimento operacional", + directive="Docs deve operar como plataforma responseReady, com busca, leitura ampla governada e pendencia quando a resposta nao existir.", + status=DecisionStatus.INSTITUTIONALIZED, + owner_platform_id=DOCS_PLATFORM_ID, + affected_platforms=AFFECTED_PLATFORMS, + required_controls=( + "leitura responseReady ampla", + "busca semantica", + "banco semantico proprio", + "resposta encontrado/parcial/nao_encontrado/conflitante", + "registro de fonte, versao, hash, data, plataforma, tema e autoridade", + ), + forbidden_actions=( + "tratar Docs como catalogOnly permanente", + "responder sem fonte e sem hash quando a pergunta exige prova", + "silenciar lacuna documental", + ), + evidence_fields=COMMON_EVIDENCE_FIELDS + ("documentAuthority", "documentVersion", "sufficiencyState"), + output_obligations=( + "registrar lacuna documental automaticamente", + "expor fonte institucional para GPT, MCP e UI", + "separar documento publico, interno, sensivel e restrito", + ), + ready_criteria=( + "consulta retorna estado de suficiencia", + "resposta inexistente gera pendencia", + "fonte possui hash e autoridade", + ), + ), + InstitutionalDecision( + decision_id=DecisionId.MCP_ACCEPTANCE, + title="Aceite interplataforma obrigatorio pelo MCP", + directive="Qualquer circulacao administrativa entre plataformas segue plataforma -> MCP -> plataforma.", + status=DecisionStatus.INSTITUTIONALIZED, + owner_platform_id=CONTROL_PLANE_ID, + affected_platforms=AFFECTED_PLATFORMS, + required_controls=( + "validar origem", + "validar contrato", + "validar hash", + "gerar etiqueta de entrada", + "preservar rastreabilidade", + "gerar etiqueta de saida", + "validar permissao do consumidor", + "registrar evidencia de aceite", + ), + forbidden_actions=( + "aceite puramente bilateral em fluxo administrativo externo", + "bypass direto sem traceId e auditId", + "payload com segredo bruto", + ), + evidence_fields=MCP_TRANSIT_FIELDS + COMMON_EVIDENCE_FIELDS + ("consumerPermission", "acceptanceTag"), + output_obligations=( + "registrar aceite por evidenceId", + "separar cache/fila/read model como excecao formal", + "manter permissionScope do consumidor", + ), + ready_criteria=( + "MCP descobre e valida a capacidade", + "traceId/auditId estao presentes", + "hash de payload e hash de registros batem com a evidencia", + ), + ), + InstitutionalDecision( + decision_id=DecisionId.TEN_YEAR_RETENTION, + title="Retencao institucional padrao de 10 anos", + directive="Evidencia, decisao, contrato, aceite, auditoria e documentacao relevante ficam retidos por 10 anos.", + status=DecisionStatus.INSTITUTIONALIZED, + owner_platform_id="nucleo-gestao-operacional", + affected_platforms=AFFECTED_PLATFORMS, + required_controls=( + "classificar evidencia unica", + "registrar legal hold", + "exigir rollback para operacao sensivel", + "exigir evidencia antes/depois", + "diferenciar cache e temporarios de prova institucional", + ), + forbidden_actions=( + "descartar evidencia unica antes do prazo", + "alterar contrato sensivel sem rollback", + "ignorar legal hold", + ), + evidence_fields=COMMON_EVIDENCE_FIELDS + ("retentionYears", "legalHold", "rollbackPlan", "beforeAfterEvidence"), + output_obligations=( + "marcar prazo de retencao", + "registrar legal hold quando houver", + "registrar rollback e prova antes/depois para operacao sensivel", + ), + ready_criteria=( + "retentionYears=10 para prova institucional", + "legalHold bloqueia descarte", + "rollback existe antes da operacao sensivel", + ), + ), + InstitutionalDecision( + decision_id=DecisionId.LIVE_SENSITIVE_PURGE_FORBIDDEN, + title="Expurgo de dado vivo sensivel vetado", + directive="Expurgo de dado vivo sensivel fica vetado; limpeza operacional usa vocabulario proprio e inventario.", + status=DecisionStatus.INSTITUTIONALIZED, + owner_platform_id="nucleo-gestao-operacional", + affected_platforms=AFFECTED_PLATFORMS, + required_controls=( + "classificar limpeza operacional", + "inventariar peso de projeto inchado", + "segregar artefatos descartaveis", + "preservar historico essencial", + "registrar rematerializacao limpa quando necessaria", + ), + forbidden_actions=( + "chamar limpeza operacional de expurgo", + "apagar dado vivo sensivel", + "apagar evidencia unica", + ), + evidence_fields=COMMON_EVIDENCE_FIELDS + ("cleanupClass", "weightInventory", "retainedEvidence"), + output_obligations=( + "usar limpeza operacional/higienizacao/rematerializacao", + "registrar caminhos descartaveis", + "registrar o que permanece por evidencia", + ), + ready_criteria=( + "dado vivo sensivel nunca marcado como expurgavel", + "cache/build/dist/node_modules sao limpeza operacional", + "inventario de peso existe para projeto inchado", + ), + ), + InstitutionalDecision( + decision_id=DecisionId.CANONICAL_PLATFORM_NAMES, + title="Nomes canonicos terminados em platform", + directive="Plataformas comuns terminam em platform; Biblioteca Privada nao e plataforma autonoma.", + status=DecisionStatus.INSTITUTIONALIZED, + owner_platform_id="nucleo-gestao-operacional", + affected_platforms=AFFECTED_PLATFORMS + BIBLIOTECA_LEGACY_IDS, + required_controls=( + "registrar alias plataform como legado temporario", + "normalizar ownerPlatformId para platform", + "reposicionar Biblioteca Privada como app/produto/modulo de Integracoes", + "bloquear repositorio duplicado por variante de nome", + ), + forbidden_actions=( + "criar ownerPlatformId proprio para Biblioteca Privada", + "tratar sufixo plataform como canonico novo", + "duplicar repositorio por rename sem janela", + ), + evidence_fields=COMMON_EVIDENCE_FIELDS + ("canonicalPlatformId", "legacyAlias", "ownerPlatformId"), + output_obligations=( + "registrar aliases e mapa canonico", + "atualizar matrizes/readiness/ordens", + "manter janela segura para rename fisico", + ), + ready_criteria=( + "id canonico termina com platform", + "Biblioteca Privada aponta para Integracoes", + "alias legado nao cria plataforma autonoma", + ), + ), + InstitutionalDecision( + decision_id=DecisionId.DEV_EXECUTION_AUTHORITY, + title="GPT e Codex com execucao ampla em desenvolvimento", + directive="Em desenvolvimento, GPT e Codex podem executar e simular amplamente com auditoria, truthState e sem efeito externo real.", + status=DecisionStatus.INSTITUTIONALIZED, + owner_platform_id="nucleo-gestao-operacional", + affected_platforms=AFFECTED_PLATFORMS, + required_controls=( + "truthState", + "actorId", + "auditId", + "dryRun quando aplicavel", + "marcacao real/simulado/teste/parcial/stale/bloqueado/derivado", + "bloqueio de efeito externo real sem autorizacao", + ), + forbidden_actions=( + "destruir banco", + "apagar historico", + "apagar evidencia", + "vazar segredo", + "alterar producao sem rollback", + "cobrar cliente real", + "enviar mensagem real a terceiro", + ), + evidence_fields=COMMON_EVIDENCE_FIELDS + ("actorId", "dryRun", "executionMode", "externalEffect"), + output_obligations=( + "marcar execucao simulada", + "registrar auditId", + "bloquear acao irreversivel", + ), + ready_criteria=( + "acao reversivel ou dryRun", + "sem segredo bruto", + "sem efeito externo real sem autorizacao explicita", + ), + ), + ) + + +def _decision_by_id(decisions: Sequence[InstitutionalDecision]) -> dict[DecisionId, InstitutionalDecision]: + return {decision.decision_id: decision for decision in decisions} + + +def _operations_for(decision_id: DecisionId) -> tuple[str, ...]: + return { + DecisionId.DOCS_FULL_KNOWLEDGE: ( + "indexar_documento", + "buscar_semanticamente", + "responder_suficiencia", + "registrar_lacuna", + "expor_fonte_para_mcp_ui", + ), + DecisionId.MCP_ACCEPTANCE: ( + "validar_origem", + "validar_contrato_hash", + "gerar_etiqueta_entrada", + "gerar_etiqueta_saida", + "registrar_evidencia_aceite", + ), + DecisionId.TEN_YEAR_RETENTION: ( + "classificar_evidencia", + "reter_dez_anos", + "aplicar_legal_hold", + "exigir_rollback", + "registrar_antes_depois", + ), + DecisionId.LIVE_SENSITIVE_PURGE_FORBIDDEN: ( + "classificar_limpeza_operacional", + "bloquear_expurgo_vivo", + "inventariar_peso", + "segregar_artefatos", + "rematerializar_limpo", + ), + DecisionId.CANONICAL_PLATFORM_NAMES: ( + "validar_sufixo_platform", + "resolver_alias_plataform", + "bloquear_biblioteca_autonoma", + "normalizar_owner_platform", + "registrar_app_produto_modulo", + ), + DecisionId.DEV_EXECUTION_AUTHORITY: ( + "simular_usuario_tenant", + "simular_admin_suporte", + "executar_teste_massa", + "executar_dryrun_reversivel", + "bloquear_efeito_externo", + ), + }[decision_id] + + +def _case_status(decision_id: DecisionId, operation: str) -> DecisionCaseStatus: + if decision_id == DecisionId.LIVE_SENSITIVE_PURGE_FORBIDDEN and "bloquear_expurgo" in operation: + return DecisionCaseStatus.FORBIDDEN + if decision_id == DecisionId.DEV_EXECUTION_AUTHORITY and "bloquear_efeito_externo" in operation: + return DecisionCaseStatus.BLOCKED_IF_MISSING + if decision_id == DecisionId.DEV_EXECUTION_AUTHORITY: + return DecisionCaseStatus.ALLOWED_WITH_GATES + return DecisionCaseStatus.REQUIRED + + +def _required_fields_for(decision: InstitutionalDecision, operation: str) -> tuple[str, ...]: + extras: tuple[str, ...] = () + if decision.decision_id == DecisionId.DOCS_FULL_KNOWLEDGE: + extras = ("documentId", "documentAuthority", "sufficiencyState") + elif decision.decision_id == DecisionId.MCP_ACCEPTANCE: + extras = ("origin", "destination", "consumerPermission", "acceptanceTag") + elif decision.decision_id == DecisionId.TEN_YEAR_RETENTION: + extras = ("retentionYears", "legalHold", "rollbackPlan") + elif decision.decision_id == DecisionId.LIVE_SENSITIVE_PURGE_FORBIDDEN: + extras = ("cleanupClass", "purgeBlocked", "weightInventory") + elif decision.decision_id == DecisionId.CANONICAL_PLATFORM_NAMES: + extras = ("canonicalPlatformId", "legacyAlias", "ownerPlatformId") + elif decision.decision_id == DecisionId.DEV_EXECUTION_AUTHORITY: + extras = ("actorId", "dryRun", "executionMode", "externalEffect") + return merge_unique((*decision.evidence_fields, *extras, operation)) + + +def _expected_truth_state(decision_id: DecisionId) -> str: + return { + DecisionId.DOCS_FULL_KNOWLEDGE: "response_ready", + DecisionId.MCP_ACCEPTANCE: "same_source_ready", + DecisionId.TEN_YEAR_RETENTION: "documented", + DecisionId.LIVE_SENSITIVE_PURGE_FORBIDDEN: "documented", + DecisionId.CANONICAL_PLATFORM_NAMES: "same_source_ready", + DecisionId.DEV_EXECUTION_AUTHORITY: "test_or_simulated", + }[decision_id] + + +def build_runtime_acceptance_cases(decisions: Sequence[InstitutionalDecision] | None = None) -> tuple[DecisionAcceptanceCase, ...]: + """Build acceptance cases directly from the policy matrix.""" + + source_decisions = tuple(decisions or build_institutional_decisions()) + by_id = _decision_by_id(source_decisions) + cases: list[DecisionAcceptanceCase] = [] + for decision_id in DecisionId: + decision = by_id[decision_id] + for platform_id in decision.affected_platforms: + for actor in DecisionActor: + for operation in _operations_for(decision_id): + status = _case_status(decision_id, operation) + seed = { + "decision": decision_id.value, + "platform": platform_id, + "actor": actor.value, + "operation": operation, + } + source_hash = stable_digest(seed, 64) + cases.append( + DecisionAcceptanceCase( + case_id=f"institutional-{stable_digest(seed, 24)}", + decision_id=decision_id, + platform_id=platform_id, + actor=actor, + operation=operation, + topic=decision.title, + status=status, + required_fields=_required_fields_for(decision, operation), + expected_truth_state=_expected_truth_state(decision_id), + mcp_required=True, + retention_years=10 if decision_id == DecisionId.TEN_YEAR_RETENTION else 0, + legal_hold_blocks_disposal=decision_id == DecisionId.TEN_YEAR_RETENTION, + dry_run_required=decision_id == DecisionId.DEV_EXECUTION_AUTHORITY and status != DecisionCaseStatus.FORBIDDEN, + rollback_required=decision_id in {DecisionId.TEN_YEAR_RETENTION, DecisionId.DEV_EXECUTION_AUTHORITY}, + direct_bypass_blocked=decision_id == DecisionId.MCP_ACCEPTANCE or platform_id != CONTROL_PLANE_ID, + reason=decision.directive, + next_action="validar campos, hashes, truthState e registrar pendencia quando faltar evidencia", + evidence_id=f"evidence-{source_hash[:24]}", + source_hash=source_hash, + ) + ) + return tuple(cases) + + +def build_acceptance_cases(*, use_generated: bool = True) -> tuple[DecisionAcceptanceCase, ...]: + """Return generated cases when available, otherwise build runtime cases.""" + + if use_generated: + try: + from .generated_institutional_decisions import iter_acceptance_cases + + return tuple(iter_acceptance_cases()) + except (ImportError, AttributeError): + pass + return build_runtime_acceptance_cases() + + +def _finding( + decision_id: DecisionId | str, + field: str, + message: str, + next_action: str, + *, + severity: DecisionFindingSeverity = DecisionFindingSeverity.BLOCKER, +) -> DecisionFinding: + seed = stable_digest({"decision": str(decision_id), "field": field, "message": message}, 16) + return DecisionFinding( + finding_id=f"institutional-finding-{seed}", + severity=severity, + decision_id=decision_id, + field=field, + message=message, + next_action=next_action, + ) + + +def _contains_secret(value: object) -> bool: + text = str(as_plain_data(value)) + return any(pattern.search(text) for _, pattern in SECRET_PATTERNS) + + +def validate_mcp_acceptance_envelope(envelope: Mapping[str, Any], *, control_plane_id: str = CONTROL_PLANE_ID) -> tuple[DecisionFinding, ...]: + """Validate the platform -> MCP -> platform envelope.""" + + findings: list[DecisionFinding] = [] + for field in MCP_TRANSIT_FIELDS: + if field not in envelope or envelope[field] in (None, "", [], {}): + findings.append( + _finding( + DecisionId.MCP_ACCEPTANCE, + field, + "campo obrigatorio do envelope MCP ausente ou vazio", + "preencher origin, destination, tool, payload, actor, permission, result, traceId, auditId e timestamp", + ) + ) + destination = str(envelope.get("destination", "")) + if destination and destination != control_plane_id: + findings.append( + _finding( + DecisionId.MCP_ACCEPTANCE, + "destination", + "destino nao aponta para MCPs Internos", + "rotear aceite administrativo pelo MCP antes de circular a outra plataforma", + ) + ) + if str(envelope.get("origin", "")) == destination: + findings.append( + _finding( + DecisionId.MCP_ACCEPTANCE, + "origin", + "origem e destino iguais reduzem rastreabilidade de aceite interplataforma", + "separar plataforma de origem, MCP e consumidor final", + severity=DecisionFindingSeverity.WARNING, + ) + ) + if _contains_secret(envelope): + findings.append( + _finding( + DecisionId.MCP_ACCEPTANCE, + "payload", + "envelope contem formato de segredo bruto", + "substituir por credentialRef, tokenRef ou secretRef antes de persistir evidencia", + ) + ) + return tuple(findings) + + +def docs_sufficiency_result( + *, + found_sources: int, + partial_sources: int = 0, + conflicts: int = 0, + authority_present: bool = True, +) -> DocsSufficiencyResult: + """Classify a Docs answer as found, partial, not found, or conflicting.""" + + seed = { + "found": found_sources, + "partial": partial_sources, + "conflicts": conflicts, + "authority": authority_present, + } + source_hash = stable_digest(seed, 64) + if conflicts > 0: + state = DocsSufficiencyState.CONFLICTING + reason = "fontes documentais divergem" + next_action = "abrir pendencia documental com fontes conflitantes e owner" + elif found_sources > 0 and authority_present: + state = DocsSufficiencyState.FOUND + reason = "fonte suficiente com autoridade e hash" + next_action = "responder com fonte, versao, hash, data, plataforma, tema e autoridade" + elif found_sources > 0 or partial_sources > 0: + state = DocsSufficiencyState.PARTIAL + reason = "fonte existe, mas autoridade ou cobertura ainda e parcial" + next_action = "responder como parcial e abrir complemento documental" + else: + state = DocsSufficiencyState.NOT_FOUND + reason = "nenhuma fonte institucional encontrada" + next_action = "abrir pendencia automatica para criar ou atualizar documentacao" + return DocsSufficiencyResult( + state=state, + answer_allowed=state in {DocsSufficiencyState.FOUND, DocsSufficiencyState.PARTIAL}, + pending_required=state != DocsSufficiencyState.FOUND, + reason=reason, + required_next_action=next_action, + source_hash=source_hash, + evidence_id=f"evidence-{source_hash[:24]}", + ) + + +def classify_retention_subject( + subject: str, + *, + is_unique_evidence: bool = False, + legal_hold: bool = False, + live_sensitive: bool = False, +) -> RetentionDecision: + """Classify retention for institutional records and short-lived artifacts.""" + + lowered = subject.lower() + required_evidence = ("sourceHash", "evidenceId", "traceId", "auditId", "timestamp") + if legal_hold: + return RetentionDecision( + subject=subject, + decision=RetentionDecisionKind.LEGAL_HOLD, + retention_years=10, + legal_hold=True, + disposal_allowed=False, + rollback_required=True, + reason="legal hold bloqueia descarte mesmo apos prazo operacional", + required_evidence=required_evidence + ("legalHoldId",), + ) + if live_sensitive: + return RetentionDecision( + subject=subject, + decision=RetentionDecisionKind.LIVE_SENSITIVE_RETAINED, + retention_years=10, + legal_hold=False, + disposal_allowed=False, + rollback_required=True, + reason="dado vivo sensivel nao pode ser expurgado pela politica atual", + required_evidence=required_evidence + ("sensitivityClass",), + ) + if is_unique_evidence or any(term in lowered for term in TEN_YEAR_SUBJECTS): + return RetentionDecision( + subject=subject, + decision=RetentionDecisionKind.TEN_YEAR_EVIDENCE, + retention_years=10, + legal_hold=False, + disposal_allowed=False, + rollback_required=True, + reason="prova institucional entra na retencao padrao de 10 anos", + required_evidence=required_evidence + ("retentionYears",), + ) + if any(term in lowered for term in SHORT_LIVED_SUBJECTS): + return RetentionDecision( + subject=subject, + decision=RetentionDecisionKind.SHORT_LIVED_OPERATIONAL, + retention_years=0, + legal_hold=False, + disposal_allowed=True, + rollback_required=False, + reason="artefato operacional reinstalavel ou temporario; nao e evidencia unica", + required_evidence=("classification", "cleanupLog"), + ) + return RetentionDecision( + subject=subject, + decision=RetentionDecisionKind.TEN_YEAR_EVIDENCE, + retention_years=10, + legal_hold=False, + disposal_allowed=False, + rollback_required=True, + reason="classificacao desconhecida fica retida ate avaliacao de owner", + required_evidence=required_evidence + ("ownerReview",), + ) + + +def classify_cleanup_target( + target: str, + *, + contains_live_sensitive: bool = False, + unique_evidence: bool = False, +) -> CleanupDecision: + """Classify cleanup vocabulary and forbid purge for live sensitive data.""" + + lowered = target.lower() + if contains_live_sensitive or any(term in lowered for term in SENSITIVE_LIVE_TERMS): + return CleanupDecision( + target=target, + decision=CleanupDecisionKind.PURGE_FORBIDDEN, + term_to_use="retencao governada", + allowed=False, + requires_inventory=True, + reason="expurgo de dado vivo sensivel esta vetado", + next_action="preservar evidencia, classificar sensibilidade e abrir ordem de governanca", + ) + if unique_evidence: + return CleanupDecision( + target=target, + decision=CleanupDecisionKind.RETAIN_WITH_EVIDENCE, + term_to_use="arquivamento de evidencia antiga", + allowed=False, + requires_inventory=True, + reason="artefato e evidencia unica e nao deve ser tratado como limpeza", + next_action="reter por 10 anos ou aplicar legal hold", + ) + if any(term in lowered for term in OPERATIONAL_CLEANUP_TERMS): + return CleanupDecision( + target=target, + decision=CleanupDecisionKind.OPERATIONAL_CLEANUP, + term_to_use="limpeza operacional", + allowed=True, + requires_inventory=False, + reason="artefato temporario, cache, build ou dependencia reinstalavel", + next_action="registrar limpeza e preservar logs de erro quando houver", + ) + return CleanupDecision( + target=target, + decision=CleanupDecisionKind.OWNER_REVIEW, + term_to_use="inventario de peso", + allowed=False, + requires_inventory=True, + reason="classificacao nao determinada com seguranca", + next_action="inventariar peso, separar evidencia e decidir rematerializacao limpa", + ) + + +def _platform_to_canonical(identifier: str) -> str: + text = identifier.strip().replace("\\", "/").rstrip("/") + name = text.split("/")[-1] + if name.endswith(".git"): + name = name[:-4] + if name in BIBLIOTECA_LEGACY_IDS: + return BIBLIOTECA_OWNER_PLATFORM_ID + if name.endswith("-plataform"): + return name[: -len("-plataform")] + "-platform" + if not name.endswith("-platform") and name.startswith("tudo-para-ia-"): + return f"{name}-platform" + return name + + +def classify_platform_identifier(identifier: str) -> PlatformNameDecision: + """Validate platform suffixes and Biblioteca Privada ownership.""" + + canonical = _platform_to_canonical(identifier) + name = identifier.strip().replace("\\", "/").rstrip("/").split("/")[-1] + if name in BIBLIOTECA_LEGACY_IDS: + return PlatformNameDecision( + identifier=identifier, + canonical_identifier=canonical, + accepted=True, + platform_autonomous=False, + owner_platform_id=BIBLIOTECA_OWNER_PLATFORM_ID, + reason="Biblioteca Privada e app/produto/modulo dentro de Integracoes, nao plataforma autonoma", + required_action="remover ownerPlatformId proprio e apontar para Integracoes", + ) + if canonical != name: + return PlatformNameDecision( + identifier=identifier, + canonical_identifier=canonical, + accepted=True, + platform_autonomous=True, + owner_platform_id=canonical, + reason="identificador aceito como alias legado ate migracao coordenada", + required_action="usar canonical_identifier em novos contratos e manter alias rastreavel", + ) + return PlatformNameDecision( + identifier=identifier, + canonical_identifier=canonical, + accepted=canonical.endswith("-platform"), + platform_autonomous=canonical.endswith("-platform"), + owner_platform_id=canonical, + reason="identificador ja segue sufixo canonico platform" if canonical.endswith("-platform") else "identificador nao e plataforma canonica", + required_action="nenhuma" if canonical.endswith("-platform") else "registrar como app/modulo ou corrigir sufixo", + ) + + +def classify_development_action( + action: str, + *, + destructive: bool = False, + exposes_secret: bool = False, + external_effect: bool = False, + production: bool = False, + rollback_available: bool = True, + dry_run: bool = True, +) -> DevelopmentActionClassification: + """Classify GPT/Codex execution in development.""" + + lowered = action.lower() + required_evidence = ("truthState", "actorId", "auditId", "traceId", "timestamp") + if exposes_secret or any(pattern.search(action) for _, pattern in SECRET_PATTERNS): + return DevelopmentActionClassification( + action=action, + decision=DevelopmentActionDecision.BLOCKED, + allowed=False, + requires_approval=True, + required_truth_state="blocked", + dry_run_required=True, + rollback_required=True, + reason="acao exporia segredo bruto", + required_evidence=required_evidence + ("redactionFinding",), + ) + if destructive or any(term in lowered for term in DESTRUCTIVE_ACTION_TERMS): + return DevelopmentActionClassification( + action=action, + decision=DevelopmentActionDecision.BLOCKED, + allowed=False, + requires_approval=True, + required_truth_state="blocked", + dry_run_required=True, + rollback_required=True, + reason="acao destrutiva ou irreversivel sem autorizacao explicita", + required_evidence=required_evidence + ("approvalId", "rollbackPlan"), + ) + if production or external_effect: + return DevelopmentActionClassification( + action=action, + decision=DevelopmentActionDecision.APPROVAL_REQUIRED, + allowed=False, + requires_approval=True, + required_truth_state="approval_required", + dry_run_required=True, + rollback_required=True, + reason="efeito externo real ou producao exige autorizacao explicita", + required_evidence=required_evidence + ("approvalId", "externalEffectClass"), + ) + if dry_run or any(term in lowered for term in DEV_ALLOWED_ACTION_TERMS): + return DevelopmentActionClassification( + action=action, + decision=DevelopmentActionDecision.ALLOWED_DRY_RUN if dry_run else DevelopmentActionDecision.ALLOWED, + allowed=True, + requires_approval=False, + required_truth_state="simulated_or_test", + dry_run_required=dry_run, + rollback_required=not rollback_available, + reason="execucao de desenvolvimento reversivel, simulada ou auditavel", + required_evidence=required_evidence + ("dryRun", "executionMode"), + ) + return DevelopmentActionClassification( + action=action, + decision=DevelopmentActionDecision.ALLOWED, + allowed=True, + requires_approval=False, + required_truth_state="test_or_partial", + dry_run_required=True, + rollback_required=not rollback_available, + reason="acao permitida em desenvolvimento desde que auditavel e sem efeito externo", + required_evidence=required_evidence + ("executionMode",), + ) + + +def validate_registry(registry: InstitutionalDecisionRegistry) -> tuple[DecisionFinding, ...]: + """Validate consistency of the decision registry.""" + + findings: list[DecisionFinding] = [] + decision_ids = {decision.decision_id for decision in registry.decisions} + for decision_id in DecisionId: + if decision_id not in decision_ids: + findings.append( + _finding( + decision_id, + "decisions", + "decisao institucional ausente do registro", + "adicionar decisao antes de considerar rodada pronta", + ) + ) + docs = next((decision for decision in registry.decisions if decision.decision_id == DecisionId.DOCS_FULL_KNOWLEDGE), None) + if docs and any("catalogOnly permanente" in item for item in docs.forbidden_actions) is False: + findings.append( + _finding( + DecisionId.DOCS_FULL_KNOWLEDGE, + "forbidden_actions", + "Docs pleno nao bloqueia explicitamente catalogOnly permanente", + "registrar proibicao de catalogOnly permanente", + ) + ) + mcp_cases = [case for case in registry.acceptance_cases if case.decision_id == DecisionId.MCP_ACCEPTANCE] + if mcp_cases and not all("traceId" in case.required_fields and "auditId" in case.required_fields for case in mcp_cases): + findings.append( + _finding( + DecisionId.MCP_ACCEPTANCE, + "acceptance_cases", + "caso MCP sem traceId/auditId", + "regenerar acceptance cases com campos de transito completos", + ) + ) + return tuple(findings) + + +def build_institutional_decision_registry(*, use_generated: bool = True) -> InstitutionalDecisionRegistry: + """Build the full registry and validate it.""" + + decisions = build_institutional_decisions() + cases = build_acceptance_cases(use_generated=use_generated) + seed = { + "decisions": [decision.decision_id.value for decision in decisions], + "cases": len(cases), + "source": CANONICAL_DECISION_SOURCE, + } + provisional = InstitutionalDecisionRegistry( + registry_id=f"institutional-decisions-{stable_digest(seed, 16)}", + generated_at=utc_now(), + source_order=CANONICAL_DECISION_SOURCE, + control_plane_id=CONTROL_PLANE_ID, + decisions=decisions, + acceptance_cases=cases, + findings=(), + summary=(), + ) + findings = validate_registry(provisional) + summary = ( + f"Decisoes institucionalizadas: {len(decisions)}", + f"Casos de aceite: {len(cases)}", + f"Findings bloqueantes: {sum(1 for item in findings if item.severity == DecisionFindingSeverity.BLOCKER)}", + "Docs pleno exige responseReady e pendencia automatica quando nao houver resposta.", + "Aceite administrativo interplataforma passa por MCP com traceId e auditId.", + "Retencao padrao de provas institucionais e de 10 anos.", + "Expurgo de dado vivo sensivel esta vetado; limpeza operacional usa vocabulario proprio.", + "Biblioteca Privada pertence a Integracoes como app/produto/modulo.", + "GPT/Codex executam amplamente em desenvolvimento sem segredo bruto nem efeito externo real.", + ) + return InstitutionalDecisionRegistry( + registry_id=provisional.registry_id, + generated_at=provisional.generated_at, + source_order=provisional.source_order, + control_plane_id=provisional.control_plane_id, + decisions=decisions, + acceptance_cases=cases, + findings=findings, + summary=summary, + ) + + +def registry_compact_payload(registry: InstitutionalDecisionRegistry, *, limit_cases: int = 120) -> dict[str, Any]: + """Return a compact JSON payload.""" + + return { + "registryId": registry.registry_id, + "generatedAt": registry.generated_at, + "sourceOrder": registry.source_order, + "controlPlaneId": registry.control_plane_id, + "decisionsCount": registry.decisions_count, + "casesCount": registry.cases_count, + "blockerCount": registry.blocker_count, + "passed": registry.passed, + "summary": list(registry.summary), + "decisions": [decision.to_dict() for decision in registry.decisions], + "casesSample": [case.to_dict() for case in registry.acceptance_cases[: max(0, limit_cases)]], + "findings": [finding.to_dict() for finding in registry.findings], + } + + +def decision_case_rows(cases: Sequence[DecisionAcceptanceCase]) -> list[list[str]]: + rows = [ + [ + "case_id", + "decision_id", + "platform_id", + "actor", + "operation", + "status", + "expected_truth_state", + "mcp_required", + "retention_years", + "legal_hold_blocks_disposal", + "dry_run_required", + "rollback_required", + "direct_bypass_blocked", + "source_hash", + "evidence_id", + "next_action", + ] + ] + for case in cases: + rows.append( + [ + case.case_id, + case.decision_id.value, + case.platform_id, + case.actor.value, + case.operation, + case.status.value, + case.expected_truth_state, + "yes" if case.mcp_required else "no", + str(case.retention_years), + "yes" if case.legal_hold_blocks_disposal else "no", + "yes" if case.dry_run_required else "no", + "yes" if case.rollback_required else "no", + "yes" if case.direct_bypass_blocked else "no", + case.source_hash, + case.evidence_id, + case.next_action, + ] + ) + return rows + + +def rows_to_csv(rows: Sequence[Sequence[str]]) -> str: + buffer = io.StringIO() + writer = csv.writer(buffer, lineterminator="\n") + writer.writerows(rows) + return buffer.getvalue() + + +def registry_markdown(registry: InstitutionalDecisionRegistry, *, limit_cases: int = 80) -> str: + """Render the registry as a human audit document.""" + + lines = [ + "# Registro executavel das decisoes institucionais", + "", + f"- registry_id: `{registry.registry_id}`", + f"- generated_at: `{registry.generated_at}`", + f"- source_order: `{registry.source_order}`", + f"- control_plane: `{registry.control_plane_id}`", + f"- decisions: `{registry.decisions_count}`", + f"- acceptance_cases: `{registry.cases_count}`", + f"- passed: `{registry.passed}`", + "", + "## Sumario", + "", + ] + lines.extend(f"- {item}" for item in registry.summary) + lines.extend(["", "## Decisoes", ""]) + for decision in registry.decisions: + lines.extend( + [ + f"### {decision.title}", + "", + f"- id: `{decision.decision_id.value}`", + f"- owner: `{decision.owner_platform_id}`", + f"- status: `{decision.status.value}`", + f"- diretriz: {decision.directive}", + f"- controles: {', '.join(decision.required_controls)}", + f"- proibicoes: {', '.join(decision.forbidden_actions)}", + f"- criterio pronto: {', '.join(decision.ready_criteria)}", + "", + ] + ) + lines.extend(["## Casos de aceite amostrais", ""]) + for case in registry.acceptance_cases[: max(0, limit_cases)]: + lines.append( + f"- `{case.case_id}` `{case.decision_id.value}` `{case.platform_id}` " + f"`{case.actor.value}` `{case.operation}` -> `{case.status.value}` evidence `{case.evidence_id}`" + ) + if len(registry.acceptance_cases) > limit_cases: + lines.append(f"- ... {len(registry.acceptance_cases) - limit_cases} casos adicionais no CSV/JSON.") + lines.extend(["", "## Findings", ""]) + if registry.findings: + for finding in registry.findings: + lines.append( + f"- `{finding.severity.value}` `{finding.decision_id}` `{finding.field}`: " + f"{redact_sensitive_text(finding.message)} Proxima acao: {finding.next_action}" + ) + else: + lines.append("- Nenhum finding bloqueante.") + return "\n".join(lines).strip() + "\n" + + +def generated_files(project_root: Path, central_platform_folder: Path | None = None) -> tuple[GeneratedFile, ...]: + relation = "000-ROTEADOR-PERMANENTE-DE-ORDEM_DE_SERVICO" + specs = ( + ("dados/institutional-decisions-registry.json", "Registro JSON das seis decisoes institucionais.", "institutional decisions registry", "json"), + ("dados/institutional-decisions-compact.json", "Registro compacto das decisoes para GPT/UI/MCP.", "institutional decisions compact", "json"), + ("matrizes/institutional-decision-acceptance-cases.csv", "Matriz de casos de aceite das decisoes.", "institutional decisions acceptance matrix", "csv"), + ("ecossistema/INSTITUTIONAL-DECISIONS.md", "Relatorio humano das decisoes institucionais.", "institutional decisions report", "markdown"), + ) + records = [ + GeneratedFile( + path=str(project_root / relative), + description=description, + function=function, + file_type=file_type, + changed_by="mais_humana.institutional_decisions", + change_summary="Criado ou atualizado registro executavel das seis decisoes institucionais.", + relation_to_order=relation, + ) + for relative, description, function, file_type in specs + ] + if central_platform_folder is not None: + records.append( + GeneratedFile( + path=str(central_platform_folder / "reports" / "EXECUTADO__institutional-decisions.md"), + description="Copia central do registro das decisoes institucionais.", + function="institutional decisions central report", + file_type="markdown", + changed_by="mais_humana.institutional_decisions", + change_summary="Registradas decisoes institucionais no dossie central.", + relation_to_order=relation, + ) + ) + return tuple(records) + + +def write_institutional_decision_artifacts( + registry: InstitutionalDecisionRegistry, + project_root: Path, + *, + central_platform_folder: Path | None = None, +) -> tuple[GeneratedFile, ...]: + """Write JSON, CSV, Markdown artifacts and update local semantic SQLite.""" + + targets: list[tuple[Path, str]] = [ + (project_root / "dados" / "institutional-decisions-registry.json", json.dumps(registry.to_dict(), ensure_ascii=False, indent=2, sort_keys=True)), + (project_root / "dados" / "institutional-decisions-compact.json", json.dumps(registry_compact_payload(registry), ensure_ascii=False, indent=2, sort_keys=True)), + (project_root / "matrizes" / "institutional-decision-acceptance-cases.csv", rows_to_csv(decision_case_rows(registry.acceptance_cases))), + (project_root / "ecossistema" / "INSTITUTIONAL-DECISIONS.md", registry_markdown(registry)), + ] + records = list(generated_files(project_root, None)) + if central_platform_folder is not None: + central_target = central_platform_folder / "reports" / "EXECUTADO__institutional-decisions.md" + targets.append((central_target, registry_markdown(registry))) + records.append( + GeneratedFile( + path=str(central_target), + description="Copia central do registro das decisoes institucionais.", + function="institutional decisions central report", + file_type="markdown", + changed_by="mais_humana.institutional_decisions", + change_summary="Registradas decisoes institucionais no dossie central.", + relation_to_order="000-ROTEADOR-PERMANENTE-DE-ORDEM_DE_SERVICO", + ) + ) + for path, text in targets: + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text(text, encoding="utf-8") + with connect(project_root / "controle-semantico.sqlite") as conn: + upsert_files(conn, records) + conn.commit() + return tuple(records) + + +def run_institutional_decisions( + *, + project_root: Path, + central_platform_folder: Path | None = None, + use_generated: bool = True, +) -> tuple[InstitutionalDecisionRegistry, tuple[GeneratedFile, ...]]: + registry = build_institutional_decision_registry(use_generated=use_generated) + records = write_institutional_decision_artifacts(registry, project_root, central_platform_folder=central_platform_folder) + return registry, records +