auto-sync: tudo-para-ia-mais-humana 2026-05-02 00:10:03

2026-05-02 00:10:03 -03:00
parent 6e230bb979
commit 24923362a7
16 changed files with 1598 additions and 28 deletions

View File

@@ -36,6 +36,8 @@ from .repository_mesh_readiness import build_mesh_readiness_report, write_readin
from .repository_mesh_gitea import build_gitea_mesh_plan, write_gitea_plan_artifacts
from .scanner import environment_summary, scan_ecosystem
from .storage import table_counts
from .targeted_sync_audit import run_targeted_sync_audit
from .workspace_hygiene import run_workspace_hygiene


def build_parser() -> argparse.ArgumentParser:
@@ -101,6 +103,16 @@ def build_parser() -> argparse.ArgumentParser:
    access_policy.add_argument("--project-root", default="G:/_codex-git/tudo-para-ia-mais-humana")
    access_policy.add_argument("--central-platform-folder", default="")
    access_policy.add_argument("--publication-gate-json", default="")
    hygiene = sub.add_parser("workspace-hygiene", help="Inspect or clean approved local artifacts for closeout.")
    hygiene.add_argument("--project-root", default="G:/_codex-git/tudo-para-ia-mais-humana")
    hygiene.add_argument("--central-platform-folder", default="")
    hygiene.add_argument("--apply", action="store_true")
    sync_audit = sub.add_parser("targeted-sync-audit", help="Write a safe Git synchronization audit for the active round repos.")
    sync_audit.add_argument("--project-root", default="G:/_codex-git/tudo-para-ia-mais-humana")
    sync_audit.add_argument("--mcp-repo-root", default="G:/_codex-git/tudo-para-ia-mcps-internos-plataform")
    sync_audit.add_argument("--central-repo-root", default="G:/_codex-git/nucleo-gestao-operacional")
    sync_audit.add_argument("--central-platform-folder", default="")
    sync_audit.add_argument("--fetch", action="store_true")
    return parser
@@ -413,6 +425,38 @@ def command_mcp_access_policy(args: argparse.Namespace) -> int:
    return 0


def command_workspace_hygiene(args: argparse.Namespace) -> int:
    central_platform_folder = Path(args.central_platform_folder) if args.central_platform_folder else None
    report, records = run_workspace_hygiene(
        project_root=Path(args.project_root),
        central_platform_folder=central_platform_folder,
        apply=bool(args.apply),
    )
    payload = {
        "report": report.to_dict(),
        "generatedFiles": [record.path for record in records],
    }
    print(json.dumps(payload, ensure_ascii=False, indent=2))
    return 0


def command_targeted_sync_audit(args: argparse.Namespace) -> int:
    central_platform_folder = Path(args.central_platform_folder) if args.central_platform_folder else None
    report, records = run_targeted_sync_audit(
        project_root=Path(args.project_root),
        mcp_repo_root=Path(args.mcp_repo_root),
        central_repo_root=Path(args.central_repo_root),
        central_platform_folder=central_platform_folder,
        fetch=bool(args.fetch),
    )
    payload = {
        "report": report.to_dict(),
        "generatedFiles": [record.path for record in records],
    }
    print(json.dumps(payload, ensure_ascii=False, indent=2))
    return 0


def main(argv: list[str] | None = None) -> int:
    parser = build_parser()
    args = parser.parse_args(argv)
@@ -446,6 +490,10 @@ def main(argv: list[str] | None = None) -> int:
        return command_mcp_publication_gate(args)
    if args.command == "mcp-access-policy":
        return command_mcp_access_policy(args)
    if args.command == "workspace-hygiene":
        return command_workspace_hygiene(args)
    if args.command == "targeted-sync-audit":
        return command_targeted_sync_audit(args)
    parser.error(f"unknown command: {args.command}")
    return 2
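
# Illustrative invocations of the two new subcommands (a sketch: the console
# entry point is not shown in this diff, so `python -m mais_humana` is an
# assumption based on the package name in the changed_by fields; paths fall
# back to the defaults declared in build_parser above):
#
#   python -m mais_humana workspace-hygiene            # dry run
#   python -m mais_humana workspace-hygiene --apply    # delete approved artifacts
#   python -m mais_humana targeted-sync-audit --fetch  # audit with network fetch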

View File

@@ -0,0 +1,529 @@
"""Targeted Git synchronization audit for the Mais Humana round.
This module implements the safe portion of the manual repository sync
procedure for the specific repositories involved in the platform 15 work:
Mais Humana, MCPs Internos, and the operational nucleus/central. It never
resets, restores, rebases, pulls, merges, or pushes. It records status,
branch, origin, HEAD, ahead/behind, optional fetch errors, and a clear decision
about whether automatic synchronization is blocked.
"""
from __future__ import annotations
import csv
import hashlib
import io
import json
import os
import subprocess
from dataclasses import dataclass
from enum import Enum
from pathlib import Path
from typing import Any, Protocol, Sequence
from .models import GeneratedFile, as_plain_data, merge_unique, utc_now
from .redaction import redact_sensitive_text
class SyncAuditStatus(str, Enum):
    """Final status for one repo observation or the whole report."""

    ALIGNED = "aligned"
    AHEAD = "ahead"
    BEHIND = "behind"
    DIVERGED = "diverged"
    DIRTY = "dirty"
    CREDENTIAL_BLOCKED = "credential_blocked"
    ACL_BLOCKED = "acl_blocked"
    MISSING = "missing"
    NOT_GIT = "not_git"
    UNKNOWN = "unknown"


@dataclass(frozen=True, slots=True)
class GitCommandResult:
    """Output from one git command."""

    command: tuple[str, ...]
    exit_code: int
    output: str

    @property
    def ok(self) -> bool:
        return self.exit_code == 0

    def to_dict(self) -> dict[str, Any]:
        return as_plain_data(self)


@dataclass(frozen=True, slots=True)
class SyncAuditTarget:
    """Repository that must be checked in this round."""

    target_id: str
    path: str
    expected_remote: str
    role: str

    def to_dict(self) -> dict[str, Any]:
        return as_plain_data(self)


@dataclass(frozen=True, slots=True)
class RepoSyncObservation:
    """Safe synchronization observation for one repository."""

    target: SyncAuditTarget
    exists: bool
    is_git: bool
    branch: str
    head: str
    short_head: str
    origin: str
    status_short: str
    dirty: bool
    ahead: int | None
    behind: int | None
    fetch_attempted: bool
    fetch_exit: int | None
    fetch_output: str
    status: SyncAuditStatus
    blockers: tuple[str, ...]
    decision: str

    @property
    def clean_for_auto_sync(self) -> bool:
        return self.status in {SyncAuditStatus.ALIGNED, SyncAuditStatus.AHEAD, SyncAuditStatus.BEHIND} and not self.dirty

    def to_dict(self) -> dict[str, Any]:
        return as_plain_data(self)


@dataclass(frozen=True, slots=True)
class TargetedSyncAuditReport:
    """Full targeted sync audit report."""

    report_id: str
    generated_at: str
    fetch: bool
    targets: tuple[SyncAuditTarget, ...]
    observations: tuple[RepoSyncObservation, ...]
    summary: tuple[str, ...]
    blockers: tuple[str, ...]

    @property
    def status(self) -> SyncAuditStatus:
        if any(item.status in {SyncAuditStatus.CREDENTIAL_BLOCKED, SyncAuditStatus.ACL_BLOCKED, SyncAuditStatus.DIVERGED} for item in self.observations):
            return SyncAuditStatus.DIVERGED
        if any(item.status == SyncAuditStatus.DIRTY for item in self.observations):
            return SyncAuditStatus.DIRTY
        if any(item.status in {SyncAuditStatus.MISSING, SyncAuditStatus.NOT_GIT, SyncAuditStatus.UNKNOWN} for item in self.observations):
            return SyncAuditStatus.UNKNOWN
        if all(item.status == SyncAuditStatus.ALIGNED for item in self.observations):
            return SyncAuditStatus.ALIGNED
        return SyncAuditStatus.UNKNOWN

    def to_dict(self) -> dict[str, Any]:
        data = as_plain_data(self)
        data["status"] = self.status.value
        return data


class GitRunner(Protocol):
    """Small protocol for testable git execution."""

    def run(self, repo_path: Path, args: Sequence[str]) -> GitCommandResult:
        """Run git args in repo_path."""


class SubprocessGitRunner:
    """Git runner using subprocess without shell interpolation."""

    def run(self, repo_path: Path, args: Sequence[str]) -> GitCommandResult:
        safe = repo_path.resolve(strict=False).as_posix()
        command = ("git", "-c", f"safe.directory={safe}", "-C", str(repo_path), *args)
        env = os.environ.copy()
        env["GIT_TERMINAL_PROMPT"] = "0"
        env["GIT_CEILING_DIRECTORIES"] = str(repo_path.resolve(strict=False).parent)
        try:
            completed = subprocess.run(command, cwd=str(repo_path), env=env, text=True, capture_output=True, timeout=120, check=False)
            output = "\n".join(part for part in (completed.stdout, completed.stderr) if part).strip()
            return GitCommandResult(tuple(args), int(completed.returncode), redact_sensitive_text(output))
        except (OSError, subprocess.TimeoutExpired) as exc:
            return GitCommandResult(tuple(args), 1, redact_sensitive_text(f"{type(exc).__name__}: {exc}"))
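
# Minimal in-memory runner satisfying the GitRunner protocol (a test-oriented
# sketch, not part of the committed module): it replays canned outputs keyed
# on the git subcommand so observe_repo can be exercised offline.
#
#   class FakeGitRunner:
#       def __init__(self, outputs: dict[str, str]) -> None:
#           self.outputs = outputs
#
#       def run(self, repo_path: Path, args: Sequence[str]) -> GitCommandResult:
#           return GitCommandResult(tuple(args), 0, self.outputs.get(args[0], ""))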
def default_sync_targets(
    *,
    project_root: Path,
    mcp_repo_root: Path,
    central_repo_root: Path,
) -> tuple[SyncAuditTarget, ...]:
    """Return the three repositories that matter for platform 15 closeout."""
    return (
        SyncAuditTarget(
            target_id="mais-humana",
            path=str(project_root),
            expected_remote="https://git.ami.app.br/admin/tudo-para-ia-mais-humana.git",
            role="projeto real da plataforma 15",
        ),
        SyncAuditTarget(
            target_id="mcps-internos",
            path=str(mcp_repo_root),
            expected_remote="https://git.ami.app.br/admin/tudo-para-ia-mcps-internos-plataform.git",
            role="control-plane MCP que publica as tools Mais Humana",
        ),
        SyncAuditTarget(
            target_id="nucleo-central",
            path=str(central_repo_root),
            expected_remote="https://git.ami.app.br/admin/nucleo-gestao-operacional.git",
            role="nucleo e central de ordem de servico",
        ),
    )


def _first_line(value: str) -> str:
    return (value or "").splitlines()[0].strip() if value else ""


def _short(head: str) -> str:
    cleaned = head.strip()
    return cleaned[:12] if len(cleaned) >= 12 else cleaned


def _parse_ahead_behind(output: str) -> tuple[int | None, int | None]:
    parts = [part for part in output.replace("\t", " ").split(" ") if part.strip()]
    if len(parts) != 2:
        return None, None
    try:
        behind = int(parts[0])
        ahead = int(parts[1])
        return ahead, behind
    except ValueError:
        return None, None
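
# Illustrative parse, grounded in the rev-list call in observe_repo: for
# ``git rev-list --left-right --count origin/main...main`` git prints the
# left/right totals, e.g. "1\t3" when origin/main has 1 exclusive commit and
# main has 3, so _parse_ahead_behind("1\t3") returns (ahead=3, behind=1).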
def classify_observation(
    *,
    exists: bool,
    is_git: bool,
    dirty: bool,
    ahead: int | None,
    behind: int | None,
    fetch_output: str,
    command_outputs: Sequence[str],
) -> tuple[SyncAuditStatus, tuple[str, ...], str]:
    """Classify a repository without suggesting destructive reconciliation."""
    combined = "\n".join([fetch_output, *command_outputs])
    lower = combined.lower()
    if not exists:
        return SyncAuditStatus.MISSING, ("path_missing",), "materializar repositorio ausente antes de sincronizar"
    if not is_git:
        return SyncAuditStatus.NOT_GIT, ("not_a_git_repository",), "confirmar caminho real do repositorio"
    if "sec_e_no_credentials" in lower or "could not read username" in lower or "authentication" in lower:
        return SyncAuditStatus.CREDENTIAL_BLOCKED, ("git_credentials_unavailable",), "corrigir credencial Git/Schannel e repetir fetch/push seguro"
    if "permission denied" in lower or "fetch_head" in lower or "index.lock" in lower:
        return SyncAuditStatus.ACL_BLOCKED, ("git_acl_or_lock_blocked",), "corrigir ACL/locks de .git antes de qualquer merge/push"
    if dirty:
        if behind and behind > 0:
            return SyncAuditStatus.DIVERGED, ("dirty_worktree_remote_ahead",), "bloquear pull automatico para preservar alteracao valida mais recente"
        return SyncAuditStatus.DIRTY, ("dirty_worktree",), "commit escopado ou revisar alteracoes antes da sincronizacao"
    if ahead is None or behind is None:
        return SyncAuditStatus.UNKNOWN, ("ahead_behind_unknown",), "repetir rev-list apos fetch funcional"
    if ahead > 0 and behind > 0:
        return SyncAuditStatus.DIVERGED, (f"ahead={ahead} behind={behind}",), "reconciliacao manual obrigatoria sem reset/rebase destrutivo"
    if ahead > 0:
        return SyncAuditStatus.AHEAD, (f"ahead={ahead}",), "push seguro apos credencial valida e verificacao de lease"
    if behind > 0:
        return SyncAuditStatus.BEHIND, (f"behind={behind}",), "merge --ff-only permitido apenas com worktree limpa e decisao de precedencia"
    return SyncAuditStatus.ALIGNED, (), "nenhuma acao de sincronizacao necessaria no estado observado"
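
# Precedence sketch, following the cascade above: existence > git-ness >
# credential errors > ACL/lock errors > dirty worktree > ahead/behind counts.
# For example, a clean repository that is 2 commits ahead classifies as:
#
#   classify_observation(exists=True, is_git=True, dirty=False, ahead=2,
#                        behind=0, fetch_output="", command_outputs=())
#   # -> (SyncAuditStatus.AHEAD, ("ahead=2",),
#   #     "push seguro apos credencial valida e verificacao de lease")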
def observe_repo(target: SyncAuditTarget, *, runner: GitRunner | None = None, fetch: bool = False) -> RepoSyncObservation:
    """Collect safe Git facts for one repository."""
    git = runner or SubprocessGitRunner()
    path = Path(target.path)
    exists = path.exists()
    is_git = exists and (path / ".git").exists()
    fetch_result = GitCommandResult(("fetch", "--all", "--prune"), 127, "not_run")
    branch = ""
    head = ""
    origin = ""
    status_short = ""
    dirty = False
    ahead: int | None = None
    behind: int | None = None
    command_outputs: list[str] = []
    if is_git:
        if fetch:
            fetch_result = git.run(path, ("fetch", "--all", "--prune"))
        branch_result = git.run(path, ("branch", "--show-current"))
        head_result = git.run(path, ("rev-parse", "HEAD"))
        origin_result = git.run(path, ("remote", "get-url", "origin"))
        status_result = git.run(path, ("status", "--short", "--branch"))
        porcelain_result = git.run(path, ("status", "--porcelain", "--untracked-files=all"))
        branch = _first_line(branch_result.output)
        head = _first_line(head_result.output)
        origin = _first_line(origin_result.output)
        status_short = status_result.output.strip()
        dirty = bool(porcelain_result.output.strip())
        command_outputs.extend([branch_result.output, head_result.output, origin_result.output, status_result.output, porcelain_result.output])
        if any("not a git repository" in result.output.lower() for result in (branch_result, head_result, status_result)):
            is_git = False
            dirty = False
        if branch:
            counts_result = git.run(path, ("rev-list", "--left-right", "--count", f"origin/{branch}...{branch}"))
            ahead, behind = _parse_ahead_behind(counts_result.output)
            command_outputs.append(counts_result.output)
    status, blockers, decision = classify_observation(
        exists=exists,
        is_git=is_git,
        dirty=dirty,
        ahead=ahead,
        behind=behind,
        fetch_output=fetch_result.output if fetch else "",
        command_outputs=command_outputs,
    )
    return RepoSyncObservation(
        target=target,
        exists=exists,
        is_git=is_git,
        branch=branch,
        head=head,
        short_head=_short(head),
        origin=origin,
        status_short=status_short,
        dirty=dirty,
        ahead=ahead,
        behind=behind,
        fetch_attempted=fetch,
        fetch_exit=fetch_result.exit_code if fetch else None,
        fetch_output=redact_sensitive_text(fetch_result.output if fetch else ""),
        status=status,
        blockers=blockers,
        decision=decision,
    )


def build_targeted_sync_audit(
    *,
    targets: Sequence[SyncAuditTarget],
    runner: GitRunner | None = None,
    fetch: bool = False,
) -> TargetedSyncAuditReport:
    """Build the targeted sync audit report."""
    observations = tuple(observe_repo(target, runner=runner, fetch=fetch) for target in targets)
    blockers = merge_unique(
        f"{item.target.target_id}:{blocker}"
        for item in observations
        for blocker in item.blockers
    )
    summary = (
        f"Repositories observed: {len(observations)}.",
        f"Fetch attempted: {fetch}.",
        f"Aligned: {sum(1 for item in observations if item.status == SyncAuditStatus.ALIGNED)}.",
        f"Blocked or divergent: {sum(1 for item in observations if item.blockers)}.",
        "No reset, restore, rebase, pull, merge, or push was executed by this audit.",
    )
    # sha256 keeps the report id stable across interpreter runs; the built-in
    # hash() is salted per process, which would defeat sort_keys determinism.
    digest_source = json.dumps([item.to_dict() for item in observations], sort_keys=True)
    report_id = "targeted-sync-audit-" + hashlib.sha256(digest_source.encode("utf-8")).hexdigest()[:12]
    return TargetedSyncAuditReport(
        report_id=report_id,
        generated_at=utc_now(),
        fetch=fetch,
        targets=tuple(targets),
        observations=observations,
        summary=summary,
        blockers=blockers,
    )


def sync_audit_csv(report: TargetedSyncAuditReport) -> str:
    """Render observations as CSV."""
    rows = [["target_id", "path", "branch", "head", "ahead", "behind", "dirty", "status", "origin", "fetch_exit", "decision", "blockers"]]
    for item in report.observations:
        rows.append(
            [
                item.target.target_id,
                item.target.path,
                item.branch,
                item.short_head,
                "" if item.ahead is None else str(item.ahead),
                "" if item.behind is None else str(item.behind),
                "yes" if item.dirty else "no",
                item.status.value,
                item.origin,
                "" if item.fetch_exit is None else str(item.fetch_exit),
                item.decision,
                "; ".join(item.blockers),
            ]
        )
    buffer = io.StringIO()
    writer = csv.writer(buffer, lineterminator="\n")
    writer.writerows(rows)
    return buffer.getvalue()


def sync_audit_markdown(report: TargetedSyncAuditReport) -> str:
    """Render the targeted sync audit as Markdown."""
    lines = [
        "# Targeted Git Sync Audit",
        "",
        f"- report_id: `{report.report_id}`",
        f"- generated_at: `{report.generated_at}`",
        f"- status: `{report.status.value}`",
        f"- fetch: `{report.fetch}`",
        "",
        "## Summary",
        "",
    ]
    lines.extend(f"- {item}" for item in report.summary)
    lines.extend(["", "## Repositories", ""])
    for item in report.observations:
        lines.extend(
            [
                f"### {item.target.target_id}",
                "",
                f"- role: {item.target.role}",
                f"- path: `{item.target.path}`",
                f"- exists: `{item.exists}`",
                f"- git: `{item.is_git}`",
                f"- branch: `{item.branch}`",
                f"- head: `{item.short_head}`",
                f"- ahead: `{item.ahead}`",
                f"- behind: `{item.behind}`",
                f"- dirty: `{item.dirty}`",
                f"- origin: `{item.origin}`",
                f"- expected_origin: `{item.target.expected_remote}`",
                f"- status: `{item.status.value}`",
                f"- decision: {item.decision}",
            ]
        )
        if item.fetch_attempted:
            lines.append(f"- fetch_exit: `{item.fetch_exit}`")
            if item.fetch_output:
                lines.append(f"- fetch_output: `{item.fetch_output[:500]}`")
        if item.blockers:
            lines.append("- blockers:")
            lines.extend(f"  - `{blocker}`" for blocker in item.blockers)
        lines.append("")
    lines.extend(["## Blockers", ""])
    if report.blockers:
        lines.extend(f"- `{item}`" for item in report.blockers)
    else:
        lines.append("- Nenhum blocker de sincronizacao no escopo auditado.")
    return "\n".join(lines).strip() + "\n"


def sync_audit_artifact_records(project_root: Path, central_platform_folder: Path | None = None) -> tuple[GeneratedFile, ...]:
    """Return semantic records for sync audit artifacts."""
    records = [
        GeneratedFile(
            path=str(project_root / "dados" / "targeted-sync-audit.json"),
            description="Auditoria Git segura dos repositorios da rodada Mais Humana.",
            function="targeted git sync audit",
            file_type="json",
            changed_by="mais_humana.targeted_sync_audit",
            change_summary="Registrado status Git, fetch, ahead/behind, bloqueios de credencial/ACL e decisao segura.",
            relation_to_order="0033_EXECUTIVA__sincronizar-git-mais-humana-mcps-central-com-credenciais",
        ),
        GeneratedFile(
            path=str(project_root / "matrizes" / "targeted-sync-audit.csv"),
            description="Matriz de sincronizacao Git escopada.",
            function="targeted git sync matrix",
            file_type="csv",
            changed_by="mais_humana.targeted_sync_audit",
            change_summary="Criada matriz de repos, hashes, divergencias e bloqueios.",
            relation_to_order="0033_EXECUTIVA__sincronizar-git-mais-humana-mcps-central-com-credenciais",
        ),
        GeneratedFile(
            path=str(project_root / "ecossistema" / "TARGETED-SYNC-AUDIT.md"),
            description="Relatorio humano da sincronizacao Git escopada.",
            function="targeted git sync report",
            file_type="markdown",
            changed_by="mais_humana.targeted_sync_audit",
            change_summary="Criado relatorio de sincronizacao sem operacao destrutiva.",
            relation_to_order="0033_EXECUTIVA__sincronizar-git-mais-humana-mcps-central-com-credenciais",
        ),
    ]
    if central_platform_folder is not None:
        records.append(
            GeneratedFile(
                path=str(central_platform_folder / "reports" / "EXECUTADO__targeted-sync-audit.md"),
                description="Copia central da auditoria Git escopada.",
                function="targeted git sync central report",
                file_type="markdown",
                changed_by="mais_humana.targeted_sync_audit",
                change_summary="Registrado estado Git da rodada na pasta central.",
                relation_to_order="0033_EXECUTIVA__sincronizar-git-mais-humana-mcps-central-com-credenciais",
            )
        )
    return tuple(records)


def write_sync_audit_artifacts(
    report: TargetedSyncAuditReport,
    project_root: Path,
    *,
    central_platform_folder: Path | None = None,
) -> tuple[GeneratedFile, ...]:
    """Write project and optional central sync audit artifacts."""
    targets: list[tuple[Path, str]] = [
        (project_root / "dados" / "targeted-sync-audit.json", json.dumps(report.to_dict(), ensure_ascii=False, indent=2, sort_keys=True)),
        (project_root / "matrizes" / "targeted-sync-audit.csv", sync_audit_csv(report)),
        (project_root / "ecossistema" / "TARGETED-SYNC-AUDIT.md", sync_audit_markdown(report)),
    ]
    records = list(sync_audit_artifact_records(project_root, central_platform_folder))
    central_failures: list[dict[str, str]] = []
    if central_platform_folder is not None:
        targets.append((central_platform_folder / "reports" / "EXECUTADO__targeted-sync-audit.md", sync_audit_markdown(report)))
    for path, content in targets:
        try:
            path.parent.mkdir(parents=True, exist_ok=True)
            path.write_text(content, encoding="utf-8")
        except OSError as exc:
            if central_platform_folder is not None and central_platform_folder in path.parents:
                central_failures.append({"path": str(path), "error": f"{type(exc).__name__}: {exc}"})
                continue
            raise
    if central_failures:
        status_path = project_root / "dados" / "targeted-sync-audit-central-write-status.json"
        status_path.write_text(
            json.dumps({"generatedAt": utc_now(), "ok": False, "failures": central_failures}, ensure_ascii=False, indent=2, sort_keys=True),
            encoding="utf-8",
        )
        records.append(
            GeneratedFile(
                path=str(status_path),
                description="Status de escrita central da auditoria Git escopada.",
                function="targeted git sync central write status",
                file_type="json",
                changed_by="mais_humana.targeted_sync_audit",
                change_summary="Registrada falha de escrita central sem abortar artefatos do projeto real.",
                relation_to_order="0034_EXECUTIVA__corrigir-acl-escrita-central-e-sql-semantico-plataforma-15",
            )
        )
    return tuple(records)


def run_targeted_sync_audit(
    *,
    project_root: Path,
    mcp_repo_root: Path,
    central_repo_root: Path,
    central_platform_folder: Path | None = None,
    fetch: bool = False,
    runner: GitRunner | None = None,
) -> tuple[TargetedSyncAuditReport, tuple[GeneratedFile, ...]]:
    """Run the safe targeted sync audit and write artifacts."""
    targets = default_sync_targets(project_root=project_root, mcp_repo_root=mcp_repo_root, central_repo_root=central_repo_root)
    report = build_targeted_sync_audit(targets=targets, runner=runner, fetch=fetch)
    records = write_sync_audit_artifacts(report, project_root, central_platform_folder=central_platform_folder)
    return report, records
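
# Usage sketch (the import path `mais_humana.targeted_sync_audit` is inferred
# from the changed_by fields; the roots are the CLI defaults declared above):
#
#   from pathlib import Path
#   from mais_humana.targeted_sync_audit import run_targeted_sync_audit
#
#   report, records = run_targeted_sync_audit(
#       project_root=Path("G:/_codex-git/tudo-para-ia-mais-humana"),
#       mcp_repo_root=Path("G:/_codex-git/tudo-para-ia-mcps-internos-plataform"),
#       central_repo_root=Path("G:/_codex-git/nucleo-gestao-operacional"),
#       fetch=False,  # opt in to the only network operation
#   )
#   print(report.status.value)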

View File

@@ -0,0 +1,535 @@
"""Local workspace hygiene checks for service-order closeout.
The Mais Humana rounds generate Python test scratch folders and may touch
JavaScript/Cloudflare workspaces while validating MCP publication. This module
turns cleanup into an auditable operation instead of an informal shell step:
it verifies target paths, deletes only approved local artifacts when requested,
and writes redacted project/central reports that explain any ACL retention.
"""
from __future__ import annotations
import csv
import hashlib
import io
import json
import os
import shutil
from dataclasses import dataclass
from enum import Enum
from pathlib import Path
from typing import Any, Sequence
from .models import GeneratedFile, as_plain_data, merge_unique, utc_now
DEFAULT_LOCAL_ARTIFACTS = (
    ".test-tmp",
    "node_modules",
)
class HygieneStatus(str, Enum):
    """Result for a cleanup target or whole report."""

    PASSED = "passed"
    PARTIAL = "partial"
    BLOCKED = "blocked"
    NOT_FOUND = "not_found"
    NOT_RUN = "not_run"


class HygieneActionKind(str, Enum):
    """Action planned or executed for one local artifact."""

    DELETE_DIRECTORY = "delete_directory"
    DELETE_FILE = "delete_file"
    VERIFY_ABSENT = "verify_absent"
    BLOCK_UNSAFE_PATH = "block_unsafe_path"


@dataclass(frozen=True, slots=True)
class HygieneTarget:
    """Approved local cleanup target."""

    target_id: str
    relative_path: str
    reason: str
    required_absent: bool = True
    delete_when_apply: bool = True

    def to_dict(self) -> dict[str, Any]:
        return as_plain_data(self)


@dataclass(frozen=True, slots=True)
class PathFootprint:
    """Best-effort path size and child count."""

    exists: bool
    is_dir: bool
    is_file: bool
    child_count: int
    byte_count: int
    errors: tuple[str, ...]

    def to_dict(self) -> dict[str, Any]:
        return as_plain_data(self)


@dataclass(frozen=True, slots=True)
class HygieneAction:
    """Observed cleanup action for one target."""

    target_id: str
    path: str
    action: HygieneActionKind
    status: HygieneStatus
    applied: bool
    deleted: bool
    footprint_before: PathFootprint
    footprint_after: PathFootprint
    error: str = ""
    note: str = ""

    @property
    def clean(self) -> bool:
        return self.status in {HygieneStatus.PASSED, HygieneStatus.NOT_FOUND} and not self.footprint_after.exists

    def to_dict(self) -> dict[str, Any]:
        return as_plain_data(self)


@dataclass(frozen=True, slots=True)
class WorkspaceHygieneReport:
    """Full hygiene report for a platform workspace."""

    report_id: str
    generated_at: str
    project_root: str
    central_platform_folder: str
    apply: bool
    targets: tuple[HygieneTarget, ...]
    actions: tuple[HygieneAction, ...]
    summary: tuple[str, ...]
    blockers: tuple[str, ...]

    @property
    def status(self) -> HygieneStatus:
        if any(action.status == HygieneStatus.BLOCKED for action in self.actions):
            return HygieneStatus.BLOCKED
        if any(action.status in {HygieneStatus.PARTIAL, HygieneStatus.NOT_RUN} for action in self.actions):
            return HygieneStatus.PARTIAL
        return HygieneStatus.PASSED

    @property
    def clean(self) -> bool:
        return all(action.clean for action in self.actions)

    def to_dict(self) -> dict[str, Any]:
        data = as_plain_data(self)
        data["status"] = self.status.value
        data["clean"] = self.clean
        return data
def default_hygiene_targets() -> tuple[HygieneTarget, ...]:
    """Return local artifact targets allowed for automated cleanup."""
    return (
        HygieneTarget(
            target_id="python-test-temp",
            relative_path=".test-tmp",
            reason="scratch directory created by Python unit tests; must not survive closeout",
        ),
        HygieneTarget(
            target_id="node-dependencies",
            relative_path="node_modules",
            reason="local Node dependency directory; must not be versioned or retained after local tests",
        ),
    )


def _norm(path: Path) -> str:
    return os.path.normcase(os.path.abspath(str(path)))


def safe_child_path(root: Path, relative_path: str) -> Path:
    """Resolve a target and reject paths outside the project root."""
    if Path(relative_path).is_absolute():
        raise ValueError(f"absolute cleanup target is not allowed: {relative_path}")
    root_resolved = root.resolve(strict=False)
    target = (root_resolved / relative_path).resolve(strict=False)
    try:
        common = os.path.commonpath([_norm(root_resolved), _norm(target)])
    except ValueError as exc:
        raise ValueError(f"cleanup target crosses drive boundary: {relative_path}") from exc
    if common != _norm(root_resolved):
        raise ValueError(f"cleanup target escapes project root: {relative_path}")
    return target
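
# Traversal guard, illustrated (hypothetical paths, Windows semantics): with
# root = Path("G:/_codex-git/tudo-para-ia-mais-humana"),
#   safe_child_path(root, ".test-tmp")        -> <root>/.test-tmp
#   safe_child_path(root, "../other-repo")    -> ValueError (escapes root)
#   safe_child_path(root, "C:/Windows/Temp")  -> ValueError (absolute path)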
def _footprint(path: Path, *, max_errors: int = 8) -> PathFootprint:
    if not path.exists():
        return PathFootprint(False, False, False, 0, 0, ())
    if path.is_file():
        try:
            size = path.stat().st_size
            return PathFootprint(True, False, True, 1, int(size), ())
        except OSError as exc:
            return PathFootprint(True, False, True, 1, 0, (f"{type(exc).__name__}: {exc}",))
    child_count = 0
    byte_count = 0
    errors: list[str] = []
    for current_root, dirnames, filenames in os.walk(path, topdown=True, onerror=lambda exc: errors.append(f"{type(exc).__name__}: {exc}")):
        child_count += len(dirnames) + len(filenames)
        for filename in filenames:
            try:
                byte_count += int((Path(current_root) / filename).stat().st_size)
            except OSError as exc:
                if len(errors) < max_errors:
                    errors.append(f"{type(exc).__name__}: {exc}")
        if len(errors) >= max_errors:
            # Trim before appending the sentinel so the final cap keeps it
            # visible instead of slicing it away.
            del errors[max_errors:]
            errors.append("error_limit_reached")
            break
    return PathFootprint(True, True, False, child_count, byte_count, tuple(errors[: max_errors + 1]))
def _delete_path(path: Path) -> tuple[bool, str]:
    try:
        if not path.exists():
            return False, ""
        if path.is_dir():
            shutil.rmtree(path)
            return True, ""
        path.unlink()
        return True, ""
    except OSError as exc:
        return False, f"{type(exc).__name__}: {exc}"
def run_hygiene_actions(
    project_root: Path,
    *,
    apply: bool = False,
    targets: Sequence[HygieneTarget] | None = None,
) -> tuple[HygieneAction, ...]:
    """Inspect and optionally remove approved local artifacts."""
    action_rows: list[HygieneAction] = []
    for target in tuple(targets or default_hygiene_targets()):
        try:
            path = safe_child_path(project_root, target.relative_path)
        except ValueError as exc:
            empty = PathFootprint(False, False, False, 0, 0, ())
            action_rows.append(
                HygieneAction(
                    target_id=target.target_id,
                    path=str(project_root / target.relative_path),
                    action=HygieneActionKind.BLOCK_UNSAFE_PATH,
                    status=HygieneStatus.BLOCKED,
                    applied=False,
                    deleted=False,
                    footprint_before=empty,
                    footprint_after=empty,
                    error=str(exc),
                    note="unsafe path blocked before filesystem write",
                )
            )
            continue
        before = _footprint(path)
        if not before.exists:
            action_rows.append(
                HygieneAction(
                    target_id=target.target_id,
                    path=str(path),
                    action=HygieneActionKind.VERIFY_ABSENT,
                    status=HygieneStatus.NOT_FOUND,
                    applied=False,
                    deleted=False,
                    footprint_before=before,
                    footprint_after=before,
                    note="target already absent",
                )
            )
            continue
        kind = HygieneActionKind.DELETE_DIRECTORY if before.is_dir else HygieneActionKind.DELETE_FILE
        if not apply or not target.delete_when_apply:
            action_rows.append(
                HygieneAction(
                    target_id=target.target_id,
                    path=str(path),
                    action=kind,
                    status=HygieneStatus.NOT_RUN,
                    applied=False,
                    deleted=False,
                    footprint_before=before,
                    footprint_after=before,
                    note="dry run; use --apply to remove approved artifact",
                )
            )
            continue
        deleted, error = _delete_path(path)
        after = _footprint(path)
        status = HygieneStatus.PASSED if deleted and not after.exists else HygieneStatus.BLOCKED
        note = "removed approved local artifact" if status == HygieneStatus.PASSED else "artifact retained by ACL or filesystem lock"
        action_rows.append(
            HygieneAction(
                target_id=target.target_id,
                path=str(path),
                action=kind,
                status=status,
                applied=True,
                deleted=deleted,
                footprint_before=before,
                footprint_after=after,
                error=error,
                note=note,
            )
        )
    return tuple(action_rows)
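
# Illustrative outcomes (hypothetical workspace): with an existing .test-tmp
# directory, run_hygiene_actions(root) records a DELETE_DIRECTORY action with
# status NOT_RUN (dry run), while run_hygiene_actions(root, apply=True)
# deletes it and reports PASSED, or BLOCKED with the OSError text when an
# ACL or file lock keeps the path alive.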
def build_workspace_hygiene_report(
    project_root: Path,
    *,
    central_platform_folder: Path | None = None,
    apply: bool = False,
    targets: Sequence[HygieneTarget] | None = None,
) -> WorkspaceHygieneReport:
    """Build a hygiene report for project closeout."""
    target_set = tuple(targets or default_hygiene_targets())
    actions = run_hygiene_actions(project_root, apply=apply, targets=target_set)
    blockers = merge_unique(
        f"{action.target_id}:{action.error or action.note}"
        for action in actions
        if action.status == HygieneStatus.BLOCKED
    )
    removed = sum(1 for action in actions if action.deleted)
    already_absent = sum(1 for action in actions if action.status == HygieneStatus.NOT_FOUND)
    retained = sum(1 for action in actions if action.footprint_after.exists)
    summary = (
        f"Targets evaluated: {len(actions)}.",
        f"Apply mode: {apply}.",
        f"Removed artifacts: {removed}.",
        f"Already absent: {already_absent}.",
        f"Artifacts still present: {retained}.",
        "Only approved project-local artifacts are eligible for deletion.",
    )
    seed = json.dumps(
        {
            "projectRoot": str(project_root),
            "actions": [action.to_dict() for action in actions],
            "generatedAt": utc_now(),
        },
        ensure_ascii=False,
        sort_keys=True,
    )
    # sha256 keeps the id format stable and independent of the per-process
    # salt used by the built-in hash().
    report_id = "workspace-hygiene-" + hashlib.sha256(seed.encode("utf-8")).hexdigest()[:12]
    return WorkspaceHygieneReport(
        report_id=report_id,
        generated_at=utc_now(),
        project_root=str(project_root),
        central_platform_folder=str(central_platform_folder or ""),
        apply=apply,
        targets=target_set,
        actions=actions,
        summary=summary,
        blockers=blockers,
    )


def hygiene_csv(report: WorkspaceHygieneReport) -> str:
    """Render hygiene target status as CSV."""
    rows = [["target_id", "path", "action", "status", "applied", "deleted", "exists_after", "children_before", "bytes_before", "error", "note"]]
    for action in report.actions:
        rows.append(
            [
                action.target_id,
                action.path,
                action.action.value,
                action.status.value,
                "yes" if action.applied else "no",
                "yes" if action.deleted else "no",
                "yes" if action.footprint_after.exists else "no",
                str(action.footprint_before.child_count),
                str(action.footprint_before.byte_count),
                action.error,
                action.note,
            ]
        )
    buffer = io.StringIO()
    writer = csv.writer(buffer, lineterminator="\n")
    writer.writerows(rows)
    return buffer.getvalue()


def hygiene_markdown(report: WorkspaceHygieneReport) -> str:
    """Render human-readable hygiene evidence."""
    lines = [
        "# Workspace Hygiene Report",
        "",
        f"- report_id: `{report.report_id}`",
        f"- generated_at: `{report.generated_at}`",
        f"- project_root: `{report.project_root}`",
        f"- central_platform_folder: `{report.central_platform_folder}`",
        f"- status: `{report.status.value}`",
        f"- clean: `{report.clean}`",
        f"- apply: `{report.apply}`",
        "",
        "## Summary",
        "",
    ]
    lines.extend(f"- {item}" for item in report.summary)
    lines.extend(["", "## Targets", ""])
    for action in report.actions:
        lines.extend(
            [
                f"### {action.target_id}",
                "",
                f"- path: `{action.path}`",
                f"- action: `{action.action.value}`",
                f"- status: `{action.status.value}`",
                f"- applied: `{action.applied}`",
                f"- deleted: `{action.deleted}`",
                f"- exists_after: `{action.footprint_after.exists}`",
                f"- children_before: `{action.footprint_before.child_count}`",
                f"- bytes_before: `{action.footprint_before.byte_count}`",
                f"- note: {action.note}",
            ]
        )
        if action.error:
            lines.append(f"- error: `{action.error}`")
        if action.footprint_before.errors:
            lines.append("- footprint_errors:")
            lines.extend(f"  - `{item}`" for item in action.footprint_before.errors)
        lines.append("")
    lines.extend(["## Blockers", ""])
    if report.blockers:
        lines.extend(f"- `{item}`" for item in report.blockers)
    else:
        lines.append("- Nenhum blocker de higiene local.")
    return "\n".join(lines).strip() + "\n"
def hygiene_artifact_records(project_root: Path, central_platform_folder: Path | None = None) -> tuple[GeneratedFile, ...]:
    """Return semantic records for hygiene artifacts."""
    records = [
        GeneratedFile(
            path=str(project_root / "dados" / "workspace-hygiene-report.json"),
            description="Relatorio estruturado de limpeza operacional local.",
            function="workspace hygiene report",
            file_type="json",
            changed_by="mais_humana.workspace_hygiene",
            change_summary="Registrada limpeza de .test-tmp e node_modules com bloqueios ACL quando houver.",
            relation_to_order="0036_EXECUTIVA__normalizar-limpeza-test-tmp-e-acl-local",
        ),
        GeneratedFile(
            path=str(project_root / "matrizes" / "workspace-hygiene-targets.csv"),
            description="Matriz de alvos de higiene local e estado final.",
            function="workspace hygiene matrix",
            file_type="csv",
            changed_by="mais_humana.workspace_hygiene",
            change_summary="Criada matriz auditavel de artefatos locais removidos, ausentes ou retidos.",
            relation_to_order="0036_EXECUTIVA__normalizar-limpeza-test-tmp-e-acl-local",
        ),
        GeneratedFile(
            path=str(project_root / "ecossistema" / "WORKSPACE-HYGIENE-REPORT.md"),
            description="Relatorio humano de higiene local da rodada.",
            function="workspace hygiene human report",
            file_type="markdown",
            changed_by="mais_humana.workspace_hygiene",
            change_summary="Criado relatorio de fechamento de limpeza operacional.",
            relation_to_order="0036_EXECUTIVA__normalizar-limpeza-test-tmp-e-acl-local",
        ),
    ]
    if central_platform_folder is not None:
        records.append(
            GeneratedFile(
                path=str(central_platform_folder / "reports" / "EXECUTADO__workspace-hygiene.md"),
                description="Copia central da higiene local da plataforma Mais Humana.",
                function="workspace hygiene central report",
                file_type="markdown",
                changed_by="mais_humana.workspace_hygiene",
                change_summary="Registrado estado de .test-tmp e node_modules na pasta central.",
                relation_to_order="0036_EXECUTIVA__normalizar-limpeza-test-tmp-e-acl-local",
            )
        )
    return tuple(records)


def write_hygiene_artifacts(
    report: WorkspaceHygieneReport,
    project_root: Path,
    *,
    central_platform_folder: Path | None = None,
) -> tuple[GeneratedFile, ...]:
    """Write project and optional central hygiene artifacts."""
    targets: list[tuple[Path, str]] = [
        (project_root / "dados" / "workspace-hygiene-report.json", json.dumps(report.to_dict(), ensure_ascii=False, indent=2, sort_keys=True)),
        (project_root / "matrizes" / "workspace-hygiene-targets.csv", hygiene_csv(report)),
        (project_root / "ecossistema" / "WORKSPACE-HYGIENE-REPORT.md", hygiene_markdown(report)),
    ]
    records = list(hygiene_artifact_records(project_root, central_platform_folder))
    central_failures: list[dict[str, str]] = []
    if central_platform_folder is not None:
        targets.append((central_platform_folder / "reports" / "EXECUTADO__workspace-hygiene.md", hygiene_markdown(report)))
    for path, content in targets:
        try:
            path.parent.mkdir(parents=True, exist_ok=True)
            path.write_text(content, encoding="utf-8")
        except OSError as exc:
            if central_platform_folder is not None and central_platform_folder in path.parents:
                central_failures.append({"path": str(path), "error": f"{type(exc).__name__}: {exc}"})
                continue
            raise
    if central_failures:
        status_path = project_root / "dados" / "workspace-hygiene-central-write-status.json"
        status_path.write_text(
            json.dumps(
                {
                    "generatedAt": utc_now(),
                    "centralPlatformFolder": str(central_platform_folder),
                    "ok": False,
                    "failures": central_failures,
                },
                ensure_ascii=False,
                indent=2,
                sort_keys=True,
            ),
            encoding="utf-8",
        )
        records.append(
            GeneratedFile(
                path=str(status_path),
                description="Status de escrita central do relatorio de higiene local.",
                function="workspace hygiene central write status",
                file_type="json",
                changed_by="mais_humana.workspace_hygiene",
                change_summary="Registrada falha de escrita central sem abortar artefatos do projeto real.",
                relation_to_order="0034_EXECUTIVA__corrigir-acl-escrita-central-e-sql-semantico-plataforma-15",
            )
        )
    return tuple(records)
def run_workspace_hygiene(
    *,
    project_root: Path,
    central_platform_folder: Path | None = None,
    apply: bool = False,
) -> tuple[WorkspaceHygieneReport, tuple[GeneratedFile, ...]]:
    """Execute the hygiene gate and write artifacts."""
    report = build_workspace_hygiene_report(project_root, central_platform_folder=central_platform_folder, apply=apply)
    records = write_hygiene_artifacts(report, project_root, central_platform_folder=central_platform_folder)
    return report, records
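
# Usage sketch (the import path `mais_humana.workspace_hygiene` is inferred
# from the changed_by fields; the root is the CLI default declared above):
#
#   from pathlib import Path
#   from mais_humana.workspace_hygiene import run_workspace_hygiene
#
#   report, records = run_workspace_hygiene(
#       project_root=Path("G:/_codex-git/tudo-para-ia-mais-humana"),
#       apply=False,  # dry run; pass apply=True to actually delete
#   )
#   print(report.status.value, report.clean)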