Files
tudo-para-ia-mais-humana-pl…/src/mais_humana/round_assurance.py
2026-04-30 06:42:00 -03:00

394 lines
14 KiB
Python

"""Assurance checks for a full Mais Humana service-order round."""
from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
from typing import Iterable, Sequence
from .exit_order_compiler import CompiledOrderSet
from .governance_models import EcosystemGovernancePortfolio, RoundExecutionPackage, RoundMinimumStatus
from .models import PlatformHumanReport, ReportBundle, as_plain_data, merge_unique
@dataclass(slots=True)
class AssuranceCase:
"""One assurance case expected at round closeout."""
case_id: str
title: str
required: bool
passed: bool
severity: str
evidence: tuple[str, ...]
reason: str
next_action: str
def to_dict(self) -> dict[str, object]:
return as_plain_data(self)
@dataclass(slots=True)
class AssuranceSuite:
"""Complete assurance result for the round."""
suite_id: str
cases: tuple[AssuranceCase, ...]
passed: bool
blocker_count: int
warning_count: int
summary: tuple[str, ...]
def to_dict(self) -> dict[str, object]:
return as_plain_data(self)
@property
def failed_cases(self) -> tuple[AssuranceCase, ...]:
return tuple(case for case in self.cases if not case.passed)
def case(
    case_id: str,
    title: str,
    passed: bool,
    reason: str,
    next_action: str,
    evidence: Iterable[str] = (),
    severity: str = "warning",
    required: bool = True,
) -> AssuranceCase:
    """Build an ``AssuranceCase`` with normalized severity and evidence.

    Args:
        case_id: Identifier stored on the case.
        title: Label stored on the case.
        passed: Outcome of the check.
        reason: Explanation stored on the case.
        next_action: Follow-up stored on the case.
        evidence: Items stored as a tuple after each is passed through ``str``.
        severity: Severity used when the check did not pass.
        required: Whether the check is mandatory.

    Returns:
        The assembled case. A passing case always carries severity "info",
        whatever ``severity`` was supplied.
    """
    effective_severity = severity
    if passed:
        # A passing check is informational regardless of the requested level.
        effective_severity = "info"
    evidence_items = tuple(map(str, evidence))
    return AssuranceCase(
        case_id=case_id,
        title=title,
        required=required,
        passed=passed,
        severity=effective_severity,
        evidence=evidence_items,
        reason=reason,
        next_action=next_action,
    )
def artifact_exists(root: Path, relative: str) -> bool:
    """Return whether ``relative``, joined onto ``root``, exists on disk."""
    candidate = root.joinpath(relative)
    return candidate.exists()
def no_node_modules(root: Path) -> bool:
    """Return True when no directory named ``node_modules`` lies below ``root``.

    A ``root`` that does not exist counts as clean. Entries named
    ``node_modules`` that are not directories are ignored.
    """
    if not root.exists():
        return True
    # any() stops at the first directory hit, so the walk ends early.
    return not any(found.is_dir() for found in root.rglob("node_modules"))
def generated_file_paths(bundle: ReportBundle) -> set[str]:
    """Collect the ``path`` of every generated file, using forward slashes.

    Backslashes are replaced with ``/`` so paths compare equal regardless of
    the separator they were recorded with.
    """
    normalized: set[str] = set()
    for generated in bundle.generated_files:
        normalized.add(generated.path.replace("\\", "/"))
    return normalized
