# Reconstructed post-image of the patch: header of the institutional-decisions
# registry module.  This module holds policy DATA only — no live side effects.
from __future__ import annotations

# NOTE(review): `asdict` is not used in the visible part of this chunk; the
# module tail is outside this view — confirm usage before removing the import.
from dataclasses import dataclass, asdict
from typing import Iterable

# Version stamp of this policy snapshot; consumers compare it to detect drift.
POLICY_VERSION = "2026-05-04.mais-humana-institutional-decisions.v1"


@dataclass(frozen=True)
class HumanImpact:
    """How one human profile experiences a decision being enforced (or not)."""

    # Identifier of the affected human profile (e.g. "operador_gpt").
    profile_id: str
    # What this person expects from the system once the decision holds.
    expectation: str
    # Observable signal telling them the decision is in effect.
    visible_signal: str
    # Concrete harm when the decision is missing or bypassed.
    risk_if_missing: str
    # The question this person should be able to answer next.
    next_human_question: str
ALLOWED_DRY_RUN = "allowed_dry_run" - APPROVAL_REQUIRED = "approval_required" - BLOCKED = "blocked" - - -CANONICAL_DECISION_SOURCE = "000-ROTEADOR-PERMANENTE-DE-ORDEM_DE_SERVICO" -CONTROL_PLANE_ID = MCP_CONTROL_PLANE_ID -DOCS_PLATFORM_ID = "tudo-para-ia-docs-plataform" -DOCS_CANONICAL_TARGET_ID = "tudo-para-ia-docs-platform" -BIBLIOTECA_OWNER_PLATFORM_ID = "tudo-para-ia-integracoes-platform" -BIBLIOTECA_LEGACY_IDS = ( - "tudo-para-ia-biblioteca-privada", - "tudo-para-ia-biblioteca-privada-ia-plataform", -) - -AFFECTED_PLATFORMS: tuple[str, ...] = ( - "tudo-para-ia-docs-plataform", - "tudo-para-ia-mcps-internos-plataform", - "tudo-para-ia-ui-platform", - "tudo-para-ia-integracoes-plataform", - "tudo-para-ia-customer-ops-platform", - "tudo-para-ia-mais-humana-platform", - "tudo-para-ia-identity-platform", - "tudo-para-ia-business-platform", -) - -COMMON_EVIDENCE_FIELDS: tuple[str, ...] = ( - "sourceHash", - "sourcePayloadHash", - "sourceRecordsHash", - "evidenceId", - "traceId", - "auditId", - "timestamp", - "truthState", -) - -TEN_YEAR_SUBJECTS: tuple[str, ...] = ( - "documentacao", - "contrato", - "aceite", - "evidencia", - "auditoria", - "execucao", - "pendencia", - "hash", - "decisao_institucional", - "rollback", -) - -SHORT_LIVED_SUBJECTS: tuple[str, ...] = ( - "cache", - "build", - "dist", - "temporario", - "log_bruto_repetido", - "node_modules", - "coverage", - "snapshot_descartavel", -) - -OPERATIONAL_CLEANUP_TERMS: tuple[str, ...] = ( - ".test-tmp", - "cache", - "build", - "dist", - "node_modules", - "coverage", - "logs repetidos", - "artifacts temporarios", - "saidas intermediarias", -) - -SENSITIVE_LIVE_TERMS: tuple[str, ...] = ( - "dado vivo sensivel", - "cliente real", - "segredo", - "evidencia unica", - "historico", - "base live", -) - -DESTRUCTIVE_ACTION_TERMS: tuple[str, ...] 
@dataclass(frozen=True)
class InstitutionalDecision:
    """One institutional decision expressed as plain, auditable policy data."""

    # Stable identifier for the decision (snake_case).
    decision_id: str
    # Short human-readable title.
    title: str
    # One-paragraph explanation of the decision for humans.
    human_summary: str
    # Platform/unit id that operationally owns the decision.
    operational_owner: str
    # Field names the MCP evidence trail is expected to carry.
    expected_mcp_evidence: tuple[str, ...]
    # Human profiles affected, each with expectation/signal/risk/question.
    affected_profiles: tuple[HumanImpact, ...]
    # Conditions under which the decision counts as operational.
    ready_criteria: tuple[str, ...]
    # Named conditions that block the decision from being considered ready.
    blockers: tuple[str, ...]
- expected_truth_state: str - mcp_required: bool - retention_years: int - legal_hold_blocks_disposal: bool - dry_run_required: bool - rollback_required: bool - direct_bypass_blocked: bool - reason: str - next_action: str - evidence_id: str - source_hash: str - - def to_dict(self) -> dict[str, Any]: - return as_plain_data(self) - - -@dataclass(frozen=True, slots=True) -class DecisionFinding: - """One validator finding.""" - - finding_id: str - severity: DecisionFindingSeverity - decision_id: DecisionId | str - field: str - message: str - next_action: str - - def to_dict(self) -> dict[str, Any]: - return as_plain_data(self) - - -@dataclass(frozen=True, slots=True) -class DocsSufficiencyResult: - """Response sufficiency produced when Docs is queried.""" - - state: DocsSufficiencyState - answer_allowed: bool - pending_required: bool - reason: str - required_next_action: str - source_hash: str - evidence_id: str - - def to_dict(self) -> dict[str, Any]: - return as_plain_data(self) - - -@dataclass(frozen=True, slots=True) -class RetentionDecision: - """Retention decision for one subject.""" - - subject: str - decision: RetentionDecisionKind - retention_years: int - legal_hold: bool - disposal_allowed: bool - rollback_required: bool - reason: str - required_evidence: tuple[str, ...] 
@dataclass(frozen=True)
class PlatformAlias:
    """Maps a name currently in use to its canonical platform identifier."""

    # Name as it appears today (possibly a legacy "plataform" spelling).
    current_name: str
    # Canonical identifier the name resolves to (ends in "platform").
    canonical_name: str
    # e.g. "legacy_alias" or "module_not_platform".
    alias_status: str
    # Whether the named thing is a real platform (False for modules/products).
    is_platform: bool
    # Platform that owns this name.
    owner_platform_id: str
    # Reviewer-facing note explaining the alias decision.
    human_note: str


@dataclass(frozen=True)
class DevelopmentActionAssessment:
    """Policy verdict for one GPT/Codex action in a given environment."""

    # The action being assessed (free text).
    action: str
    # Environment the action would run in (e.g. "development").
    environment: str
    # Whether the action may proceed.
    allowed: bool
    # Decision label (e.g. "authorization_required").
    decision: str
    # Controls that must be present for the action to run.
    required_controls: tuple[str, ...]
    # Plain-language justification for humans.
    human_explanation: str
