"""Generate the Mais Humana source rulebook from canonical catalogs. The generated module is committed as source because the operational round needs an inspectable Python rulebook, not only JSON output. The generator keeps the large catalog deterministic and tied to the existing platform/profile catalog. """ from __future__ import annotations import argparse import textwrap from pathlib import Path from typing import Iterable ROOT = Path(__file__).resolve().parents[1] SRC = ROOT / "src" def ensure_import_path() -> None: import sys src = str(SRC) if src not in sys.path: sys.path.insert(0, src) def q(value: str) -> str: return repr(str(value)) def tuple_literal(values: Iterable[str], *, indent: int = 8) -> str: cleaned = [str(value) for value in values if str(value).strip()] if not cleaned: return "()" if len(cleaned) <= 18: return "(" + ", ".join(f"{q(value)}" for value in cleaned) + ("," if len(cleaned) == 1 else "") + ")" pad = " " * indent lines = ["("] for value in cleaned: lines.append(f"{pad}{q(value)},") lines.append(" " * (indent - 4) + ")") return "\n".join(lines) def normalize_surface(value: str) -> str: return value.replace("_", "-").replace(" ", "-").lower() def category_value(category: object) -> str: return getattr(category, "value", str(category)) def category_markers(category: object, platform_id: str) -> tuple[str, ...]: value = category_value(category) base = { "administration": ("admin", "organization", "tenant", "permission", "operator"), "support": ("support", "incident", "diagnostic", "nextAction", "ticket"), "finance": ("invoice", "usage", "quota", "billing", "reconciliation"), "legal": ("contract", "policy", "consent", "risk", "evidence"), "security": ("identity", "rbac", "credentialRef", "audit", "redaction"), "operations": ("health", "readiness", "smoke", "runbook", "status"), "strategy": ("roadmap", "maturity", "risk", "priority", "executive"), "documentation": ("docs", "canonical", "version", "hash", "contract"), "self_service": ("onboarding", "portal", "wizard", "action", "status"), "commercial": ("plan", "entitlement", "checkout", "sellable", "block"), "experience": ("screen", "panelReady", "sameSource", "sourceHash", "renderedState"), "governance": ("audit", "trace", "policy", "schema", "release"), "integration": ("provider", "BYOK", "credentialRef", "smoke", "tenant"), "observability": ("metrics", "latency", "trace", "audit", "evidence"), } markers = list(base.get(value, (value, "readiness", "evidence"))) markers.append(platform_id) return tuple(markers) def negative_markers(platform_id: str, surface: str = "") -> tuple[str, ...]: markers = ["blocked", "unsupported", "missing", "needs_token", "catalogOnly", "catalog_only"] if platform_id == "docs": markers.extend(["catalogOnly", "responseReady ausente"]) if platform_id == "integracoes": markers.extend(["test_user_not_found", "credential leak", "secret leaked"]) if platform_id == "intelligence": markers.extend(["unsupported", "sem backend"]) if surface: markers.append(f"{surface} ausente") return tuple(dict.fromkeys(markers)) def source_of_truth(platform_id: str) -> str: if platform_id == "mcps": return "tudo-para-ia-mcps-internos-plataform" return f"{platform_id} via tudo-para-ia-mcps-internos-plataform" def validation_steps(platform_id: str, profile_id: str, surface: str, category: str) -> tuple[str, ...]: return ( f"confirmar que {platform_id} expoe dados por MCP antes de qualquer painel paralelo", f"validar campos de transito MCP para {profile_id}", f"comparar sourceHash/sameSource para superficie {surface}", f"registrar evidencia sanitizada da categoria {category}", "registrar pendencia real se a validacao depender de credencial ou decisao externa", ) def payload_fields(platform_id: str, profile_id: str, surface: str, category: str) -> tuple[str, ...]: return ( "projectId", "platformId", "profileId", "surfaceId", "category", "sourceEndpoint", "sourceToolId", "sourcePayloadHash", "sourceRecordsHash", "truthState", "panelReady", "gptExplainable", "humanNextAction", f"{platform_id}Status", f"{profile_id}Need", f"{normalize_surface(surface)}State", f"{category}Gate", ) def rule_block( var_name: str, *, rule_id: str, scope: str, platform_id: str, profile_id: str, title: str, purpose: str, surfaces: tuple[str, ...], success: tuple[str, ...], evidence: tuple[str, ...], negative: tuple[str, ...], payload: tuple[str, ...], validations: tuple[str, ...], next_order: str, priority: str, generated_from: str, ) -> str: return "\n".join( [ f"{var_name} = HumanControlRule(", f" rule_id={q(rule_id)},", f" scope=RuleScope.{scope},", f" platform_id={q(platform_id)},", f" profile_id={q(profile_id)},", f" title={q(title)},", f" purpose={q(purpose)},", f" source_of_truth={q(source_of_truth(platform_id))},", f" required_surfaces={tuple_literal(surfaces)},", f" success_markers={tuple_literal(success)},", f" evidence_terms={tuple_literal(evidence)},", f" negative_terms={tuple_literal(negative)},", " mcp_transit_fields=MCP_TRANSIT_FIELDS,", f" expected_payload_fields={tuple_literal(payload)},", f" validation_steps={tuple_literal(validations)},", f" next_order_hint={q(next_order)},", f" priority={q(priority)},", f" generated_from={q(generated_from)},", ")", ] ) def build_rules() -> str: ensure_import_path() from mais_humana.catalog import HUMAN_PROFILES, PLATFORM_BY_ID, PLATFORMS blocks: list[str] = [] names: list[str] = [] index = 1 for platform in PLATFORMS: for profile in HUMAN_PROFILES: common = set(platform.primary_categories).intersection(profile.priority_needs) categories = tuple(common or platform.primary_categories[:1] or profile.priority_needs[:1]) category = category_value(categories[0]) surfaces = tuple(platform.expected_surfaces[:3] or ("readiness", "evidence")) title = f"{platform.title} atende {profile.name} por MCP" purpose = ( f"Garantir que {profile.name} receba leitura humana de {platform.title} " "pela mesma fonte administrativa que o GPT explica." ) evidence = ( platform.platform_id, platform.repo_name, profile.profile_id, category, "admin_ui", "sameSource", "panelReady", "traceId", "auditId", ) success = category_markers(categories[0], platform.platform_id) + ( "sameSource", "panelReady", "gptExplainable", ) name = f"RULE_{index:04d}" names.append(name) blocks.append( rule_block( name, rule_id=f"{platform.platform_id}__{profile.profile_id}__mcp", scope="PROFILE_PLATFORM", platform_id=platform.platform_id, profile_id=profile.profile_id, title=title, purpose=purpose, surfaces=surfaces, success=success, evidence=evidence, negative=negative_markers(platform.platform_id), payload=payload_fields(platform.platform_id, profile.profile_id, surfaces[0], category), validations=validation_steps(platform.platform_id, profile.profile_id, surfaces[0], category), next_order=f"criar ou validar superficie MCP de {platform.platform_id} para {profile.profile_id}", priority="alta" if platform.known_blockers else "media", generated_from="platform_profile_catalog", ) ) index += 1 for surface in platform.expected_surfaces: surface_id = normalize_surface(surface) category = category_value(platform.primary_categories[0]) if platform.primary_categories else "governance" name = f"RULE_{index:04d}" names.append(name) blocks.append( rule_block( name, rule_id=f"{platform.platform_id}__surface__{surface_id}", scope="PLATFORM_SURFACE", platform_id=platform.platform_id, profile_id="todos", title=f"Superficie {surface} de {platform.title} passa pelo MCP", purpose=( f"Formalizar que a superficie {surface} de {platform.title} possui dados, " "diagnostico, evidencia e acao controlados pelo MCP." ), surfaces=(surface, "admin_ui", "screenData", "screenDiagnostic"), success=( surface, "sourceEndpoint", "sourceToolId", "sourcePayloadHash", "sourceRecordsHash", "sameSource", "panelReady", ), evidence=( surface, platform.repo_name, "admin_ui", "screen", "readiness", "evidence", ), negative=negative_markers(platform.platform_id, surface), payload=payload_fields(platform.platform_id, "todos", surface, category), validations=validation_steps(platform.platform_id, "todos", surface, category), next_order=f"promover {surface} de {platform.platform_id} a painel MCP same-source", priority="alta" if surface.lower() in {"byok", "canonical-docs", "admin_ui"} else "media", generated_from="platform_surface_catalog", ) ) index += 1 for profile in HUMAN_PROFILES: for surface in platform.expected_surfaces: if profile.profile_id not in platform.expected_profiles: continue profile_categories = tuple(category_value(item) for item in profile.priority_needs) platform_categories = tuple(category_value(item) for item in platform.primary_categories) category = next((item for item in profile_categories if item in platform_categories), platform_categories[0] if platform_categories else "governance") surface_id = normalize_surface(surface) name = f"RULE_{index:04d}" names.append(name) blocks.append( rule_block( name, rule_id=f"{platform.platform_id}__{profile.profile_id}__{surface_id}", scope="PROFILE_SURFACE", platform_id=platform.platform_id, profile_id=profile.profile_id, title=f"{profile.name} usa {surface} de {platform.title} com verdade rastreavel", purpose=( f"Exigir que {surface} sirva {profile.name} com payload sanitizado, " "fonte unica MCP e criterio humano de pronto." ), surfaces=(surface, "admin_ui", "screenReport", "screenEvidence"), success=category_markers(category, platform.platform_id) + ( surface, "sourceRecordsHash", "humanNextAction", ), evidence=( profile.profile_id, surface, platform.platform_id, category, "screenData", "sameSource", "traceId", "auditId", ), negative=negative_markers(platform.platform_id, surface), payload=payload_fields(platform.platform_id, profile.profile_id, surface, category), validations=validation_steps(platform.platform_id, profile.profile_id, surface, category), next_order=f"validar {surface} para {profile.profile_id} em {platform.platform_id}", priority="alta" if profile.profile_id in platform.expected_profiles else "media", generated_from="profile_surface_catalog", ) ) index += 1 for related in platform.related_platforms: related_platform = PLATFORM_BY_ID.get(related) related_repo = related_platform.repo_name if related_platform is not None else related name = f"RULE_{index:04d}" names.append(name) blocks.append( rule_block( name, rule_id=f"{platform.platform_id}__depends__{related}", scope="DEPENDENCY", platform_id=platform.platform_id, profile_id="gestor_operacional", title=f"{platform.title} declara dependencia controlada com {related}", purpose=( f"Evitar integracao direta invisivel entre {platform.platform_id} e {related}; " "todo transito precisa passar pelo MCP com trace e audit." ), surfaces=("admin_ui", "dependencyGraph", "screenDiagnostic", "screenEvidence"), success=( platform.platform_id, related, related_repo, "origin", "destination", "traceId", "auditId", "sameSource", ), evidence=( platform.repo_name, related_repo, "related_platforms", "dependency", "mcp", "audit", ), negative=negative_markers(platform.platform_id), payload=( "origin", "destination", "relationType", "sourcePayloadHash", "sourceRecordsHash", "permission", "result", "traceId", "auditId", ), validations=( f"listar dependencia {platform.platform_id}->{related} no grafo humano", "validar que a dependencia possui payload hash e records hash", "registrar auditId e traceId no relatorio humano", "criar OS se a dependencia exigir chamada direta fora do MCP", ), next_order=f"consolidar dependencia {platform.platform_id}->{related} no MCP central", priority="alta" if related in {"mcps", "identity", "business", "docs"} else "media", generated_from="platform_dependency_catalog", ) ) index += 1 identity_rules = [ ( "canonical-name", "Nome canonico Mais Humana Plafatorm", "Registrar nome canonico tudo-para-ia-mais-humana-plataform sem apagar o historico do nome atual.", ("tudo-para-ia-mais-humana", "tudo-para-ia-mais-humana-plataform", "admin/tudo-para-ia-mais-humana-plataform"), ), ( "mcp-only", "Administracao somente pelo MCP", "Impedir que a Mais Humana use atalhos diretos entre plataformas fora da tudo-para-ia-mcps-internos-plataform.", ("origin", "destination", "tool", "payload", "actor", "permission", "result", "traceId", "auditId", "timestamp"), ), ( "ui-support", "UI como apoio visual, nao fonte paralela", "Formalizar que a UI renderiza contratos e dados vindos do MCP, sem inventar verdade operacional.", ("ui", "screenData", "sameSource", "panelReady", "sourceRecordsHash"), ), ] for suffix, title, purpose, evidence in identity_rules: name = f"RULE_{index:04d}" names.append(name) blocks.append( rule_block( name, rule_id=f"mais_humana__identity__{suffix}", scope="CANONICAL_IDENTITY", platform_id="mcps", profile_id="administrador_empresa", title=title, purpose=purpose, surfaces=("admin_ui", "screenDiagnostic", "screenEvidence"), success=evidence, evidence=evidence + ("central-de-ordem-de-servico", "controle-semantico.sqlite"), negative=("nome antigo sem alias", "conexao direta sem MCP", "fonte paralela"), payload=( "currentProjectId", "canonicalProjectId", "controlPlaneId", "uiSupportPlatformId", "renameStatus", "permissionStatus", "auditId", "traceId", ), validations=( "confirmar README com nome canonico e alias historico", "confirmar SQLite com identidade canonica e dependencia MCP", "confirmar que relatorios humanos citam MCP como caminho unico", ), next_order="executar renome institucional quando houver permissao de repositorio e janela segura", priority="alta", generated_from="canonical_identity_order_0027", ) ) index += 1 header = textwrap.dedent( '''\ """Generated human control rulebook. Do not edit this file by hand. Regenerate it with: python tools/generate_human_rulebook.py The entries are Python source on purpose: the operational platform can import, inspect, test, and package the rulebook without parsing external YAML/JSON during a service-order round. """ from __future__ import annotations from .human_rulebook import HumanControlRule, MCP_TRANSIT_FIELDS, RuleScope ''' ) body = "\n\n".join(blocks) rules_tuple = "RULES: tuple[HumanControlRule, ...] = (\n" + "".join(f" {name},\n" for name in names) + ")\n" indexes = textwrap.dedent( '''\ RULES_BY_PLATFORM: dict[str, tuple[HumanControlRule, ...]] = { platform_id: tuple(rule for rule in RULES if rule.platform_id == platform_id) for platform_id in sorted({rule.platform_id for rule in RULES}) } RULES_BY_PROFILE: dict[str, tuple[HumanControlRule, ...]] = { profile_id: tuple(rule for rule in RULES if rule.profile_id == profile_id) for profile_id in sorted({rule.profile_id for rule in RULES}) } def rule_ids() -> tuple[str, ...]: return tuple(rule.rule_id for rule in RULES) def rules_for_platform(platform_id: str) -> tuple[HumanControlRule, ...]: return RULES_BY_PLATFORM.get(platform_id, ()) def rules_for_profile(profile_id: str) -> tuple[HumanControlRule, ...]: return RULES_BY_PROFILE.get(profile_id, ()) ''' ) return header + body + "\n\n" + rules_tuple + indexes def write_rulebook(output: Path) -> int: output.parent.mkdir(parents=True, exist_ok=True) text = build_rules() output.write_text(text, encoding="utf-8", newline="\n") return len(text.splitlines()) def main(argv: list[str] | None = None) -> int: parser = argparse.ArgumentParser(description="Generate generated_human_rulebook.py") parser.add_argument( "--output", default=str(SRC / "mais_humana" / "generated_human_rulebook.py"), ) args = parser.parse_args(argv) lines = write_rulebook(Path(args.output)) print(f"generated {lines} lines at {args.output}") return 0 if __name__ == "__main__": raise SystemExit(main())