def required_artifact_cases(project_root: Path, bundle: ReportBundle) -> tuple[AssuranceCase, ...]:
    """Produce one blocker-level case per mandatory artifact of the round.

    Args:
        project_root: Directory the relative artifact paths are checked under.
        bundle: Report bundle whose generated files are compared to the list.

    Returns:
        One case per mandatory artifact, in the order of the list below. A
        case passes when the file exists under ``project_root``; whether the
        bundle registered the path is recorded only as evidence.
    """
    expected_artifacts = (
        "dados/snapshot-ecossistema.json",
        "dados/dossie-operacional-humano.json",
        "dados/governanca-operacional.json",
        "ecossistema/RELATORIO-GERAL-DO-ECOSSISTEMA-humana.md",
        "ecossistema/GOVERNANCA-OPERACIONAL-MAIS-HUMANA.md",
        "ecossistema/DOSSIE-OPERACIONAL-HUMANO.md",
        "matrizes/matriz-plataforma-perfil.csv",
        "matrizes/governanca-checks.csv",
        "graficos/matriz-plataforma-perfil.svg",
        "relatorios-docx/RELATORIO-GERAL-DO-ECOSSISTEMA-humana.docx",
    )
    registered = generated_file_paths(bundle)

    def build(relative: str) -> AssuranceCase:
        # Presence on disk decides the verdict; bundle registration is evidence only.
        found = artifact_exists(project_root, relative)
        if found:
            reason = "Artefato encontrado."
        else:
            reason = "Artefato nao foi encontrado no projeto real."
        if relative in registered:
            registration = "registrado"
        else:
            registration = "nao registrado no bundle"
        dotted = relative.replace("/", ".")
        return case(
            f"artifact.{dotted}",
            f"Artefato obrigatorio {relative}",
            found,
            reason,
            "regenerar relatorios e conferir escrita no projeto real",
            evidence=(relative, registration),
            severity="blocker",
        )

    return tuple(build(relative) for relative in expected_artifacts)
def minimum_cases(package: RoundExecutionPackage | None) -> tuple[AssuranceCase, ...]:
    """Translate the lifecycle package minimums into assurance cases.

    Args:
        package: Lifecycle package of the round, or ``None`` if none was built.

    Returns:
        A single failing blocker case when ``package`` is ``None``. Otherwise
        one case per entry of ``package.minimums``: it passes when the status
        equals ``RoundMinimumStatus.MET``, and a failure is a blocker only
        when the status equals ``RoundMinimumStatus.IMPOSSIBLE``.
    """
    if package is None:
        missing_package = case(
            "minimum.lifecycle-package",
            "Pacote de lifecycle existe",
            False,
            "Pacote de lifecycle nao foi produzido.",
            "gerar fechamento de ordens ativas",
            severity="blocker",
        )
        return (missing_package,)

    def to_case(entry) -> AssuranceCase:
        if entry.status == RoundMinimumStatus.IMPOSSIBLE:
            level = "blocker"
        else:
            level = "warning"
        observed = (str(entry.actual_value), str(entry.required_value), entry.status.value)
        return case(
            f"minimum.{entry.minimum_id}",
            entry.title,
            entry.status == RoundMinimumStatus.MET,
            entry.reason,
            entry.next_action,
            evidence=observed,
            severity=level,
        )

    return tuple(to_case(entry) for entry in package.minimums)
def governance_cases(portfolio: EcosystemGovernancePortfolio) -> tuple[AssuranceCase, ...]:
    """Evaluate the governance portfolio and return four cases.

    The cases cover, in order: the portfolio has cards (blocker), blockers
    are summarized or no platform is blocked (warning), at least five
    "executiva" order candidates exist (warning), and at least five
    "gerencial" order candidates exist (warning).
    """
    average_score = portfolio.average_governance_score
    blocked_total = len(portfolio.blocked_platforms)
    executive_total = len(
        [entry for entry in portfolio.order_candidates if entry.order_type.value == "executiva"]
    )
    managerial_total = len(
        [entry for entry in portfolio.order_candidates if entry.order_type.value == "gerencial"]
    )
    card_total = len(portfolio.cards)

    portfolio_case = case(
        "governance.portfolio",
        "Portfolio de governanca criado",
        bool(portfolio.cards),
        f"Cards criados: {card_total}; score medio {average_score}.",
        "reexecutar avaliacao de governanca",
        evidence=(str(card_total), str(average_score)),
        severity="blocker",
    )
    # Passes with a non-empty summary, or when nothing is blocked at all.
    blockers_case = case(
        "governance.blockers-classified",
        "Blockers de governanca classificados",
        bool(portfolio.blockers_summary) or blocked_total == 0,
        f"Plataformas bloqueadas: {blocked_total}.",
        "classificar blockers por check, dominio, plataforma e proxima acao",
        evidence=portfolio.blockers_summary[:8],
        severity="warning",
    )
    executive_case = case(
        "governance.executive-candidates",
        "Candidatas executivas reais existem",
        executive_total >= 5,
        f"Candidatas executivas: {executive_total}.",
        "criar checks executivos para pendencias materiais ainda nao cobertas",
        evidence=(str(executive_total),),
        severity="warning",
    )
    managerial_case = case(
        "governance.managerial-candidates",
        "Candidatas gerenciais reais existem",
        managerial_total >= 5,
        f"Candidatas gerenciais: {managerial_total}.",
        "criar checks gerenciais para maturidade e relacoes de ecossistema",
        evidence=(str(managerial_total),),
        severity="warning",
    )
    return (portfolio_case, blockers_case, executive_case, managerial_case)