- - @property - def decisions_count(self) -> int: - return len(self.decisions) - - @property - def cases_count(self) -> int: - return len(self.acceptance_cases) - - @property - def blocker_count(self) -> int: - return sum(1 for finding in self.findings if finding.severity == DecisionFindingSeverity.BLOCKER) - - @property - def passed(self) -> bool: - return self.blocker_count == 0 - - def to_dict(self) -> dict[str, Any]: - data = as_plain_data(self) - data["decisions_count"] = self.decisions_count - data["cases_count"] = self.cases_count - data["blocker_count"] = self.blocker_count - data["passed"] = self.passed - return data - - -def build_institutional_decisions() -> tuple[InstitutionalDecision, ...]: - """Return the six operational decisions as structured policy.""" - - return ( - InstitutionalDecision( - decision_id=DecisionId.DOCS_FULL_KNOWLEDGE, - title="Docs pleno como plataforma de conhecimento operacional", - directive="Docs deve operar como plataforma responseReady, com busca, leitura ampla governada e pendencia quando a resposta nao existir.", - status=DecisionStatus.INSTITUTIONALIZED, - owner_platform_id=DOCS_PLATFORM_ID, - affected_platforms=AFFECTED_PLATFORMS, - required_controls=( - "leitura responseReady ampla", - "busca semantica", - "banco semantico proprio", - "resposta encontrado/parcial/nao_encontrado/conflitante", - "registro de fonte, versao, hash, data, plataforma, tema e autoridade", - ), - forbidden_actions=( - "tratar Docs como catalogOnly permanente", - "responder sem fonte e sem hash quando a pergunta exige prova", - "silenciar lacuna documental", - ), - evidence_fields=COMMON_EVIDENCE_FIELDS + ("documentAuthority", "documentVersion", "sufficiencyState"), - output_obligations=( - "registrar lacuna documental automaticamente", - "expor fonte institucional para GPT, MCP e UI", - "separar documento publico, interno, sensivel e restrito", - ), - ready_criteria=( - "consulta retorna estado de suficiencia", - "resposta inexistente gera 
pendencia", - "fonte possui hash e autoridade", - ), - ), - InstitutionalDecision( - decision_id=DecisionId.MCP_ACCEPTANCE, - title="Aceite interplataforma obrigatorio pelo MCP", - directive="Qualquer circulacao administrativa entre plataformas segue plataforma -> MCP -> plataforma.", - status=DecisionStatus.INSTITUTIONALIZED, - owner_platform_id=CONTROL_PLANE_ID, - affected_platforms=AFFECTED_PLATFORMS, - required_controls=( - "validar origem", - "validar contrato", - "validar hash", - "gerar etiqueta de entrada", - "preservar rastreabilidade", - "gerar etiqueta de saida", - "validar permissao do consumidor", - "registrar evidencia de aceite", - ), - forbidden_actions=( - "aceite puramente bilateral em fluxo administrativo externo", - "bypass direto sem traceId e auditId", - "payload com segredo bruto", - ), - evidence_fields=MCP_TRANSIT_FIELDS + COMMON_EVIDENCE_FIELDS + ("consumerPermission", "acceptanceTag"), - output_obligations=( - "registrar aceite por evidenceId", - "separar cache/fila/read model como excecao formal", - "manter permissionScope do consumidor", - ), - ready_criteria=( - "MCP descobre e valida a capacidade", - "traceId/auditId estao presentes", - "hash de payload e hash de registros batem com a evidencia", - ), - ), - InstitutionalDecision( - decision_id=DecisionId.TEN_YEAR_RETENTION, - title="Retencao institucional padrao de 10 anos", - directive="Evidencia, decisao, contrato, aceite, auditoria e documentacao relevante ficam retidos por 10 anos.", - status=DecisionStatus.INSTITUTIONALIZED, - owner_platform_id="nucleo-gestao-operacional", - affected_platforms=AFFECTED_PLATFORMS, - required_controls=( - "classificar evidencia unica", - "registrar legal hold", - "exigir rollback para operacao sensivel", - "exigir evidencia antes/depois", - "diferenciar cache e temporarios de prova institucional", - ), - forbidden_actions=( - "descartar evidencia unica antes do prazo", - "alterar contrato sensivel sem rollback", - "ignorar legal hold", - ), - 
evidence_fields=COMMON_EVIDENCE_FIELDS + ("retentionYears", "legalHold", "rollbackPlan", "beforeAfterEvidence"), - output_obligations=( - "marcar prazo de retencao", - "registrar legal hold quando houver", - "registrar rollback e prova antes/depois para operacao sensivel", - ), - ready_criteria=( - "retentionYears=10 para prova institucional", - "legalHold bloqueia descarte", - "rollback existe antes da operacao sensivel", - ), - ), - InstitutionalDecision( - decision_id=DecisionId.LIVE_SENSITIVE_PURGE_FORBIDDEN, - title="Expurgo de dado vivo sensivel vetado", - directive="Expurgo de dado vivo sensivel fica vetado; limpeza operacional usa vocabulario proprio e inventario.", - status=DecisionStatus.INSTITUTIONALIZED, - owner_platform_id="nucleo-gestao-operacional", - affected_platforms=AFFECTED_PLATFORMS, - required_controls=( - "classificar limpeza operacional", - "inventariar peso de projeto inchado", - "segregar artefatos descartaveis", - "preservar historico essencial", - "registrar rematerializacao limpa quando necessaria", - ), - forbidden_actions=( - "chamar limpeza operacional de expurgo", - "apagar dado vivo sensivel", - "apagar evidencia unica", - ), - evidence_fields=COMMON_EVIDENCE_FIELDS + ("cleanupClass", "weightInventory", "retainedEvidence"), - output_obligations=( - "usar limpeza operacional/higienizacao/rematerializacao", - "registrar caminhos descartaveis", - "registrar o que permanece por evidencia", - ), - ready_criteria=( - "dado vivo sensivel nunca marcado como expurgavel", - "cache/build/dist/node_modules sao limpeza operacional", - "inventario de peso existe para projeto inchado", - ), - ), - InstitutionalDecision( - decision_id=DecisionId.CANONICAL_PLATFORM_NAMES, - title="Nomes canonicos terminados em platform", - directive="Plataformas comuns terminam em platform; Biblioteca Privada nao e plataforma autonoma.", - status=DecisionStatus.INSTITUTIONALIZED, - owner_platform_id="nucleo-gestao-operacional", - 
affected_platforms=AFFECTED_PLATFORMS + BIBLIOTECA_LEGACY_IDS, - required_controls=( - "registrar alias plataform como legado temporario", - "normalizar ownerPlatformId para platform", - "reposicionar Biblioteca Privada como app/produto/modulo de Integracoes", - "bloquear repositorio duplicado por variante de nome", - ), - forbidden_actions=( - "criar ownerPlatformId proprio para Biblioteca Privada", - "tratar sufixo plataform como canonico novo", - "duplicar repositorio por rename sem janela", - ), - evidence_fields=COMMON_EVIDENCE_FIELDS + ("canonicalPlatformId", "legacyAlias", "ownerPlatformId"), - output_obligations=( - "registrar aliases e mapa canonico", - "atualizar matrizes/readiness/ordens", - "manter janela segura para rename fisico", - ), - ready_criteria=( - "id canonico termina com platform", - "Biblioteca Privada aponta para Integracoes", - "alias legado nao cria plataforma autonoma", - ), - ), - InstitutionalDecision( - decision_id=DecisionId.DEV_EXECUTION_AUTHORITY, - title="GPT e Codex com execucao ampla em desenvolvimento", - directive="Em desenvolvimento, GPT e Codex podem executar e simular amplamente com auditoria, truthState e sem efeito externo real.", - status=DecisionStatus.INSTITUTIONALIZED, - owner_platform_id="nucleo-gestao-operacional", - affected_platforms=AFFECTED_PLATFORMS, - required_controls=( - "truthState", - "actorId", - "auditId", - "dryRun quando aplicavel", - "marcacao real/simulado/teste/parcial/stale/bloqueado/derivado", - "bloqueio de efeito externo real sem autorizacao", - ), - forbidden_actions=( - "destruir banco", - "apagar historico", - "apagar evidencia", - "vazar segredo", - "alterar producao sem rollback", - "cobrar cliente real", - "enviar mensagem real a terceiro", - ), - evidence_fields=COMMON_EVIDENCE_FIELDS + ("actorId", "dryRun", "executionMode", "externalEffect"), - output_obligations=( - "marcar execucao simulada", - "registrar auditId", - "bloquear acao irreversivel", - ), - ready_criteria=( - "acao 
def _impact(profile_id: str, expectation: str, signal: str, risk: str, question: str) -> HumanImpact:
    """Shorthand constructor used while assembling the decision table below."""
    return HumanImpact(profile_id, expectation, signal, risk, question)


# The six institutional decisions, expressed as immutable policy records.
INSTITUTIONAL_DECISIONS: tuple[InstitutionalDecision, ...] = (
    # 1. Docs becomes an active knowledge platform, not a passive catalog.
    InstitutionalDecision(
        decision_id="docs_full_operational_platform",
        title="Docs pleno operacional",
        human_summary="Docs deixa de ser catalogo passivo e passa a responder suficiência documental com fonte, autoridade e lacuna.",
        operational_owner="tudo-para-ia-docs-plataform",
        expected_mcp_evidence=("documentId", "sourceHash", "authority", "truthState", "missingTopics"),
        affected_profiles=(
            _impact(
                "operador_gpt",
                "Encontrar resposta institucional sem adivinhar.",
                "status found/partial/not_found/conflicting",
                "GPT responde com base incompleta ou inventada.",
                "A resposta existe na Docs ou precisa virar pendencia?",
            ),
            _impact(
                "suporte_cliente",
                "Explicar regra ao cliente com base rastreavel.",
                "documento com authority e sourceHash",
                "Suporte passa orientacao sem lastro.",
                "Qual documento posso citar como fonte?",
            ),
        ),
        ready_criteria=(
            "Docs publica pelo menos uma tool responseReady.",
            "Consulta de suficiência gera pendencia quando nao encontra resposta.",
            "Resposta informa autoridade, fonte e hash.",
        ),
        blockers=("docs_catalog_only_sem_excecao", "lacuna_sem_pedido_documental"),
    ),
    # 2. Any cross-platform acceptance must route through the MCP.
    InstitutionalDecision(
        decision_id="mcp_required_cross_platform_acceptance",
        title="Aceite externo via MCP",
        human_summary="Quando uma plataforma entrega dado, contrato, aceite ou evidencia para outra, o MCP precisa aparecer no caminho.",
        operational_owner="tudo-para-ia-mcps-internos-plataform",
        expected_mcp_evidence=("originPlatformId", "destinationPlatformId", "contractHash", "sourceRecordsHash", "permissionId", "evidenceId"),
        affected_profiles=(
            _impact(
                "admin_ecossistema",
                "Saber quem aceitou o que e com qual contrato.",
                "rota platform -> MCP -> platform",
                "Aceite bilateral fica sem auditoria cross-platform.",
                "O aceite passou pelo MCP e tem hash?",
            ),
            _impact(
                "auditor",
                "Comparar origem, destino, permissao e evidencia.",
                "traceId + auditId + evidenceId",
                "Nao ha prova de permissao do consumidor.",
                "Qual evidencia prova a circulacao?",
            ),
        ),
        ready_criteria=(
            "Envelope possui origem, destino, contrato, hashes e permissao.",
            "MCP gera etiqueta de entrada e saida.",
            "Evidencia de aceite fica acessivel ao GPT e ao painel.",
        ),
        blockers=("mcp_bypass", "hash_ausente", "permissao_ausente"),
    ),
    # 3. Institutional proof is retained for ten years.
    InstitutionalDecision(
        decision_id="ten_year_institutional_retention",
        title="Retencao institucional 10 anos",
        human_summary="Prova de decisao, execucao, aceite, contrato, evidencia e auditoria deve ser preservada por 10 anos.",
        operational_owner="nucleo-gestao-operacional",
        expected_mcp_evidence=("retentionClass", "retentionYears", "legalHold", "rollbackPlan", "beforeAfterEvidence"),
        affected_profiles=(
            _impact(
                "gestor_operacional",
                "Confiar que prova institucional nao desaparece.",
                "retentionClass institutional_10y",
                "Decisao futura perde contexto e responsabilidade.",
                "Esta evidencia esta retida por 10 anos?",
            ),
            _impact(
                "compliance",
                "Bloquear descarte sob legal hold.",
                "legalHoldBlocksDiscard=true",
                "Descarte indevido vira risco juridico.",
                "Ha legal hold ou rollback antes da operacao sensivel?",
            ),
        ),
        ready_criteria=(
            "Evidence, contrato, aceite e auditoria usam classe 10 anos.",
            "Operacao sensivel tem rollback.",
            "Evidencia antes/depois existe quando aplicavel.",
        ),
        blockers=("retencao_curta_para_prova_unica", "operacao_sensivel_sem_rollback"),
    ),
    # 4. Purging live sensitive data is forbidden; cleanup has its own vocabulary.
    InstitutionalDecision(
        decision_id="live_sensitive_data_purge_forbidden",
        title="Expurgo de dado vivo sensivel vetado",
        human_summary="Limpeza operacional e permitida; apagar dado vivo sensivel nao e politica atual.",
        operational_owner="nucleo-gestao-operacional",
        expected_mcp_evidence=("cleanupClassification", "allowedTerms", "forbiddenTerms", "sensitiveDataCheck"),
        affected_profiles=(
            _impact(
                "operador_codex",
                "Limpar cache/build sem chamar isso de expurgo.",
                "classification operational_cleanup",
                "Artefato vivo pode ser apagado por erro de nomenclatura.",
                "Isto e limpeza operacional ou dado vivo sensivel?",
            ),
            _impact(
                "responsavel_cliente",
                "Garantir que dados reais nao sejam apagados sem decisao.",
                "blocked_live_sensitive_purge",
                "Perda de dado sensivel ou evidencia.",
                "Existe rollback e inventario antes de qualquer descarte?",
            ),
        ),
        ready_criteria=(
            "Cache/build/dist/coverage sao classificados como limpeza operacional.",
            "Dado vivo sensivel fica bloqueado.",
            "Projeto inchado passa por inventario de peso.",
        ),
        blockers=("expurgo_dado_vivo", "limpeza_chamada_expurgo"),
    ),
    # 5. Canonical names end in "platform"; Biblioteca Privada is a module.
    InstitutionalDecision(
        decision_id="canonical_platform_names",
        title="Nomes canonicos e Biblioteca Privada",
        human_summary="Plataformas comuns terminam em platform; Biblioteca Privada nao aparece como plataforma autonoma.",
        operational_owner="nucleo-gestao-operacional",
        expected_mcp_evidence=("canonicalName", "aliasStatus", "ownerPlatformId", "providerId"),
        affected_profiles=(
            _impact(
                "admin_ecossistema",
                "Ver nomes sem duplicidade entre plataforma real e alias legado.",
                "aliasStatus canonical/legacy_alias",
                "Matriz cria plataforma duplicada.",
                "Este nome e canonico ou alias legado?",
            ),
            _impact(
                "produto_integracoes",
                "Tratar Biblioteca Privada como produto/modulo de Integracoes.",
                "isPlatform=false",
                "Biblioteca ganha owner errado e contrato paralelo.",
                "Quem e o owner real deste modulo?",
            ),
        ),
        ready_criteria=(
            "Novos nomes terminam em platform.",
            "plataform/plataforma permanecem apenas como aliases legados.",
            "Biblioteca Privada nao recebe ownerPlatformId proprio.",
        ),
        blockers=("novo_nome_plataform", "biblioteca_como_plataforma_autonoma"),
    ),
    # 6. GPT/Codex may execute broadly in development with traceability.
    InstitutionalDecision(
        decision_id="development_execution_for_gpt_codex",
        title="Execucao ampla GPT/Codex em desenvolvimento",
        human_summary="GPT e Codex podem simular e operar em desenvolvimento quando houver rastreabilidade e sem efeito externo real.",
        operational_owner="nucleo-gestao-operacional",
        expected_mcp_evidence=("truthState", "actorId", "auditId", "dryRun", "environment"),
        affected_profiles=(
            _impact(
                "operador_gpt",
                "Executar fluxo de teste sem pedir autorizacao indevida.",
                "truthState simulated/test",
                "Rodada fica passiva e nao avanca.",
                "Esta acao e reversivel, auditavel e de desenvolvimento?",
            ),
            _impact(
                "gestor_risco",
                "Bloquear destruicao, segredo e efeito externo real.",
                "decision authorization_required/blocked_secret_exposure",
                "Acao real pode escapar sem aprovacao.",
                "Existe autorizacao explicita, rollback e evidencia antes/depois?",
            ),
        ),
        ready_criteria=(
            "Simulacao usa truthState, actorId e auditId.",
            "dryRun aparece quando aplicavel.",
            "Efeito externo real exige autorizacao explicita.",
        ),
        blockers=("segredo_exposto", "efeito_externo_sem_autorizacao", "acao_destrutiva_sem_rollback"),
    ),
)
= ( + PlatformAlias( + current_name="tudo-para-ia-docs-plataform", + canonical_name="tudo-para-ia-docs-platform", + alias_status="legacy_alias", + is_platform=True, + owner_platform_id="tudo-para-ia-docs-platform", + human_note="Alias legado deve permanecer rastreado ate decisao de rename.", + ), + PlatformAlias( + current_name="tudo-para-ia-integracoes-plataform", + canonical_name="tudo-para-ia-integracoes-platform", + alias_status="legacy_alias", + is_platform=True, + owner_platform_id="tudo-para-ia-integracoes-platform", + human_note="Alias legado nao deve virar novo providerId.", + ), + PlatformAlias( + current_name="tudo-para-ia-mcps-internos-plataform", + canonical_name="tudo-para-ia-mcps-internos-platform", + alias_status="legacy_alias", + is_platform=True, + owner_platform_id="tudo-para-ia-mcps-internos-platform", + human_note="MCP segue como nucleo administrativo mesmo enquanto o repo real usa alias legado.", + ), + PlatformAlias( + current_name="tudo-para-ia-biblioteca-privada", + canonical_name="tudo-para-ia-integracoes-platform", + alias_status="module_not_platform", + is_platform=False, + owner_platform_id="tudo-para-ia-integracoes-platform", + human_note="Biblioteca Privada deve ser produto/app/modulo de Integracoes.", + ), +) + + +def list_institutional_decisions() -> tuple[InstitutionalDecision, ...]: + return INSTITUTIONAL_DECISIONS + + +def summarize_decisions(decisions: Iterable[InstitutionalDecision] = INSTITUTIONAL_DECISIONS) -> dict[str, object]: + items = tuple(decisions) + profile_ids = sorted({impact.profile_id for decision in items for impact in decision.affected_profiles}) return { - DecisionId.DOCS_FULL_KNOWLEDGE: ( - "indexar_documento", - "buscar_semanticamente", - "responder_suficiencia", - "registrar_lacuna", - "expor_fonte_para_mcp_ui", - ), - DecisionId.MCP_ACCEPTANCE: ( - "validar_origem", - "validar_contrato_hash", - "gerar_etiqueta_entrada", - "gerar_etiqueta_saida", - "registrar_evidencia_aceite", - ), - 
DecisionId.TEN_YEAR_RETENTION: ( - "classificar_evidencia", - "reter_dez_anos", - "aplicar_legal_hold", - "exigir_rollback", - "registrar_antes_depois", - ), - DecisionId.LIVE_SENSITIVE_PURGE_FORBIDDEN: ( - "classificar_limpeza_operacional", - "bloquear_expurgo_vivo", - "inventariar_peso", - "segregar_artefatos", - "rematerializar_limpo", - ), - DecisionId.CANONICAL_PLATFORM_NAMES: ( - "validar_sufixo_platform", - "resolver_alias_plataform", - "bloquear_biblioteca_autonoma", - "normalizar_owner_platform", - "registrar_app_produto_modulo", - ), - DecisionId.DEV_EXECUTION_AUTHORITY: ( - "simular_usuario_tenant", - "simular_admin_suporte", - "executar_teste_massa", - "executar_dryrun_reversivel", - "bloquear_efeito_externo", - ), - }[decision_id] - - -def _case_status(decision_id: DecisionId, operation: str) -> DecisionCaseStatus: - if decision_id == DecisionId.LIVE_SENSITIVE_PURGE_FORBIDDEN and "bloquear_expurgo" in operation: - return DecisionCaseStatus.FORBIDDEN - if decision_id == DecisionId.DEV_EXECUTION_AUTHORITY and "bloquear_efeito_externo" in operation: - return DecisionCaseStatus.BLOCKED_IF_MISSING - if decision_id == DecisionId.DEV_EXECUTION_AUTHORITY: - return DecisionCaseStatus.ALLOWED_WITH_GATES - return DecisionCaseStatus.REQUIRED - - -def _required_fields_for(decision: InstitutionalDecision, operation: str) -> tuple[str, ...]: - extras: tuple[str, ...] 
= () - if decision.decision_id == DecisionId.DOCS_FULL_KNOWLEDGE: - extras = ("documentId", "documentAuthority", "sufficiencyState") - elif decision.decision_id == DecisionId.MCP_ACCEPTANCE: - extras = ("origin", "destination", "consumerPermission", "acceptanceTag") - elif decision.decision_id == DecisionId.TEN_YEAR_RETENTION: - extras = ("retentionYears", "legalHold", "rollbackPlan") - elif decision.decision_id == DecisionId.LIVE_SENSITIVE_PURGE_FORBIDDEN: - extras = ("cleanupClass", "purgeBlocked", "weightInventory") - elif decision.decision_id == DecisionId.CANONICAL_PLATFORM_NAMES: - extras = ("canonicalPlatformId", "legacyAlias", "ownerPlatformId") - elif decision.decision_id == DecisionId.DEV_EXECUTION_AUTHORITY: - extras = ("actorId", "dryRun", "executionMode", "externalEffect") - return merge_unique((*decision.evidence_fields, *extras, operation)) - - -def _expected_truth_state(decision_id: DecisionId) -> str: - return { - DecisionId.DOCS_FULL_KNOWLEDGE: "response_ready", - DecisionId.MCP_ACCEPTANCE: "same_source_ready", - DecisionId.TEN_YEAR_RETENTION: "documented", - DecisionId.LIVE_SENSITIVE_PURGE_FORBIDDEN: "documented", - DecisionId.CANONICAL_PLATFORM_NAMES: "same_source_ready", - DecisionId.DEV_EXECUTION_AUTHORITY: "test_or_simulated", - }[decision_id] - - -def build_runtime_acceptance_cases(decisions: Sequence[InstitutionalDecision] | None = None) -> tuple[DecisionAcceptanceCase, ...]: - """Build acceptance cases directly from the policy matrix.""" - - source_decisions = tuple(decisions or build_institutional_decisions()) - by_id = _decision_by_id(source_decisions) - cases: list[DecisionAcceptanceCase] = [] - for decision_id in DecisionId: - decision = by_id[decision_id] - for platform_id in decision.affected_platforms: - for actor in DecisionActor: - for operation in _operations_for(decision_id): - status = _case_status(decision_id, operation) - seed = { - "decision": decision_id.value, - "platform": platform_id, - "actor": actor.value, - 
"operation": operation, - } - source_hash = stable_digest(seed, 64) - cases.append( - DecisionAcceptanceCase( - case_id=f"institutional-{stable_digest(seed, 24)}", - decision_id=decision_id, - platform_id=platform_id, - actor=actor, - operation=operation, - topic=decision.title, - status=status, - required_fields=_required_fields_for(decision, operation), - expected_truth_state=_expected_truth_state(decision_id), - mcp_required=True, - retention_years=10 if decision_id == DecisionId.TEN_YEAR_RETENTION else 0, - legal_hold_blocks_disposal=decision_id == DecisionId.TEN_YEAR_RETENTION, - dry_run_required=decision_id == DecisionId.DEV_EXECUTION_AUTHORITY and status != DecisionCaseStatus.FORBIDDEN, - rollback_required=decision_id in {DecisionId.TEN_YEAR_RETENTION, DecisionId.DEV_EXECUTION_AUTHORITY}, - direct_bypass_blocked=decision_id == DecisionId.MCP_ACCEPTANCE or platform_id != CONTROL_PLANE_ID, - reason=decision.directive, - next_action="validar campos, hashes, truthState e registrar pendencia quando faltar evidencia", - evidence_id=f"evidence-{source_hash[:24]}", - source_hash=source_hash, - ) - ) - return tuple(cases) - - -def build_acceptance_cases(*, use_generated: bool = True) -> tuple[DecisionAcceptanceCase, ...]: - """Return generated cases when available, otherwise build runtime cases.""" - - if use_generated: - try: - from .generated_institutional_decisions import iter_acceptance_cases - - return tuple(iter_acceptance_cases()) - except (ImportError, AttributeError): - pass - return build_runtime_acceptance_cases() - - -def _finding( - decision_id: DecisionId | str, - field: str, - message: str, - next_action: str, - *, - severity: DecisionFindingSeverity = DecisionFindingSeverity.BLOCKER, -) -> DecisionFinding: - seed = stable_digest({"decision": str(decision_id), "field": field, "message": message}, 16) - return DecisionFinding( - finding_id=f"institutional-finding-{seed}", - severity=severity, - decision_id=decision_id, - field=field, - message=message, 
- next_action=next_action, - ) - - -def _contains_secret(value: object) -> bool: - text = str(as_plain_data(value)) - return any(pattern.search(text) for _, pattern in SECRET_PATTERNS) - - -def validate_mcp_acceptance_envelope(envelope: Mapping[str, Any], *, control_plane_id: str = CONTROL_PLANE_ID) -> tuple[DecisionFinding, ...]: - """Validate the platform -> MCP -> platform envelope.""" - - findings: list[DecisionFinding] = [] - for field in MCP_TRANSIT_FIELDS: - if field not in envelope or envelope[field] in (None, "", [], {}): - findings.append( - _finding( - DecisionId.MCP_ACCEPTANCE, - field, - "campo obrigatorio do envelope MCP ausente ou vazio", - "preencher origin, destination, tool, payload, actor, permission, result, traceId, auditId e timestamp", - ) - ) - destination = str(envelope.get("destination", "")) - if destination and destination != control_plane_id: - findings.append( - _finding( - DecisionId.MCP_ACCEPTANCE, - "destination", - "destino nao aponta para MCPs Internos", - "rotear aceite administrativo pelo MCP antes de circular a outra plataforma", - ) - ) - if str(envelope.get("origin", "")) == destination: - findings.append( - _finding( - DecisionId.MCP_ACCEPTANCE, - "origin", - "origem e destino iguais reduzem rastreabilidade de aceite interplataforma", - "separar plataforma de origem, MCP e consumidor final", - severity=DecisionFindingSeverity.WARNING, - ) - ) - if _contains_secret(envelope): - findings.append( - _finding( - DecisionId.MCP_ACCEPTANCE, - "payload", - "envelope contem formato de segredo bruto", - "substituir por credentialRef, tokenRef ou secretRef antes de persistir evidencia", - ) - ) - return tuple(findings) - - -def docs_sufficiency_result( - *, - found_sources: int, - partial_sources: int = 0, - conflicts: int = 0, - authority_present: bool = True, -) -> DocsSufficiencyResult: - """Classify a Docs answer as found, partial, not found, or conflicting.""" - - seed = { - "found": found_sources, - "partial": partial_sources, - 
"conflicts": conflicts, - "authority": authority_present, + "policy_version": POLICY_VERSION, + "decisions": len(items), + "profiles": profile_ids, + "profile_count": len(profile_ids), + "expected_mcp_evidence_fields": sorted({field for decision in items for field in decision.expected_mcp_evidence}), + "blockers": sorted({blocker for decision in items for blocker in decision.blockers}), + "ready_criteria_count": sum(len(decision.ready_criteria) for decision in items), } - source_hash = stable_digest(seed, 64) - if conflicts > 0: - state = DocsSufficiencyState.CONFLICTING - reason = "fontes documentais divergem" - next_action = "abrir pendencia documental com fontes conflitantes e owner" - elif found_sources > 0 and authority_present: - state = DocsSufficiencyState.FOUND - reason = "fonte suficiente com autoridade e hash" - next_action = "responder com fonte, versao, hash, data, plataforma, tema e autoridade" - elif found_sources > 0 or partial_sources > 0: - state = DocsSufficiencyState.PARTIAL - reason = "fonte existe, mas autoridade ou cobertura ainda e parcial" - next_action = "responder como parcial e abrir complemento documental" - else: - state = DocsSufficiencyState.NOT_FOUND - reason = "nenhuma fonte institucional encontrada" - next_action = "abrir pendencia automatica para criar ou atualizar documentacao" - return DocsSufficiencyResult( - state=state, - answer_allowed=state in {DocsSufficiencyState.FOUND, DocsSufficiencyState.PARTIAL}, - pending_required=state != DocsSufficiencyState.FOUND, - reason=reason, - required_next_action=next_action, - source_hash=source_hash, - evidence_id=f"evidence-{source_hash[:24]}", - ) -def classify_retention_subject( - subject: str, - *, - is_unique_evidence: bool = False, - legal_hold: bool = False, - live_sensitive: bool = False, -) -> RetentionDecision: - """Classify retention for institutional records and short-lived artifacts.""" - - lowered = subject.lower() - required_evidence = ("sourceHash", "evidenceId", 
"traceId", "auditId", "timestamp") - if legal_hold: - return RetentionDecision( - subject=subject, - decision=RetentionDecisionKind.LEGAL_HOLD, - retention_years=10, - legal_hold=True, - disposal_allowed=False, - rollback_required=True, - reason="legal hold bloqueia descarte mesmo apos prazo operacional", - required_evidence=required_evidence + ("legalHoldId",), - ) - if live_sensitive: - return RetentionDecision( - subject=subject, - decision=RetentionDecisionKind.LIVE_SENSITIVE_RETAINED, - retention_years=10, - legal_hold=False, - disposal_allowed=False, - rollback_required=True, - reason="dado vivo sensivel nao pode ser expurgado pela politica atual", - required_evidence=required_evidence + ("sensitivityClass",), - ) - if is_unique_evidence or any(term in lowered for term in TEN_YEAR_SUBJECTS): - return RetentionDecision( - subject=subject, - decision=RetentionDecisionKind.TEN_YEAR_EVIDENCE, - retention_years=10, - legal_hold=False, - disposal_allowed=False, - rollback_required=True, - reason="prova institucional entra na retencao padrao de 10 anos", - required_evidence=required_evidence + ("retentionYears",), - ) - if any(term in lowered for term in SHORT_LIVED_SUBJECTS): - return RetentionDecision( - subject=subject, - decision=RetentionDecisionKind.SHORT_LIVED_OPERATIONAL, - retention_years=0, - legal_hold=False, - disposal_allowed=True, - rollback_required=False, - reason="artefato operacional reinstalavel ou temporario; nao e evidencia unica", - required_evidence=("classification", "cleanupLog"), - ) - return RetentionDecision( - subject=subject, - decision=RetentionDecisionKind.TEN_YEAR_EVIDENCE, - retention_years=10, - legal_hold=False, - disposal_allowed=False, - rollback_required=True, - reason="classificacao desconhecida fica retida ate avaliacao de owner", - required_evidence=required_evidence + ("ownerReview",), - ) +def build_human_decision_matrix() -> dict[str, object]: + decisions = list_institutional_decisions() + return { + "ok": True, + 
"policy_version": POLICY_VERSION, + "generated_at": "2026-05-04T00:00:00.000Z", + "summary": summarize_decisions(decisions), + "decisions": [asdict(decision) for decision in decisions], + "platform_aliases": [asdict(alias) for alias in PLATFORM_ALIASES], + "human_boundary": ( + "Mais Humana traduz impacto e pergunta humana; Docs/MCP continuam donos da fonte " + "e a UI continua renderizadora same-source." + ), + } -def classify_cleanup_target( - target: str, - *, - contains_live_sensitive: bool = False, - unique_evidence: bool = False, -) -> CleanupDecision: - """Classify cleanup vocabulary and forbid purge for live sensitive data.""" - - lowered = target.lower() - if contains_live_sensitive or any(term in lowered for term in SENSITIVE_LIVE_TERMS): - return CleanupDecision( - target=target, - decision=CleanupDecisionKind.PURGE_FORBIDDEN, - term_to_use="retencao governada", - allowed=False, - requires_inventory=True, - reason="expurgo de dado vivo sensivel esta vetado", - next_action="preservar evidencia, classificar sensibilidade e abrir ordem de governanca", - ) - if unique_evidence: - return CleanupDecision( - target=target, - decision=CleanupDecisionKind.RETAIN_WITH_EVIDENCE, - term_to_use="arquivamento de evidencia antiga", - allowed=False, - requires_inventory=True, - reason="artefato e evidencia unica e nao deve ser tratado como limpeza", - next_action="reter por 10 anos ou aplicar legal hold", - ) - if any(term in lowered for term in OPERATIONAL_CLEANUP_TERMS): - return CleanupDecision( - target=target, - decision=CleanupDecisionKind.OPERATIONAL_CLEANUP, - term_to_use="limpeza operacional", - allowed=True, - requires_inventory=False, - reason="artefato temporario, cache, build ou dependencia reinstalavel", - next_action="registrar limpeza e preservar logs de erro quando houver", - ) - return CleanupDecision( - target=target, - decision=CleanupDecisionKind.OWNER_REVIEW, - term_to_use="inventario de peso", - allowed=False, - requires_inventory=True, - 
reason="classificacao nao determinada com seguranca", - next_action="inventariar peso, separar evidencia e decidir rematerializacao limpa", - ) - - -def _platform_to_canonical(identifier: str) -> str: - text = identifier.strip().replace("\\", "/").rstrip("/") - name = text.split("/")[-1] - if name.endswith(".git"): - name = name[:-4] - if name in BIBLIOTECA_LEGACY_IDS: - return BIBLIOTECA_OWNER_PLATFORM_ID - if name.endswith("-plataform"): - return name[: -len("-plataform")] + "-platform" - if not name.endswith("-platform") and name.startswith("tudo-para-ia-"): - return f"{name}-platform" - return name - - -def classify_platform_identifier(identifier: str) -> PlatformNameDecision: - """Validate platform suffixes and Biblioteca Privada ownership.""" - - canonical = _platform_to_canonical(identifier) - name = identifier.strip().replace("\\", "/").rstrip("/").split("/")[-1] - if name in BIBLIOTECA_LEGACY_IDS: - return PlatformNameDecision( - identifier=identifier, - canonical_identifier=canonical, - accepted=True, - platform_autonomous=False, - owner_platform_id=BIBLIOTECA_OWNER_PLATFORM_ID, - reason="Biblioteca Privada e app/produto/modulo dentro de Integracoes, nao plataforma autonoma", - required_action="remover ownerPlatformId proprio e apontar para Integracoes", - ) - if canonical != name: - return PlatformNameDecision( - identifier=identifier, - canonical_identifier=canonical, - accepted=True, - platform_autonomous=True, - owner_platform_id=canonical, - reason="identificador aceito como alias legado ate migracao coordenada", - required_action="usar canonical_identifier em novos contratos e manter alias rastreavel", - ) - return PlatformNameDecision( - identifier=identifier, - canonical_identifier=canonical, - accepted=canonical.endswith("-platform"), - platform_autonomous=canonical.endswith("-platform"), - owner_platform_id=canonical, - reason="identificador ja segue sufixo canonico platform" if canonical.endswith("-platform") else "identificador nao e 
plataforma canonica", - required_action="nenhuma" if canonical.endswith("-platform") else "registrar como app/modulo ou corrigir sufixo", - ) - - -def classify_development_action( - action: str, - *, - destructive: bool = False, - exposes_secret: bool = False, - external_effect: bool = False, - production: bool = False, - rollback_available: bool = True, - dry_run: bool = True, -) -> DevelopmentActionClassification: - """Classify GPT/Codex execution in development.""" - - lowered = action.lower() - required_evidence = ("truthState", "actorId", "auditId", "traceId", "timestamp") - if exposes_secret or any(pattern.search(action) for _, pattern in SECRET_PATTERNS): - return DevelopmentActionClassification( +def classify_development_action_for_humans(action: str, environment: str = "development") -> DevelopmentActionAssessment: + normalized = action.casefold() + secret_words = ("secret", "token", "cookie", "auth state", "segredo", "qr bruto") + external_words = ("send real message", "charge customer", "production", "delete database", "drop table", "apagar banco") + if any(word in normalized for word in secret_words): + return DevelopmentActionAssessment( action=action, - decision=DevelopmentActionDecision.BLOCKED, + environment=environment, allowed=False, - requires_approval=True, - required_truth_state="blocked", - dry_run_required=True, - rollback_required=True, - reason="acao exporia segredo bruto", - required_evidence=required_evidence + ("redactionFinding",), + decision="blocked_secret_exposure", + required_controls=("redaction", "credentialRef", "auditId"), + human_explanation="Valor de segredo nunca deve aparecer para suporte, GPT, Codex, relatorio ou SQL.", ) - if destructive or any(term in lowered for term in DESTRUCTIVE_ACTION_TERMS): - return DevelopmentActionClassification( + if environment != "development" or any(word in normalized for word in external_words): + return DevelopmentActionAssessment( action=action, - 
decision=DevelopmentActionDecision.BLOCKED, + environment=environment, allowed=False, - requires_approval=True, - required_truth_state="blocked", - dry_run_required=True, - rollback_required=True, - reason="acao destrutiva ou irreversivel sem autorizacao explicita", - required_evidence=required_evidence + ("approvalId", "rollbackPlan"), + decision="explicit_authorization_required", + required_controls=("authorization", "rollbackPlan", "beforeAfterEvidence", "auditId"), + human_explanation="Efeito externo real, producao ou acao destrutiva exige autorizacao explicita.", ) - if production or external_effect: - return DevelopmentActionClassification( - action=action, - decision=DevelopmentActionDecision.APPROVAL_REQUIRED, - allowed=False, - requires_approval=True, - required_truth_state="approval_required", - dry_run_required=True, - rollback_required=True, - reason="efeito externo real ou producao exige autorizacao explicita", - required_evidence=required_evidence + ("approvalId", "externalEffectClass"), - ) - if dry_run or any(term in lowered for term in DEV_ALLOWED_ACTION_TERMS): - return DevelopmentActionClassification( - action=action, - decision=DevelopmentActionDecision.ALLOWED_DRY_RUN if dry_run else DevelopmentActionDecision.ALLOWED, - allowed=True, - requires_approval=False, - required_truth_state="simulated_or_test", - dry_run_required=dry_run, - rollback_required=not rollback_available, - reason="execucao de desenvolvimento reversivel, simulada ou auditavel", - required_evidence=required_evidence + ("dryRun", "executionMode"), - ) - return DevelopmentActionClassification( + return DevelopmentActionAssessment( action=action, - decision=DevelopmentActionDecision.ALLOWED, + environment=environment, allowed=True, - requires_approval=False, - required_truth_state="test_or_partial", - dry_run_required=True, - rollback_required=not rollback_available, - reason="acao permitida em desenvolvimento desde que auditavel e sem efeito externo", - 
required_evidence=required_evidence + ("executionMode",), + decision="development_simulation_allowed", + required_controls=("truthState", "actorId", "auditId", "dryRunWhenApplicable"), + human_explanation="Simulacao auditavel em desenvolvimento pode ser executada para avancar a rodada.", ) - - -def validate_registry(registry: InstitutionalDecisionRegistry) -> tuple[DecisionFinding, ...]: - """Validate consistency of the decision registry.""" - - findings: list[DecisionFinding] = [] - decision_ids = {decision.decision_id for decision in registry.decisions} - for decision_id in DecisionId: - if decision_id not in decision_ids: - findings.append( - _finding( - decision_id, - "decisions", - "decisao institucional ausente do registro", - "adicionar decisao antes de considerar rodada pronta", - ) - ) - docs = next((decision for decision in registry.decisions if decision.decision_id == DecisionId.DOCS_FULL_KNOWLEDGE), None) - if docs and any("catalogOnly permanente" in item for item in docs.forbidden_actions) is False: - findings.append( - _finding( - DecisionId.DOCS_FULL_KNOWLEDGE, - "forbidden_actions", - "Docs pleno nao bloqueia explicitamente catalogOnly permanente", - "registrar proibicao de catalogOnly permanente", - ) - ) - mcp_cases = [case for case in registry.acceptance_cases if case.decision_id == DecisionId.MCP_ACCEPTANCE] - if mcp_cases and not all("traceId" in case.required_fields and "auditId" in case.required_fields for case in mcp_cases): - findings.append( - _finding( - DecisionId.MCP_ACCEPTANCE, - "acceptance_cases", - "caso MCP sem traceId/auditId", - "regenerar acceptance cases com campos de transito completos", - ) - ) - return tuple(findings) - - -def build_institutional_decision_registry(*, use_generated: bool = True) -> InstitutionalDecisionRegistry: - """Build the full registry and validate it.""" - - decisions = build_institutional_decisions() - cases = build_acceptance_cases(use_generated=use_generated) - seed = { - "decisions": 
[decision.decision_id.value for decision in decisions], - "cases": len(cases), - "source": CANONICAL_DECISION_SOURCE, - } - provisional = InstitutionalDecisionRegistry( - registry_id=f"institutional-decisions-{stable_digest(seed, 16)}", - generated_at=utc_now(), - source_order=CANONICAL_DECISION_SOURCE, - control_plane_id=CONTROL_PLANE_ID, - decisions=decisions, - acceptance_cases=cases, - findings=(), - summary=(), - ) - findings = validate_registry(provisional) - summary = ( - f"Decisoes institucionalizadas: {len(decisions)}", - f"Casos de aceite: {len(cases)}", - f"Findings bloqueantes: {sum(1 for item in findings if item.severity == DecisionFindingSeverity.BLOCKER)}", - "Docs pleno exige responseReady e pendencia automatica quando nao houver resposta.", - "Aceite administrativo interplataforma passa por MCP com traceId e auditId.", - "Retencao padrao de provas institucionais e de 10 anos.", - "Expurgo de dado vivo sensivel esta vetado; limpeza operacional usa vocabulario proprio.", - "Biblioteca Privada pertence a Integracoes como app/produto/modulo.", - "GPT/Codex executam amplamente em desenvolvimento sem segredo bruto nem efeito externo real.", - ) - return InstitutionalDecisionRegistry( - registry_id=provisional.registry_id, - generated_at=provisional.generated_at, - source_order=provisional.source_order, - control_plane_id=provisional.control_plane_id, - decisions=decisions, - acceptance_cases=cases, - findings=findings, - summary=summary, - ) - - -def registry_compact_payload(registry: InstitutionalDecisionRegistry, *, limit_cases: int = 120) -> dict[str, Any]: - """Return a compact JSON payload.""" - - return { - "registryId": registry.registry_id, - "generatedAt": registry.generated_at, - "sourceOrder": registry.source_order, - "controlPlaneId": registry.control_plane_id, - "decisionsCount": registry.decisions_count, - "casesCount": registry.cases_count, - "blockerCount": registry.blocker_count, - "passed": registry.passed, - "summary": 
list(registry.summary), - "decisions": [decision.to_dict() for decision in registry.decisions], - "casesSample": [case.to_dict() for case in registry.acceptance_cases[: max(0, limit_cases)]], - "findings": [finding.to_dict() for finding in registry.findings], - } - - -def decision_case_rows(cases: Sequence[DecisionAcceptanceCase]) -> list[list[str]]: - rows = [ - [ - "case_id", - "decision_id", - "platform_id", - "actor", - "operation", - "status", - "expected_truth_state", - "mcp_required", - "retention_years", - "legal_hold_blocks_disposal", - "dry_run_required", - "rollback_required", - "direct_bypass_blocked", - "source_hash", - "evidence_id", - "next_action", - ] - ] - for case in cases: - rows.append( - [ - case.case_id, - case.decision_id.value, - case.platform_id, - case.actor.value, - case.operation, - case.status.value, - case.expected_truth_state, - "yes" if case.mcp_required else "no", - str(case.retention_years), - "yes" if case.legal_hold_blocks_disposal else "no", - "yes" if case.dry_run_required else "no", - "yes" if case.rollback_required else "no", - "yes" if case.direct_bypass_blocked else "no", - case.source_hash, - case.evidence_id, - case.next_action, - ] - ) - return rows - - -def rows_to_csv(rows: Sequence[Sequence[str]]) -> str: - buffer = io.StringIO() - writer = csv.writer(buffer, lineterminator="\n") - writer.writerows(rows) - return buffer.getvalue() - - -def registry_markdown(registry: InstitutionalDecisionRegistry, *, limit_cases: int = 80) -> str: - """Render the registry as a human audit document.""" - - lines = [ - "# Registro executavel das decisoes institucionais", - "", - f"- registry_id: `{registry.registry_id}`", - f"- generated_at: `{registry.generated_at}`", - f"- source_order: `{registry.source_order}`", - f"- control_plane: `{registry.control_plane_id}`", - f"- decisions: `{registry.decisions_count}`", - f"- acceptance_cases: `{registry.cases_count}`", - f"- passed: `{registry.passed}`", - "", - "## Sumario", - "", - ] - 
lines.extend(f"- {item}" for item in registry.summary) - lines.extend(["", "## Decisoes", ""]) - for decision in registry.decisions: - lines.extend( - [ - f"### {decision.title}", - "", - f"- id: `{decision.decision_id.value}`", - f"- owner: `{decision.owner_platform_id}`", - f"- status: `{decision.status.value}`", - f"- diretriz: {decision.directive}", - f"- controles: {', '.join(decision.required_controls)}", - f"- proibicoes: {', '.join(decision.forbidden_actions)}", - f"- criterio pronto: {', '.join(decision.ready_criteria)}", - "", - ] - ) - lines.extend(["## Casos de aceite amostrais", ""]) - for case in registry.acceptance_cases[: max(0, limit_cases)]: - lines.append( - f"- `{case.case_id}` `{case.decision_id.value}` `{case.platform_id}` " - f"`{case.actor.value}` `{case.operation}` -> `{case.status.value}` evidence `{case.evidence_id}`" - ) - if len(registry.acceptance_cases) > limit_cases: - lines.append(f"- ... {len(registry.acceptance_cases) - limit_cases} casos adicionais no CSV/JSON.") - lines.extend(["", "## Findings", ""]) - if registry.findings: - for finding in registry.findings: - lines.append( - f"- `{finding.severity.value}` `{finding.decision_id}` `{finding.field}`: " - f"{redact_sensitive_text(finding.message)} Proxima acao: {finding.next_action}" - ) - else: - lines.append("- Nenhum finding bloqueante.") - return "\n".join(lines).strip() + "\n" - - -def generated_files(project_root: Path, central_platform_folder: Path | None = None) -> tuple[GeneratedFile, ...]: - relation = "000-ROTEADOR-PERMANENTE-DE-ORDEM_DE_SERVICO" - specs = ( - ("dados/institutional-decisions-registry.json", "Registro JSON das seis decisoes institucionais.", "institutional decisions registry", "json"), - ("dados/institutional-decisions-compact.json", "Registro compacto das decisoes para GPT/UI/MCP.", "institutional decisions compact", "json"), - ("matrizes/institutional-decision-acceptance-cases.csv", "Matriz de casos de aceite das decisoes.", "institutional decisions 
acceptance matrix", "csv"), - ("ecossistema/INSTITUTIONAL-DECISIONS.md", "Relatorio humano das decisoes institucionais.", "institutional decisions report", "markdown"), - ) - records = [ - GeneratedFile( - path=str(project_root / relative), - description=description, - function=function, - file_type=file_type, - changed_by="mais_humana.institutional_decisions", - change_summary="Criado ou atualizado registro executavel das seis decisoes institucionais.", - relation_to_order=relation, - ) - for relative, description, function, file_type in specs - ] - if central_platform_folder is not None: - records.append( - GeneratedFile( - path=str(central_platform_folder / "reports" / "EXECUTADO__institutional-decisions.md"), - description="Copia central do registro das decisoes institucionais.", - function="institutional decisions central report", - file_type="markdown", - changed_by="mais_humana.institutional_decisions", - change_summary="Registradas decisoes institucionais no dossie central.", - relation_to_order=relation, - ) - ) - return tuple(records) - - -def write_institutional_decision_artifacts( - registry: InstitutionalDecisionRegistry, - project_root: Path, - *, - central_platform_folder: Path | None = None, -) -> tuple[GeneratedFile, ...]: - """Write JSON, CSV, Markdown artifacts and update local semantic SQLite.""" - - targets: list[tuple[Path, str]] = [ - (project_root / "dados" / "institutional-decisions-registry.json", json.dumps(registry.to_dict(), ensure_ascii=False, indent=2, sort_keys=True)), - (project_root / "dados" / "institutional-decisions-compact.json", json.dumps(registry_compact_payload(registry), ensure_ascii=False, indent=2, sort_keys=True)), - (project_root / "matrizes" / "institutional-decision-acceptance-cases.csv", rows_to_csv(decision_case_rows(registry.acceptance_cases))), - (project_root / "ecossistema" / "INSTITUTIONAL-DECISIONS.md", registry_markdown(registry)), - ] - records = list(generated_files(project_root, None)) - if 
central_platform_folder is not None: - central_target = central_platform_folder / "reports" / "EXECUTADO__institutional-decisions.md" - targets.append((central_target, registry_markdown(registry))) - records.append( - GeneratedFile( - path=str(central_target), - description="Copia central do registro das decisoes institucionais.", - function="institutional decisions central report", - file_type="markdown", - changed_by="mais_humana.institutional_decisions", - change_summary="Registradas decisoes institucionais no dossie central.", - relation_to_order="000-ROTEADOR-PERMANENTE-DE-ORDEM_DE_SERVICO", - ) - ) - for path, text in targets: - path.parent.mkdir(parents=True, exist_ok=True) - path.write_text(text, encoding="utf-8") - with connect(project_root / "controle-semantico.sqlite") as conn: - upsert_files(conn, records) - conn.commit() - return tuple(records) - - -def run_institutional_decisions( - *, - project_root: Path, - central_platform_folder: Path | None = None, - use_generated: bool = True, -) -> tuple[InstitutionalDecisionRegistry, tuple[GeneratedFile, ...]]: - registry = build_institutional_decision_registry(use_generated=use_generated) - records = write_institutional_decision_artifacts(registry, project_root, central_platform_folder=central_platform_folder) - return registry, records - diff --git a/tests/test_institutional_decisions.py b/tests/test_institutional_decisions.py new file mode 100644 index 0000000..e87071a --- /dev/null +++ b/tests/test_institutional_decisions.py @@ -0,0 +1,71 @@ +from __future__ import annotations + +import unittest + +from mais_humana.institutional_decisions import ( + PLATFORM_ALIASES, + build_human_decision_matrix, + classify_development_action_for_humans, + list_institutional_decisions, + summarize_decisions, +) + + +class InstitutionalDecisionHumanTests(unittest.TestCase): + def test_six_decisions_are_available_for_human_translation(self) -> None: + decisions = list_institutional_decisions() + 
self.assertEqual(len(decisions), 6) + self.assertEqual( + [decision.decision_id for decision in decisions], + [ + "docs_full_operational_platform", + "mcp_required_cross_platform_acceptance", + "ten_year_institutional_retention", + "live_sensitive_data_purge_forbidden", + "canonical_platform_names", + "development_execution_for_gpt_codex", + ], + ) + self.assertTrue(all(decision.affected_profiles for decision in decisions)) + self.assertTrue(all(decision.expected_mcp_evidence for decision in decisions)) + + def test_summary_exposes_profiles_evidence_and_blockers(self) -> None: + summary = summarize_decisions() + self.assertEqual(summary["decisions"], 6) + self.assertGreaterEqual(summary["profile_count"], 5) + self.assertIn("sourceRecordsHash", summary["expected_mcp_evidence_fields"]) + self.assertIn("segredo_exposto", summary["blockers"]) + self.assertGreaterEqual(summary["ready_criteria_count"], 18) + + def test_matrix_keeps_biblioteca_privada_outside_platforms(self) -> None: + matrix = build_human_decision_matrix() + self.assertTrue(matrix["ok"]) + self.assertEqual(matrix["summary"]["decisions"], 6) + aliases = matrix["platform_aliases"] + biblioteca = [alias for alias in aliases if alias["current_name"] == "tudo-para-ia-biblioteca-privada"][0] + self.assertFalse(biblioteca["is_platform"]) + self.assertEqual(biblioteca["owner_platform_id"], "tudo-para-ia-integracoes-platform") + self.assertIn("Mais Humana traduz impacto", matrix["human_boundary"]) + + def test_platform_aliases_document_legacy_platform_suffixes(self) -> None: + legacy = [alias for alias in PLATFORM_ALIASES if alias.alias_status == "legacy_alias"] + self.assertGreaterEqual(len(legacy), 3) + self.assertTrue(all(alias.canonical_name.endswith("-platform") for alias in legacy)) + + def test_development_action_assessment_is_broad_but_blocks_secret_and_external_effect(self) -> None: + allowed = classify_development_action_for_humans("simular usuario tenant compra e painel") + 
self.assertTrue(allowed.allowed) + self.assertEqual(allowed.decision, "development_simulation_allowed") + self.assertIn("truthState", allowed.required_controls) + + external = classify_development_action_for_humans("send real message to customer") + self.assertFalse(external.allowed) + self.assertEqual(external.decision, "explicit_authorization_required") + + secret = classify_development_action_for_humans("mostrar token secret") + self.assertFalse(secret.allowed) + self.assertEqual(secret.decision, "blocked_secret_exposure") + + +if __name__ == "__main__": + unittest.main()