diff --git a/src/mais_humana/institutional_assurance.py b/src/mais_humana/institutional_assurance.py
new file mode 100644
index 0000000..9b5361c
--- /dev/null
+++ b/src/mais_humana/institutional_assurance.py
@@ -0,0 +1,1100 @@
+"""Institutional assurance for the six router decisions.
+
+This module turns the router decisions into an executable assurance report.
+It scans the real repositories and their central dossiers, checks whether the
+expected institutional evidence is present, keeps secret-shaped values out of
+generated artifacts, and writes compact material for GPT, UI, Markdown reports,
+CSV matrices, and the local semantic SQLite.
+"""
+
+from __future__ import annotations
+
+import csv
+import hashlib
+import io
+import json
+from dataclasses import dataclass
+from enum import Enum
+from pathlib import Path
+from typing import Any, Iterable, Mapping, Sequence
+
+from .institutional_decisions import INSTITUTIONAL_DECISIONS, POLICY_VERSION
+from .models import GeneratedFile, as_plain_data, merge_unique, utc_now
+from .redaction import RedactionFinding, redact_sensitive_text, scan_text_for_secrets
+from .storage import connect, upsert_files
+
+
+ASSURANCE_VERSION = "2026-05-04.institutional-assurance.v1"
+
+TEXT_SUFFIXES = {
+    ".md",
+    ".txt",
+    ".json",
+    ".jsonl",
+    ".csv",
+    ".ts",
+    ".tsx",
+    ".js",
+    ".jsx",
+    ".mjs",
+    ".cjs",
+    ".py",
+    ".java",
+    ".toml",
+    ".yaml",
+    ".yml",
+}
+
+CODE_SUFFIXES = {".ts", ".tsx", ".js", ".jsx", ".mjs", ".cjs", ".py", ".java"}
+
+SKIP_PARTS = {
+    ".git",
+    ".venv",
+    ".test-tmp",
+    "__pycache__",
+    "node_modules",
+    "dist",
+    "build",
+    "coverage",
+    ".cache",
+    "tmp",
+    "temp",
+}
+
+SKIP_FILE_NAMES = {
+    "controle-semantico.sqlite",
+    "controle-semantico.sqlite-wal",
+    "controle-semantico.sqlite-shm",
+}
+
+
+class AssuranceStatus(str, Enum):
+    """Readiness status for institutional checks."""
+
+    READY = "ready"
+    PARTIAL = "partial"
+    BLOCKED = "blocked"
+
+
+class SourceKind(str, Enum):
+    """Where a signal came from."""
+
+    REAL_REPO = "real_repo"
+    CENTRAL_DOSSIER = "central_dossier"
+    UNKNOWN = "unknown"
+
+
+@dataclass(frozen=True, slots=True)
+class PlatformScope:
+    """One managed platform or repository involved in the router decisions."""
+
+    platform_id: str
+    canonical_project_id: str
+    real_repo: Path
+    central_folder: Path
+    owner_role: str
+    decision_focus: tuple[str, ...]
+
+    def to_dict(self) -> dict[str, Any]:
+        return as_plain_data(self)
+
+
+@dataclass(frozen=True, slots=True)
+class AssuranceTerm:
+    """One text/evidence term used to prove an institutional decision."""
+
+    term_id: str
+    label: str
+    aliases: tuple[str, ...]
+    required: bool = True
+    evidence_field: str = ""
+
+    def matches(self, text: str) -> bool:
+        lowered = text.casefold()
+        return any(alias.casefold() in lowered for alias in self.aliases)
+
+    def to_dict(self) -> dict[str, Any]:
+        return as_plain_data(self)
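+
+
+# Illustrative sketch, hypothetical values: `AssuranceTerm.matches` is a
+# case-insensitive substring test over the alias tuple, so one term can
+# recognise camelCase, snake_case, and prose spellings of the same evidence
+# field (`_term` is the small factory defined further below):
+#
+#     term = _term("source_hash", "sourceHash", ("sourceHash", "source_hash"))
+#     term.matches('payload = {"sourceHash": "sha256:abc"}')  # -> True
+#     term.matches("SOURCE_HASH recorded")                    # -> True (casefold)
+#     term.matches("no evidence here")                        # -> False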
+
+
+@dataclass(frozen=True, slots=True)
+class DecisionRequirement:
+    """Terms and owners expected for one institutional decision."""
+
+    decision_id: str
+    title: str
+    owner_platform_id: str
+    terms: tuple[AssuranceTerm, ...]
+    required_evidence_fields: tuple[str, ...]
+    ready_threshold: float = 1.0
+
+    def to_dict(self) -> dict[str, Any]:
+        return as_plain_data(self)
+
+    @property
+    def required_term_ids(self) -> tuple[str, ...]:
+        return tuple(term.term_id for term in self.terms if term.required)
+
+
+@dataclass(frozen=True, slots=True)
+class FileSignal:
+    """One redacted signal found in a scanned file."""
+
+    signal_id: str
+    decision_id: str
+    term_id: str
+    platform_id: str
+    source_kind: SourceKind
+    path: str
+    line: int
+    snippet: str
+    digest: str
+
+    def to_dict(self) -> dict[str, Any]:
+        return as_plain_data(self)
+
+
+@dataclass(frozen=True, slots=True)
+class ScopeScan:
+    """Scanning metrics for one platform scope."""
+
+    platform_id: str
+    canonical_project_id: str
+    real_repo: str
+    central_folder: str
+    real_repo_exists: bool
+    central_folder_exists: bool
+    files_scanned: int
+    code_lines_analyzed: int
+    central_active_executive_orders: int
+    central_active_managerial_orders: int
+    signals: tuple[FileSignal, ...]
+    redaction_findings: tuple[RedactionFinding, ...]
+    warnings: tuple[str, ...]
+
+    def to_dict(self) -> dict[str, Any]:
+        return as_plain_data(self)
+
+    @property
+    def active_orders_count(self) -> int:
+        return self.central_active_executive_orders + self.central_active_managerial_orders
+
+
+@dataclass(frozen=True, slots=True)
+class DecisionAssurance:
+    """Aggregated evidence and status for one router decision."""
+
+    decision_id: str
+    title: str
+    owner_platform_id: str
+    status: AssuranceStatus
+    score: int
+    found_terms: tuple[str, ...]
+    missing_terms: tuple[str, ...]
+    found_evidence_fields: tuple[str, ...]
+    missing_evidence_fields: tuple[str, ...]
+    signal_count: int
+    source_paths: tuple[str, ...]
+    affected_platforms: tuple[str, ...]
+    next_actions: tuple[str, ...]
+
+    def to_dict(self) -> dict[str, Any]:
+        return as_plain_data(self)
+
+
+@dataclass(frozen=True, slots=True)
+class PlatformAssurance:
+    """Compact platform coverage across the six decisions."""
+
+    platform_id: str
+    canonical_project_id: str
+    status: AssuranceStatus
+    decisions_with_signal: tuple[str, ...]
+    missing_decisions: tuple[str, ...]
+    files_scanned: int
+    code_lines_analyzed: int
+    active_orders_count: int
+    redaction_findings: int
+    warnings: tuple[str, ...]
+
+    def to_dict(self) -> dict[str, Any]:
+        return as_plain_data(self)
+
+
+@dataclass(frozen=True, slots=True)
+class InstitutionalAssuranceReport:
+    """Full assurance report for the round."""
+
+    report_id: str
+    generated_at: str
+    policy_version: str
+    assurance_version: str
+    sandbox_mode: str
+    plugin_auth_attempt: str
+    ok: bool
+    summary: Mapping[str, Any]
+    decision_checks: tuple[DecisionAssurance, ...]
+    platform_checks: tuple[PlatformAssurance, ...]
+    scans: tuple[ScopeScan, ...]
+    redaction_findings: tuple[RedactionFinding, ...]
+    pending_items: tuple[str, ...]
+    source_hash: str
+
+    def to_dict(self) -> dict[str, Any]:
+        return as_plain_data(self)
+
+
+def _term(
+    term_id: str,
+    label: str,
+    aliases: Iterable[str],
+    *,
+    evidence_field: str = "",
+    required: bool = True,
+) -> AssuranceTerm:
+    return AssuranceTerm(
+        term_id=term_id,
+        label=label,
+        aliases=tuple(aliases),
+        required=required,
+        evidence_field=evidence_field or term_id,
+    )
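+
+
+# Hypothetical example of the `_term` helper: when `evidence_field` is omitted
+# it falls back to the term id, so the two calls below build equivalent terms.
+#
+#     _term("legal_hold", "legalHold", ("legalHold", "legal_hold"))
+#     _term("legal_hold", "legalHold", ("legalHold", "legal_hold"),
+#           evidence_field="legal_hold")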
decision_id="ten_year_institutional_retention", + title=titles["ten_year_institutional_retention"], + owner_platform_id=owners["ten_year_institutional_retention"], + required_evidence_fields=evidence["ten_year_institutional_retention"], + terms=( + _term("retention_class", "retentionClass", ("retentionClass", "retention_class", "retencao institucional", "institutional_10y"), evidence_field="retentionClass"), + _term("retention_years", "retentionYears", ("retentionYears", "retention_years", "10 anos", "ten_year"), evidence_field="retentionYears"), + _term("legal_hold", "legalHold", ("legalHold", "legal_hold", "legal hold"), evidence_field="legalHold"), + _term("rollback_plan", "rollbackPlan", ("rollbackPlan", "rollback_plan", "rollback"), evidence_field="rollbackPlan"), + _term("before_after", "beforeAfterEvidence", ("beforeAfterEvidence", "before_after", "antes/depois", "before/after"), evidence_field="beforeAfterEvidence"), + ), + ), + DecisionRequirement( + decision_id="live_sensitive_data_purge_forbidden", + title=titles["live_sensitive_data_purge_forbidden"], + owner_platform_id=owners["live_sensitive_data_purge_forbidden"], + required_evidence_fields=evidence["live_sensitive_data_purge_forbidden"], + terms=( + _term("cleanup_classification", "cleanupClassification", ("cleanupClassification", "cleanup_classification", "operational_cleanup", "limpeza operacional"), evidence_field="cleanupClassification"), + _term("allowed_terms", "allowedTerms", ("allowedTerms", "allowed_terms", "cache", "build", "dist", "coverage"), evidence_field="allowedTerms"), + _term("forbidden_terms", "forbiddenTerms", ("forbiddenTerms", "forbidden_terms", "blocked_live_sensitive_purge", "dado vivo sensivel"), evidence_field="forbiddenTerms"), + _term("sensitive_data_check", "sensitiveDataCheck", ("sensitiveDataCheck", "sensitive_data_check", "live_sensitive", "dado vivo"), evidence_field="sensitiveDataCheck"), + _term("weight_inventory", "inventario de peso", ("inventario de peso", "workspace hygiene", "higiene de workspace"), required=False), + ), + ), + DecisionRequirement( + decision_id="canonical_platform_names", + title=titles["canonical_platform_names"], + owner_platform_id=owners["canonical_platform_names"], + required_evidence_fields=evidence["canonical_platform_names"], + terms=( + _term("canonical_name", "canonicalName", ("canonicalName", "canonical_name", "canonical_project_id", "-platform"), evidence_field="canonicalName"), + _term("alias_status", "aliasStatus", ("aliasStatus", "alias_status", "legacy_alias", "compatibility_alias"), evidence_field="aliasStatus"), + _term("owner_platform", "ownerPlatformId", ("ownerPlatformId", "owner_platform_id", "owner_platform"), evidence_field="ownerPlatformId"), + _term("provider_id", "providerId", ("providerId", "provider_id", "provider"), evidence_field="providerId"), + _term("biblioteca_module", "Biblioteca Privada como modulo", ("biblioteca privada", "module_not_platform", "modulo de integracoes", "is_platform=false"), required=False), + ), + ), + DecisionRequirement( + decision_id="development_execution_for_gpt_codex", + title=titles["development_execution_for_gpt_codex"], + owner_platform_id=owners["development_execution_for_gpt_codex"], + required_evidence_fields=evidence["development_execution_for_gpt_codex"], + terms=( + _term("truth_state", "truthState", ("truthState", "truth_state", "simulated", "test"), evidence_field="truthState"), + _term("actor_id", "actorId", ("actorId", "actor_id", "requestedBy"), evidence_field="actorId"), + _term("audit_id", 
"auditId", ("auditId", "audit_id"), evidence_field="auditId"), + _term("dry_run", "dryRun", ("dryRun", "dry_run", "dry-run"), evidence_field="dryRun"), + _term("environment", "environment", ("environment", "development", "ambiente de desenvolvimento"), evidence_field="environment"), + _term("external_block", "external authorization", ("explicit_authorization_required", "efeito externo", "authorization_required"), required=False), + ), + ), + ) + + +def default_platform_scopes( + ecosystem_root: Path, + central_projects_root: Path, +) -> tuple[PlatformScope, ...]: + """Return the platform scopes affected by the router decision.""" + + return ( + PlatformScope( + platform_id="nucleo-gestao-operacional", + canonical_project_id="nucleo-gestao-operacional", + real_repo=ecosystem_root / "nucleo-gestao-operacional", + central_folder=central_projects_root / "_repo_nucleo-gestao-operacional", + owner_role="governance", + decision_focus=("ten_year_institutional_retention", "live_sensitive_data_purge_forbidden", "canonical_platform_names", "development_execution_for_gpt_codex"), + ), + PlatformScope( + platform_id="tudo-para-ia-docs-plataform", + canonical_project_id="tudo-para-ia-docs-platform", + real_repo=ecosystem_root / "tudo-para-ia-docs-plataform", + central_folder=central_projects_root / "04_repo_tudo-para-ia-docs-plataform", + owner_role="docs_full_platform", + decision_focus=("docs_full_operational_platform", "ten_year_institutional_retention"), + ), + PlatformScope( + platform_id="tudo-para-ia-mcps-internos-plataform", + canonical_project_id="tudo-para-ia-mcps-internos-platform", + real_repo=ecosystem_root / "tudo-para-ia-mcps-internos-plataform", + central_folder=central_projects_root / "10_repo_tudo-para-ia-mcps-internos-plataform", + owner_role="mcp_control_plane", + decision_focus=("mcp_required_cross_platform_acceptance", "development_execution_for_gpt_codex"), + ), + PlatformScope( + platform_id="tudo-para-ia-ui-platform", + canonical_project_id="tudo-para-ia-ui-platform", + real_repo=ecosystem_root / "tudo-para-ia-ui-platform", + central_folder=central_projects_root / "14_repo_tudo-para-ia-ui-platform", + owner_role="same_source_renderer", + decision_focus=("docs_full_operational_platform", "mcp_required_cross_platform_acceptance"), + ), + PlatformScope( + platform_id="tudo-para-ia-integracoes-plataform", + canonical_project_id="tudo-para-ia-integracoes-platform", + real_repo=ecosystem_root / "tudo-para-ia-integracoes-plataform", + central_folder=central_projects_root / "08_repo_tudo-para-ia-integracoes-plataform", + owner_role="integration_owner", + decision_focus=("mcp_required_cross_platform_acceptance", "canonical_platform_names"), + ), + PlatformScope( + platform_id="tudo-para-ia-customer-ops-platform", + canonical_project_id="tudo-para-ia-customer-ops-platform", + real_repo=ecosystem_root / "tudo-para-ia-customer-ops-platform", + central_folder=central_projects_root / "03_repo_tudo-para-ia-customer-ops-platform", + owner_role="customer_operations", + decision_focus=("ten_year_institutional_retention", "development_execution_for_gpt_codex"), + ), + PlatformScope( + platform_id="tudo-para-ia-mais-humana", + canonical_project_id="tudo-para-ia-mais-humana-platform", + real_repo=ecosystem_root / "tudo-para-ia-mais-humana", + central_folder=central_projects_root / "15_repo_tudo-para-ia-mais-humana-platform", + owner_role="human_assurance", + decision_focus=tuple(decision.decision_id for decision in INSTITUTIONAL_DECISIONS), + ), + ) + + +def _stable_digest(value: object, length: int 
+
+
+def _stable_digest(value: object, length: int = 16) -> str:
+    encoded = json.dumps(as_plain_data(value), ensure_ascii=True, sort_keys=True).encode("utf-8")
+    return hashlib.sha256(encoded).hexdigest()[:length]
+
+
+def _path_has_skipped_part(path: Path) -> bool:
+    return any(part in SKIP_PARTS for part in path.parts)
+
+
+def _iter_candidate_files(root: Path, *, max_file_bytes: int = 1_000_000) -> Iterable[Path]:
+    if not root.exists():
+        return
+    for path in root.rglob("*"):
+        if not path.is_file():
+            continue
+        if _path_has_skipped_part(path):
+            continue
+        if path.name in SKIP_FILE_NAMES:
+            continue
+        if path.suffix.lower() not in TEXT_SUFFIXES:
+            continue
+        try:
+            if path.stat().st_size > max_file_bytes:
+                continue
+        except OSError:
+            continue
+        yield path
+
+
+def _source_kind(path: Path, scope: PlatformScope) -> SourceKind:
+    try:
+        path.relative_to(scope.real_repo)
+        return SourceKind.REAL_REPO
+    except ValueError:
+        pass
+    try:
+        path.relative_to(scope.central_folder)
+        return SourceKind.CENTRAL_DOSSIER
+    except ValueError:
+        return SourceKind.UNKNOWN
+
+
+def _safe_rel(path: Path, base: Path) -> str:
+    try:
+        return str(path.relative_to(base)).replace("\\", "/")
+    except ValueError:
+        return str(path).replace("\\", "/")
+
+
+def _count_active_orders(folder: Path) -> int:
+    if not folder.exists():
+        return 0
+    # Files named exactly ".md" are placeholders, not active orders.
+    return sum(1 for path in folder.glob("*.md") if path.is_file() and path.name != ".md")
+
+
+def _scan_file_for_signals(
+    path: Path,
+    *,
+    scope: PlatformScope,
+    requirements: Sequence[DecisionRequirement],
+) -> tuple[int, tuple[FileSignal, ...], tuple[RedactionFinding, ...], str]:
+    try:
+        text = path.read_text(encoding="utf-8", errors="ignore")
+    except OSError as exc:
+        return 0, (), (), f"{type(exc).__name__}: {exc}"
+    lines = text.splitlines()
+    code_lines = len(lines) if path.suffix.lower() in CODE_SUFFIXES else 0
+    findings = scan_text_for_secrets(str(path), text)
+    signals: list[FileSignal] = []
+    for line_no, line in enumerate(lines, start=1):
+        # Cap very long lines before matching; slicing is a no-op below 600 chars.
+        candidate = line[:600]
+        for requirement in requirements:
+            for term in requirement.terms:
+                if not term.matches(candidate):
+                    continue
+                seed = {
+                    "decision": requirement.decision_id,
+                    "term": term.term_id,
+                    "platform": scope.platform_id,
+                    "path": str(path),
+                    "line": line_no,
+                }
+                signals.append(
+                    FileSignal(
+                        signal_id=f"assurance-{_stable_digest(seed, 20)}",
+                        decision_id=requirement.decision_id,
+                        term_id=term.term_id,
+                        platform_id=scope.platform_id,
+                        source_kind=_source_kind(path, scope),
+                        path=str(path),
+                        line=line_no,
+                        snippet=redact_sensitive_text(candidate.strip())[:260],
+                        digest=_stable_digest({"path": str(path), "line": line_no, "snippet": candidate}, 24),
+                    )
+                )
+    return code_lines, tuple(signals), findings, ""
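+
+
+# Determinism sketch (hypothetical values): `_stable_digest` serialises with
+# sorted keys and ASCII, so equal seeds always produce the same id prefix and
+# re-running a scan does not churn `signal_id` values.
+#
+#     _stable_digest({"a": 1, "b": 2}) == _stable_digest({"b": 2, "a": 1})
+#     # True; both hash the same canonical JSON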
{error}") + return ScopeScan( + platform_id=scope.platform_id, + canonical_project_id=scope.canonical_project_id, + real_repo=str(scope.real_repo), + central_folder=str(scope.central_folder), + real_repo_exists=scope.real_repo.exists(), + central_folder_exists=scope.central_folder.exists(), + files_scanned=files_scanned, + code_lines_analyzed=code_lines, + central_active_executive_orders=_count_active_orders(scope.central_folder / "orders" / "executivas"), + central_active_managerial_orders=_count_active_orders(scope.central_folder / "orders" / "gerenciais"), + signals=tuple(signals), + redaction_findings=tuple(redaction_findings), + warnings=merge_unique(warnings), + ) + + +def _status_for_score(score: int) -> AssuranceStatus: + if score >= 100: + return AssuranceStatus.READY + if score >= 40: + return AssuranceStatus.PARTIAL + return AssuranceStatus.BLOCKED + + +def _paths_for_signals(signals: Sequence[FileSignal], limit: int = 10) -> tuple[str, ...]: + paths = [] + for signal in signals: + paths.append(f"{signal.path}:{signal.line}") + return merge_unique(paths)[:limit] + + +def build_decision_assurance( + requirement: DecisionRequirement, + scans: Sequence[ScopeScan], +) -> DecisionAssurance: + """Aggregate all signals for one decision into a readiness check.""" + + decision_signals = tuple(signal for scan in scans for signal in scan.signals if signal.decision_id == requirement.decision_id) + found_terms = merge_unique(signal.term_id for signal in decision_signals) + found_term_set = set(found_terms) + required_terms = requirement.required_term_ids + missing_terms = tuple(term_id for term_id in required_terms if term_id not in found_term_set) + found_evidence_fields = merge_unique( + term.evidence_field + for term in requirement.terms + if term.evidence_field in requirement.required_evidence_fields and term.term_id in found_term_set + ) + missing_evidence_fields = tuple(field for field in requirement.required_evidence_fields if field not in set(found_evidence_fields)) + score = 100 if not required_terms else round(100 * (len(required_terms) - len(missing_terms)) / len(required_terms)) + next_actions: list[str] = [] + if missing_terms: + next_actions.append("materializar evidencia operacional para termos ausentes: " + ", ".join(missing_terms)) + if missing_evidence_fields: + next_actions.append("publicar campos de evidencia ausentes: " + ", ".join(missing_evidence_fields)) + if not decision_signals: + next_actions.append("criar artefato fonte ou teste que prove a decisao no repo owner") + return DecisionAssurance( + decision_id=requirement.decision_id, + title=requirement.title, + owner_platform_id=requirement.owner_platform_id, + status=_status_for_score(score), + score=score, + found_terms=found_terms, + missing_terms=missing_terms, + found_evidence_fields=found_evidence_fields, + missing_evidence_fields=missing_evidence_fields, + signal_count=len(decision_signals), + source_paths=_paths_for_signals(decision_signals), + affected_platforms=merge_unique(signal.platform_id for signal in decision_signals), + next_actions=merge_unique(next_actions), + ) + + +def build_platform_assurance( + scan: ScopeScan, + requirements: Sequence[DecisionRequirement], +) -> PlatformAssurance: + """Build compact coverage for one platform scope.""" + + decisions_with_signal = merge_unique(signal.decision_id for signal in scan.signals) + all_decision_ids = tuple(requirement.decision_id for requirement in requirements) + missing = tuple(decision_id for decision_id in all_decision_ids if decision_id not in 
+
+
+def build_platform_assurance(
+    scan: ScopeScan,
+    requirements: Sequence[DecisionRequirement],
+) -> PlatformAssurance:
+    """Build compact coverage for one platform scope."""
+
+    decisions_with_signal = merge_unique(signal.decision_id for signal in scan.signals)
+    all_decision_ids = tuple(requirement.decision_id for requirement in requirements)
+    missing = tuple(decision_id for decision_id in all_decision_ids if decision_id not in set(decisions_with_signal))
+    if not missing and scan.redaction_findings:
+        status = AssuranceStatus.PARTIAL
+    elif not missing:
+        status = AssuranceStatus.READY
+    elif decisions_with_signal:
+        status = AssuranceStatus.PARTIAL
+    else:
+        status = AssuranceStatus.BLOCKED
+    return PlatformAssurance(
+        platform_id=scan.platform_id,
+        canonical_project_id=scan.canonical_project_id,
+        status=status,
+        decisions_with_signal=decisions_with_signal,
+        missing_decisions=missing,
+        files_scanned=scan.files_scanned,
+        code_lines_analyzed=scan.code_lines_analyzed,
+        active_orders_count=scan.active_orders_count,
+        redaction_findings=len(scan.redaction_findings),
+        warnings=scan.warnings,
+    )
+
+
+def build_institutional_assurance_report(
+    scopes: Sequence[PlatformScope],
+    *,
+    sandbox_mode: str = "workspace-write",
+    plugin_auth_attempt: str = "",
+    requirements: Sequence[DecisionRequirement] | None = None,
+) -> InstitutionalAssuranceReport:
+    """Scan scopes and build the full institutional assurance report."""
+
+    reqs = tuple(requirements or decision_requirements())
+    scans = tuple(scan_scope(scope, reqs) for scope in scopes)
+    decisions = tuple(build_decision_assurance(requirement, scans) for requirement in reqs)
+    platforms = tuple(build_platform_assurance(scan, reqs) for scan in scans)
+    redaction_findings = tuple(finding for scan in scans for finding in scan.redaction_findings)
+    ready_decisions = sum(1 for decision in decisions if decision.status == AssuranceStatus.READY)
+    partial_decisions = sum(1 for decision in decisions if decision.status == AssuranceStatus.PARTIAL)
+    blocked_decisions = sum(1 for decision in decisions if decision.status == AssuranceStatus.BLOCKED)
+    total_code_lines = sum(scan.code_lines_analyzed for scan in scans)
+    active_orders = sum(scan.active_orders_count for scan in scans)
+    pending: list[str] = []
+    if sandbox_mode != "danger-full-access":
+        pending.append("workspace-write: deploy Cloudflare e git push permanecem pendencia real da rodada")
+    if plugin_auth_attempt:
+        pending.append(f"cloudflare plugin auth attempt: {plugin_auth_attempt}")
+    if redaction_findings:
+        pending.append(f"redaction: {len(redaction_findings)} ocorrencias sensiveis detectadas em fontes ou dossies")
+    for decision in decisions:
+        if decision.status != AssuranceStatus.READY:
+            pending.extend(f"{decision.decision_id}: {action}" for action in decision.next_actions)
+    summary = {
+        "decisionsTotal": len(decisions),
+        "decisionsReady": ready_decisions,
+        "decisionsPartial": partial_decisions,
+        "decisionsBlocked": blocked_decisions,
+        "platformsTotal": len(platforms),
+        "platformsReady": sum(1 for platform in platforms if platform.status == AssuranceStatus.READY),
+        "platformsPartial": sum(1 for platform in platforms if platform.status == AssuranceStatus.PARTIAL),
+        "platformsBlocked": sum(1 for platform in platforms if platform.status == AssuranceStatus.BLOCKED),
+        "filesScanned": sum(scan.files_scanned for scan in scans),
+        "codeLinesAnalyzed": total_code_lines,
+        "activeOrdersObserved": active_orders,
+        "redactionFindings": len(redaction_findings),
+        "sandboxMode": sandbox_mode,
+        "pluginAuthAttempt": plugin_auth_attempt,
+    }
+    seed = {
+        "version": ASSURANCE_VERSION,
+        "decisions": [decision.to_dict() for decision in decisions],
+        "platforms": [platform.to_dict() for platform in platforms],
+        "summary": summary,
+    }
+    source_hash = "sha256:" + _stable_digest(seed, 64)
+    ok = blocked_decisions == 0 and not redaction_findings
+    return InstitutionalAssuranceReport(
+        report_id=f"institutional-assurance-{_stable_digest(seed, 16)}",
+        generated_at=utc_now(),
+        policy_version=POLICY_VERSION,
+        assurance_version=ASSURANCE_VERSION,
+        sandbox_mode=sandbox_mode,
+        plugin_auth_attempt=plugin_auth_attempt,
+        ok=ok,
+        summary=summary,
+        decision_checks=decisions,
+        platform_checks=platforms,
+        scans=scans,
+        redaction_findings=redaction_findings,
+        pending_items=merge_unique(pending),
+        source_hash=source_hash,
+    )
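+
+
+# End-to-end sketch with hypothetical roots: build the report in memory
+# without writing any artifact. `ok` is true only when no decision is
+# blocked and no secret-shaped value was found.
+#
+#     scopes = default_platform_scopes(Path("/ws/eco"), Path("/ws/central"))
+#     report = build_institutional_assurance_report(scopes, sandbox_mode="workspace-write")
+#     report.ok, report.summary["decisionsReady"]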
report_id=f"institutional-assurance-{_stable_digest(seed, 16)}", + generated_at=utc_now(), + policy_version=POLICY_VERSION, + assurance_version=ASSURANCE_VERSION, + sandbox_mode=sandbox_mode, + plugin_auth_attempt=plugin_auth_attempt, + ok=ok, + summary=summary, + decision_checks=decisions, + platform_checks=platforms, + scans=scans, + redaction_findings=redaction_findings, + pending_items=merge_unique(pending), + source_hash=source_hash, + ) + + +def compact_assurance_payload(report: InstitutionalAssuranceReport, *, limit: int = 40) -> dict[str, Any]: + """Return compact JSON-safe payload for GPT/UI/MCP transport.""" + + return { + "reportId": report.report_id, + "generatedAt": report.generated_at, + "ok": report.ok, + "policyVersion": report.policy_version, + "assuranceVersion": report.assurance_version, + "sourceHash": report.source_hash, + "summary": dict(report.summary), + "decisions": [ + { + "decisionId": item.decision_id, + "status": item.status.value, + "score": item.score, + "missingEvidenceFields": list(item.missing_evidence_fields), + "signalCount": item.signal_count, + "nextActions": list(item.next_actions[:4]), + } + for item in report.decision_checks + ], + "platforms": [ + { + "platformId": item.platform_id, + "canonicalProjectId": item.canonical_project_id, + "status": item.status.value, + "decisionsWithSignal": list(item.decisions_with_signal), + "missingDecisions": list(item.missing_decisions), + "codeLinesAnalyzed": item.code_lines_analyzed, + "activeOrdersCount": item.active_orders_count, + } + for item in report.platform_checks + ], + "pendingItems": list(report.pending_items[:limit]), + "redactionFindings": [ + { + "path": item.path, + "patternId": item.pattern_id, + "line": item.line, + "severity": item.severity, + "sample": item.sample, + } + for item in report.redaction_findings[:limit] + ], + } + + +def assurance_rows(report: InstitutionalAssuranceReport) -> list[list[str]]: + rows = [ + [ + "kind", + "id", + "owner_or_platform", + "status", + "score_or_lines", + "signals_or_orders", + "missing", + "source_hash", + ] + ] + for decision in report.decision_checks: + rows.append( + [ + "decision", + decision.decision_id, + decision.owner_platform_id, + decision.status.value, + str(decision.score), + str(decision.signal_count), + "; ".join(decision.missing_evidence_fields or decision.missing_terms), + report.source_hash, + ] + ) + for platform in report.platform_checks: + rows.append( + [ + "platform", + platform.platform_id, + platform.canonical_project_id, + platform.status.value, + str(platform.code_lines_analyzed), + str(platform.active_orders_count), + "; ".join(platform.missing_decisions), + report.source_hash, + ] + ) + return rows + + +def rows_to_csv(rows: Sequence[Sequence[str]]) -> str: + buffer = io.StringIO() + writer = csv.writer(buffer, lineterminator="\n") + writer.writerows(rows) + return buffer.getvalue() + + +def assurance_markdown(report: InstitutionalAssuranceReport) -> str: + """Render a redacted human-readable assurance report.""" + + lines = [ + "# Institutional Assurance - Router 000", + "", + f"- report_id: `{report.report_id}`", + f"- generated_at: `{report.generated_at}`", + f"- ok: `{report.ok}`", + f"- source_hash: `{report.source_hash}`", + f"- sandbox_mode: `{report.sandbox_mode}`", + f"- cloudflare_plugin_auth_attempt: `{redact_sensitive_text(report.plugin_auth_attempt or 'not_recorded')}`", + "", + "## Summary", + "", + ] + for key, value in report.summary.items(): + lines.append(f"- {key}: `{value}`") + lines.extend(["", "## 
Decisions", ""]) + for item in report.decision_checks: + lines.extend( + [ + f"### {item.decision_id}", + "", + f"- title: {item.title}", + f"- owner: `{item.owner_platform_id}`", + f"- status: `{item.status.value}`", + f"- score: `{item.score}`", + f"- signals: `{item.signal_count}`", + f"- found_evidence_fields: `{', '.join(item.found_evidence_fields) or 'none'}`", + f"- missing_evidence_fields: `{', '.join(item.missing_evidence_fields) or 'none'}`", + ] + ) + if item.source_paths: + lines.append("- sample_sources:") + for path in item.source_paths[:6]: + lines.append(f" - `{redact_sensitive_text(path)}`") + if item.next_actions: + lines.append("- next_actions:") + for action in item.next_actions: + lines.append(f" - {redact_sensitive_text(action)}") + lines.append("") + lines.extend(["## Platforms", ""]) + for item in report.platform_checks: + lines.extend( + [ + f"### {item.platform_id}", + "", + f"- canonical_project_id: `{item.canonical_project_id}`", + f"- status: `{item.status.value}`", + f"- code_lines_analyzed: `{item.code_lines_analyzed}`", + f"- active_orders_count: `{item.active_orders_count}`", + f"- decisions_with_signal: `{', '.join(item.decisions_with_signal) or 'none'}`", + f"- missing_decisions: `{', '.join(item.missing_decisions) or 'none'}`", + f"- redaction_findings: `{item.redaction_findings}`", + "", + ] + ) + lines.extend(["## Pending", ""]) + if report.pending_items: + for item in report.pending_items: + lines.append(f"- {redact_sensitive_text(item)}") + else: + lines.append("- Nenhuma pendencia compacta.") + lines.extend(["", "## Redaction", ""]) + if report.redaction_findings: + for finding in report.redaction_findings[:80]: + lines.append( + f"- `{finding.severity}` `{redact_sensitive_text(finding.path)}:{finding.line}` " + f"{finding.pattern_id} sample `{finding.sample}`" + ) + if len(report.redaction_findings) > 80: + lines.append(f"- ... 
+
+
+def platform_closeout_markdown(report: InstitutionalAssuranceReport, platform: PlatformAssurance) -> str:
+    """Render a platform-specific closeout record for central folders."""
+
+    related_decisions = [
+        decision
+        for decision in report.decision_checks
+        if decision.decision_id in set(platform.decisions_with_signal)
+        or decision.owner_platform_id in {platform.platform_id, platform.canonical_project_id}
+    ]
+    lines = [
+        "# EXECUTADO - Institutional Assurance",
+        "",
+        f"- platform_id: `{platform.platform_id}`",
+        f"- canonical_project_id: `{platform.canonical_project_id}`",
+        f"- report_id: `{report.report_id}`",
+        f"- source_hash: `{report.source_hash}`",
+        f"- status: `{platform.status.value}`",
+        f"- code_lines_analyzed: `{platform.code_lines_analyzed}`",
+        f"- active_orders_observed: `{platform.active_orders_count}`",
+        "",
+        "## Decisions Covered",
+        "",
+    ]
+    if related_decisions:
+        for decision in related_decisions:
+            lines.append(
+                f"- `{decision.decision_id}`: `{decision.status.value}` "
+                f"score `{decision.score}`, signals `{decision.signal_count}`"
+            )
+    else:
+        lines.append("- No direct decision signal found; platform remains in ecosystem coverage only.")
+    lines.extend(["", "## Pending", ""])
+    if platform.missing_decisions:
+        for decision_id in platform.missing_decisions:
+            lines.append(f"- Materializar evidencia local para `{decision_id}` quando a plataforma for owner ou consumidora direta.")
+    if platform.redaction_findings:
+        lines.append(f"- Revisar {platform.redaction_findings} achados de redaction em fontes/dossies; valores brutos nao foram reproduzidos aqui.")
+    if report.sandbox_mode != "danger-full-access":
+        lines.append("- workspace-write: deploy Cloudflare e git push ficam como pendencia real.")
+    if not platform.missing_decisions and not platform.redaction_findings and report.sandbox_mode == "danger-full-access":
+        lines.append("- Nenhuma pendencia especifica da plataforma nesta assurance.")
+    lines.extend(["", "## File Roles", ""])
+    lines.append("- Este arquivo registra fechamento sintetico da assurance institucional para a pasta central da plataforma.")
+    lines.append("- O JSON/CSV canonico fica no projeto real Mais Humana para consumo GPT/UI/MCP.")
+    return "\n".join(lines).strip() + "\n"
+
+
+def assurance_generated_records(project_root: Path) -> tuple[GeneratedFile, ...]:
+    relation = "000-ROTEADOR-PERMANENTE-DE-ORDEM_DE_SERVICO"
+    entries = (
+        ("dados/institutional-assurance-report.json", "Relatorio completo de assurance institucional.", "institutional assurance full report", "json"),
+        ("dados/institutional-assurance-compacto.json", "Relatorio compacto para GPT/UI/MCP.", "institutional assurance compact payload", "json"),
+        ("matrizes/institutional-assurance.csv", "Matriz tabular das decisoes e plataformas.", "institutional assurance matrix", "csv"),
+        ("ecossistema/INSTITUTIONAL-ASSURANCE.md", "Relatorio humano redigido da assurance institucional.", "institutional assurance human report", "markdown"),
+    )
+    return tuple(
+        GeneratedFile(
+            path=str(project_root / rel),
+            description=description,
+            function=function,
+            file_type=file_type,
+            changed_by="mais_humana.institutional_assurance",
+            change_summary="Criado ou atualizado artefato de assurance das seis decisoes institucionais.",
+            relation_to_order=relation,
+        )
+        for rel, description, function, file_type in entries
+    )
+
+
+def central_assurance_records(report: InstitutionalAssuranceReport) -> tuple[GeneratedFile, ...]:
+    records: list[GeneratedFile] = []
+    relation = "000-ROTEADOR-PERMANENTE-DE-ORDEM_DE_SERVICO"
+    for platform in report.platform_checks:
+        scan = next((item for item in report.scans if item.platform_id == platform.platform_id), None)
+        if scan is None or not scan.central_folder_exists:
+            continue
+        path = Path(scan.central_folder) / "reports" / "EXECUTADO__institutional-assurance.md"
+        records.append(
+            GeneratedFile(
+                path=str(path),
+                description=f"Fechamento sintetico da assurance institucional para {platform.platform_id}.",
+                function="platform institutional assurance closeout",
+                file_type="markdown",
+                changed_by="mais_humana.institutional_assurance",
+                change_summary="Registrada cobertura das seis decisoes e pendencias reais da plataforma.",
+                relation_to_order=relation,
+            )
+        )
+    return tuple(records)
+
+
+def write_assurance_artifacts(
+    report: InstitutionalAssuranceReport,
+    project_root: Path,
+    *,
+    write_central: bool = True,
+) -> tuple[GeneratedFile, ...]:
+    """Write report artifacts and platform closeout records."""
+
+    targets: list[tuple[Path, str]] = [
+        (project_root / "dados" / "institutional-assurance-report.json", json.dumps(report.to_dict(), ensure_ascii=False, indent=2, sort_keys=True)),
+        (project_root / "dados" / "institutional-assurance-compacto.json", json.dumps(compact_assurance_payload(report), ensure_ascii=False, indent=2, sort_keys=True)),
+        (project_root / "matrizes" / "institutional-assurance.csv", rows_to_csv(assurance_rows(report))),
+        (project_root / "ecossistema" / "INSTITUTIONAL-ASSURANCE.md", assurance_markdown(report)),
+    ]
+    records = list(assurance_generated_records(project_root))
+    if write_central:
+        for platform in report.platform_checks:
+            scan = next((item for item in report.scans if item.platform_id == platform.platform_id), None)
+            if scan is None or not scan.central_folder_exists:
+                continue
+            target = Path(scan.central_folder) / "reports" / "EXECUTADO__institutional-assurance.md"
+            targets.append((target, platform_closeout_markdown(report, platform)))
+        records.extend(central_assurance_records(report))
+    for path, content in targets:
+        path.parent.mkdir(parents=True, exist_ok=True)
+        path.write_text(content, encoding="utf-8")
+    return tuple(records)
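+
+
+# Layout sketch (paths relative to a hypothetical project_root): a successful
+# `write_assurance_artifacts` call produces
+#
+#     dados/institutional-assurance-report.json     # full report
+#     dados/institutional-assurance-compacto.json   # compact GPT/UI/MCP payload
+#     matrizes/institutional-assurance.csv          # decision/platform matrix
+#     ecossistema/INSTITUTIONAL-ASSURANCE.md        # human-readable report
+#
+# plus one reports/EXECUTADO__institutional-assurance.md per existing central
+# folder when write_central is true.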
+
+
+def write_assurance_semantic_records(
+    report: InstitutionalAssuranceReport,
+    records: Sequence[GeneratedFile],
+    *,
+    project_root: Path,
+    update_central: bool = True,
+) -> tuple[GeneratedFile, ...]:
+    """Update local and central semantic SQLite files with generated file roles."""
+
+    status_record = GeneratedFile(
+        path=str(project_root / "dados" / "institutional-assurance-semantic-write-status.json"),
+        description="Status da escrita semantica da assurance institucional.",
+        function="institutional assurance semantic write status",
+        file_type="json",
+        changed_by="mais_humana.institutional_assurance",
+        change_summary="Registrado resultado da atualizacao dos controles semanticos.",
+        relation_to_order="000-ROTEADOR-PERMANENTE-DE-ORDEM_DE_SERVICO",
+    )
+    errors: list[dict[str, str]] = []
+    tracked = tuple(records) + (status_record,)
+    try:
+        with connect(project_root / "controle-semantico.sqlite") as conn:
+            upsert_files(conn, tracked)
+            conn.commit()
+    except Exception as exc:
+        errors.append({"sqlite": str(project_root / "controle-semantico.sqlite"), "error": f"{type(exc).__name__}: {exc}"})
+    if update_central:
+        # Group records by the central folder that owns them before touching
+        # each folder's own semantic SQLite.
+        by_central: dict[str, list[GeneratedFile]] = {}
+        for record in records:
+            path = Path(record.path)
+            for scan in report.scans:
+                central = Path(scan.central_folder)
+                try:
+                    path.relative_to(central)
+                except ValueError:
+                    continue
+                by_central.setdefault(str(central), []).append(record)
+        for central_path, central_records in by_central.items():
+            sqlite_path = Path(central_path) / "controle-semantico.sqlite"
+            try:
+                with connect(sqlite_path) as conn:
+                    upsert_files(conn, central_records)
+                    conn.commit()
+            except Exception as exc:
+                errors.append({"sqlite": str(sqlite_path), "error": f"{type(exc).__name__}: {exc}"})
+    status_payload = {
+        "generatedAt": utc_now(),
+        "ok": not errors,
+        "recordsCount": len(tracked),
+        "errors": errors,
+    }
+    status_path = Path(status_record.path)
+    status_path.parent.mkdir(parents=True, exist_ok=True)
+    status_path.write_text(json.dumps(status_payload, ensure_ascii=False, indent=2, sort_keys=True), encoding="utf-8")
+    try:
+        with connect(project_root / "controle-semantico.sqlite") as conn:
+            upsert_files(conn, (status_record,))
+            conn.commit()
+    except Exception:
+        # Best-effort: the status JSON written above already records errors.
+        pass
+    return tracked
+
+
+def run_institutional_assurance(
+    *,
+    ecosystem_root: Path,
+    project_root: Path,
+    central_projects_root: Path,
+    sandbox_mode: str = "workspace-write",
+    plugin_auth_attempt: str = "",
+    write_central: bool = True,
+) -> tuple[InstitutionalAssuranceReport, tuple[GeneratedFile, ...]]:
+    """Run the full assurance scan, write artifacts, and update the semantic SQLite controls."""
+
+    scopes = default_platform_scopes(ecosystem_root, central_projects_root)
+    report = build_institutional_assurance_report(
+        scopes,
+        sandbox_mode=sandbox_mode,
+        plugin_auth_attempt=plugin_auth_attempt,
+    )
+    records = write_assurance_artifacts(report, project_root, write_central=write_central)
+    semantic_records = write_assurance_semantic_records(
+        report,
+        records,
+        project_root=project_root,
+        update_central=write_central,
+    )
+    return report, semantic_records
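+
+
+# Entry-point sketch with hypothetical paths; the returned records include the
+# semantic-write status file, so callers can persist them alongside the report.
+#
+#     report, records = run_institutional_assurance(
+#         ecosystem_root=Path("/workspaces/ecosystem"),
+#         project_root=Path("/workspaces/ecosystem/tudo-para-ia-mais-humana"),
+#         central_projects_root=Path("/workspaces/central/projetos"),
+#         sandbox_mode="workspace-write",
+#     )
+#     print(report.ok, len(records))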