def compiled_order_cases(compiled: CompiledOrderSet | None) -> tuple[AssuranceCase, ...]:
    """Check that enough executive and managerial orders were compiled.

    Args:
        compiled: Compiled order set, or ``None`` when none was produced.

    Returns:
        An empty tuple when ``compiled`` is ``None``. Otherwise two
        warning-level cases, each passing when the respective count is at
        least five.
    """
    if compiled is None:
        return ()

    executive_total = compiled.executive_count
    managerial_total = compiled.managerial_count
    executive_case = case(
        "orders.compiled-executive",
        "Ordens executivas compiladas",
        executive_total >= 5,
        f"Executivas compiladas: {executive_total}.",
        "usar candidatas de governanca ou recomendacoes reais para completar saida",
        evidence=(str(executive_total),),
        severity="warning",
    )
    managerial_case = case(
        "orders.compiled-managerial",
        "Ordens gerenciais compiladas",
        managerial_total >= 5,
        f"Gerenciais compiladas: {managerial_total}.",
        "usar candidatas de governanca ou recomendacoes reais para completar saida",
        evidence=(str(managerial_total),),
        severity="warning",
    )
    return (executive_case, managerial_case)
def platform_cases(reports: Sequence[PlatformHumanReport]) -> tuple[AssuranceCase, ...]:
    """Check platform coverage of the round and return three cases.

    The cases cover, in order: at least 14 reports are present (blocker), at
    least one report has ``scan.code_lines`` above zero (warning), and at
    least one report has a truthy ``scan.evidence`` (warning).
    """
    total = len(reports)
    with_code = len([entry for entry in reports if entry.scan.code_lines > 0])
    with_evidence = len([entry for entry in reports if entry.scan.evidence])

    count_case = case(
        "platforms.count",
        "Catalogo de 14 plataformas analisado",
        total >= 14,
        f"Plataformas no relatorio: {total}.",
        "revisar catalogo canonico de plataformas",
        evidence=(str(total),),
        severity="blocker",
    )
    code_case = case(
        "platforms.code-evidence",
        "Evidencia tecnica encontrada em plataformas",
        with_code > 0,
        f"Plataformas com codigo detectado: {with_code}.",
        "verificar raiz do ecossistema e budgets de scanner",
        evidence=(str(with_code),),
        severity="warning",
    )
    evidence_case = case(
        "platforms.local-evidence",
        "Evidencias locais coletadas",
        with_evidence > 0,
        f"Plataformas com evidencias: {with_evidence}.",
        "ampliar scanner ou registrar ausencia material",
        evidence=(str(with_evidence),),
        severity="warning",
    )
    return (count_case, code_case, evidence_case)
def operational_hygiene_cases(project_root: Path, central_folder: Path | None) -> tuple[AssuranceCase, ...]:
    """Check filesystem hygiene of the project and of the central folder.

    Args:
        project_root: Directory scanned for ``node_modules`` folders.
        central_folder: Folder expected to hold ``controle-semantico.sqlite``;
            when ``None`` the SQLite case is not produced.

    Returns:
        A blocker-level case for the ``node_modules`` scan, followed by a
        blocker-level case for the SQLite file when ``central_folder`` is
        given.
    """
    # Scan once: the recursive walk is not repeated, and the verdict and the
    # reason text can never disagree if the tree changes mid-evaluation.
    tree_is_clean = no_node_modules(project_root)
    cases = [
        case(
            "hygiene.no-node-modules",
            "node_modules removido do projeto real",
            tree_is_clean,
            "Nenhuma pasta node_modules local foi encontrada." if tree_is_clean else "node_modules foi encontrado.",
            "remover node_modules antes de sincronizar",
            evidence=(str(project_root),),
            severity="blocker",
        )
    ]
    if central_folder is not None:
        sqlite_path = central_folder / "controle-semantico.sqlite"
        # Probe the file once, for the same reason as the scan above.
        sqlite_present = sqlite_path.exists()
        cases.append(
            case(
                "hygiene.semantic-sql",
                "SQLite semantico existe",
                sqlite_present,
                "SQLite semantico encontrado." if sqlite_present else "SQLite semantico ausente.",
                "executar write_semantic_state e registrar funcoes de arquivos",
                evidence=(str(sqlite_path),),
                severity="blocker",
            )
        )
    return tuple(cases)
def cloudflare_premise_cases(extra_text: Sequence[str]) -> tuple[AssuranceCase, ...]:
    """Check the round's free text against the Cloudflare plugin premise.

    The entries of ``extra_text`` are joined with newlines and lower-cased,
    then searched for fixed substrings.

    Args:
        extra_text: Free-text entries of the round; the first four are
            attached to both cases as evidence.

    Returns:
        Two cases. The first is optional and warning-level, and passes when
        the text mentions "plugin cloudflare" or "mcp tool call". The second
        is blocker-level and fails when the text mentions "plugin cloudflare"
        together with "blocker" or "bloqueio" anywhere in the text.
    """
    text = "\n".join(extra_text).lower()
    mentions_plugin = "plugin cloudflare" in text
    # "mcp tool call" also matches the longer "user rejected mcp tool call"
    # marker, so a single substring test covers both.
    plugin_attempted = mentions_plugin or "mcp tool call" in text
    plugin_as_blocker = mentions_plugin and ("blocker" in text or "bloqueio" in text)
    sample = extra_text[:4]
    return (
        case(
            "cloudflare.plugin-tested",
            "Teste inicial do plugin Cloudflare registrado",
            plugin_attempted,
            "Tentativa do plugin Cloudflare foi informada no contexto da rodada."
            if plugin_attempted
            else "Nao ha registro textual da tentativa do plugin.",
            "registrar somente a tentativa, sem tratar falha como bloqueio",
            evidence=sample,
            severity="warning",
            required=False,
        ),
        case(
            "cloudflare.plugin-not-blocker",
            "Falha do plugin Cloudflare nao virou blocker",
            not plugin_as_blocker,
            # The reason must describe the failure too; a fixed "not detected"
            # text would contradict a failing verdict.
            "Texto da rodada associa o plugin Cloudflare a blocker ou bloqueio."
            if plugin_as_blocker
            else "Nao foi detectada classificacao direta da falha do plugin como blocker.",
            "remover qualquer pendencia que use o plugin como impedimento operacional",
            evidence=sample,
            severity="blocker",
        ),
    )
def build_assurance_suite(
    project_root: Path,
    bundle: ReportBundle,
    platform_reports: Sequence[PlatformHumanReport],
    portfolio: EcosystemGovernancePortfolio,
    lifecycle_package: RoundExecutionPackage | None = None,
    compiled_orders: CompiledOrderSet | None = None,
    central_folder: Path | None = None,
    extra_text: Sequence[str] = (),
) -> AssuranceSuite:
    """Run every case builder of this module and aggregate the results.

    Cases are collected in this order: required artifacts, platforms,
    governance, compiled orders, lifecycle minimums, operational hygiene and
    Cloudflare premise.

    Counting rules:
        * ``blocker_count``: failed cases that are required and have severity
          "blocker".
        * ``warning_count``: failed cases whose severity is not "blocker",
          whether required or not.
        * A failed, non-required case with severity "blocker" is counted in
          neither total.

    Returns:
        A suite with id "mais-humana.assurance.v1" that passes when
        ``blocker_count`` is zero.
    """
    collected: tuple[AssuranceCase, ...] = (
        *required_artifact_cases(project_root, bundle),
        *platform_cases(platform_reports),
        *governance_cases(portfolio),
        *compiled_order_cases(compiled_orders),
        *minimum_cases(lifecycle_package),
        *operational_hygiene_cases(project_root, central_folder),
        *cloudflare_premise_cases(extra_text),
    )
    failures = [entry for entry in collected if not entry.passed]
    blocker_count = len([entry for entry in failures if entry.required and entry.severity == "blocker"])
    warning_count = len([entry for entry in failures if entry.severity != "blocker"])
    summary = (
        f"Casos de assurance: {len(collected)}",
        f"Blockers de assurance: {blocker_count}",
        f"Warnings de assurance: {warning_count}",
        f"Artefatos gerados no bundle: {len(bundle.generated_files)}",
        f"Plataformas no portfolio: {len(portfolio.cards)}",
    )
    return AssuranceSuite(
        suite_id="mais-humana.assurance.v1",
        cases=collected,
        passed=blocker_count == 0,
        blocker_count=blocker_count,
        warning_count=warning_count,
        summary=summary,
    )
def assurance_markdown(suite: AssuranceSuite) -> str:
    """Render the suite as a Markdown document.

    The document has a header with the suite totals, a "Sumario" section
    listing ``suite.summary``, and a "Casos" section with one sub-heading per
    case. A passing case shows status "ok"; a failing case shows its
    severity. At most the first eight evidence items of a case are listed.

    Returns:
        The Markdown text, stripped of surrounding whitespace and terminated
        by exactly one newline.
    """
    header = [
        "# Assurance da rodada Mais Humana",
        "",
        f"- suite_id: `{suite.suite_id}`",
        f"- passed: `{suite.passed}`",
        f"- blockers: `{suite.blocker_count}`",
        f"- warnings: `{suite.warning_count}`",
        "",
        "## Sumario",
        "",
    ]
    summary_block = [f"- {entry}" for entry in suite.summary]

    case_block: list[str] = []
    for entry in suite.cases:
        if entry.passed:
            status = "ok"
        else:
            status = entry.severity
        case_block += [
            f"### {entry.case_id}",
            "",
            f"- status: `{status}`",
            f"- obrigatorio: `{entry.required}`",
            f"- titulo: {entry.title}",
            f"- razao: {entry.reason}",
            f"- proxima_acao: {entry.next_action}",
        ]
        if entry.evidence:
            case_block.append("- evidencias:")
            case_block.extend(f" - `{proof}`" for proof in entry.evidence[:8])
        # Blank line separating this case from the next heading.
        case_block.append("")

    document = "\n".join([*header, *summary_block, "", "## Casos", "", *case_block])
    return document.strip() + "\n"
def assurance_rows(suite: AssuranceSuite) -> list[list[str]]:
    """Return the suite as tabular rows, header row first.

    Each case becomes one row in the column order of the header; the boolean
    ``passed`` and ``required`` fields are rendered as "yes" or "no".
    """

    def flag(value: bool) -> str:
        return "yes" if value else "no"

    header = ["case_id", "passed", "severity", "required", "title", "reason", "next_action"]
    body = [
        [
            entry.case_id,
            flag(entry.passed),
            entry.severity,
            flag(entry.required),
            entry.title,
            entry.reason,
            entry.next_action,
        ]
        for entry in suite.cases
    ]
    return [header, *body]
def assurance_pending_items(suite: AssuranceSuite) -> tuple[str, ...]:
    """Return "<case_id>: <next_action>" entries for every failed case.

    The entries are passed through ``merge_unique`` before being returned.
    NOTE(review): ``merge_unique`` presumably drops duplicates while keeping
    order; confirm against its definition in ``models``.
    """
    pending = (f"{failed.case_id}: {failed.next_action}" for failed in suite.failed_cases)
    return merge_unique(pending)