"""Gitea planning helpers for repository mesh consolidation. The permanent order asks the mesh to be consolidated in Gitea. This module keeps the Gitea part explicit and safe: it can build API requests, classify responses, and plan repository creation/rename work, but it does not hide missing credentials or assume a remote exists just because a local directory does. """ from __future__ import annotations import base64 import json import urllib.error import urllib.parse import urllib.request from dataclasses import dataclass from enum import Enum from pathlib import Path from typing import Any, Mapping, Sequence from .models import GeneratedFile, as_plain_data, merge_unique, utc_now from .repository_mesh import MeshReport, RepositoryTarget, rows_to_csv, stable_digest class GiteaRepoStatus(str, Enum): """Remote repository status.""" EXISTS = "exists" MISSING = "missing" UNAUTHORIZED = "unauthorized" FORBIDDEN = "forbidden" NETWORK_ERROR = "network_error" UNKNOWN = "unknown" class GiteaPlannedAction(str, Enum): """Gitea actions that may be required.""" NONE = "none" VERIFY_REPOSITORY = "verify_repository" CREATE_REPOSITORY = "create_repository" RENAME_REPOSITORY = "rename_repository" CONFIGURE_MIRROR = "configure_mirror" REQUIRE_TOKEN = "require_token" REQUIRE_OWNER_DECISION = "require_owner_decision" @dataclass(frozen=True, slots=True) class GiteaEndpoint: base_url: str owner: str repo: str @property def api_repo_path(self) -> str: owner = urllib.parse.quote(self.owner, safe="") repo = urllib.parse.quote(self.repo, safe="") return f"/api/v1/repos/{owner}/{repo}" @property def web_url(self) -> str: return f"{self.base_url.rstrip('/')}/{self.owner}/{self.repo}" @property def clone_url(self) -> str: return f"{self.web_url}.git" @dataclass(frozen=True, slots=True) class GiteaAuth: token: str | None = None username: str | None = None password: str | None = None @property def available(self) -> bool: return bool(self.token or (self.username and self.password)) def headers(self) -> dict[str, str]: headers = {"Accept": "application/json"} if self.token: headers["Authorization"] = f"token {self.token}" elif self.username and self.password: raw = f"{self.username}:{self.password}".encode("utf-8") headers["Authorization"] = "Basic " + base64.b64encode(raw).decode("ascii") return headers def redacted_label(self) -> str: if self.token: return "token:" if self.username and self.password: return f"basic:{self.username}:" return "none" @dataclass(frozen=True, slots=True) class GiteaResponse: status: GiteaRepoStatus http_status: int | None url: str payload: Mapping[str, Any] error: str = "" @property def ok(self) -> bool: return self.status == GiteaRepoStatus.EXISTS def to_dict(self) -> dict[str, Any]: return as_plain_data(self) @dataclass(frozen=True, slots=True) class GiteaRepositoryPlan: plan_id: str declared_name: str expected_local_name: str owner: str repo: str clone_url: str status: GiteaRepoStatus actions: tuple[GiteaPlannedAction, ...] reason: str commands: tuple[str, ...] api_requests: tuple[str, ...] evidence: tuple[str, ...] @property def blocked(self) -> bool: return any(action in {GiteaPlannedAction.REQUIRE_TOKEN, GiteaPlannedAction.REQUIRE_OWNER_DECISION} for action in self.actions) def to_dict(self) -> dict[str, Any]: return as_plain_data(self) @dataclass(frozen=True, slots=True) class GiteaMeshPlan: mesh_plan_id: str generated_at: str base_url: str authenticated_as: str repositories: tuple[GiteaRepositoryPlan, ...] @property def blocked_count(self) -> int: return sum(1 for repo in self.repositories if repo.blocked) @property def missing_count(self) -> int: return sum(1 for repo in self.repositories if repo.status == GiteaRepoStatus.MISSING) @property def exists_count(self) -> int: return sum(1 for repo in self.repositories if repo.status == GiteaRepoStatus.EXISTS) def to_dict(self) -> dict[str, Any]: return as_plain_data(self) def endpoint_for_target(target: RepositoryTarget, base_url: str = "https://git.ami.app.br") -> GiteaEndpoint: owner, _, repo = target.gitea_repo.partition("/") return GiteaEndpoint(base_url=base_url.rstrip("/"), owner=owner or "admin", repo=repo or target.expected_local_name) def classify_http_error(status: int | None, error: str = "") -> GiteaRepoStatus: if status == 200: return GiteaRepoStatus.EXISTS if status == 404: return GiteaRepoStatus.MISSING if status == 401: return GiteaRepoStatus.UNAUTHORIZED if status == 403: return GiteaRepoStatus.FORBIDDEN if status is None and error: return GiteaRepoStatus.NETWORK_ERROR return GiteaRepoStatus.UNKNOWN def request_gitea_repo(endpoint: GiteaEndpoint, auth: GiteaAuth, timeout: int = 15) -> GiteaResponse: url = endpoint.base_url.rstrip("/") + endpoint.api_repo_path request = urllib.request.Request(url, headers=auth.headers(), method="GET") try: with urllib.request.urlopen(request, timeout=timeout) as response: text = response.read().decode("utf-8", errors="replace") payload = json.loads(text) if text.strip() else {} return GiteaResponse(GiteaRepoStatus.EXISTS, response.status, url, payload) except urllib.error.HTTPError as exc: text = exc.read().decode("utf-8", errors="replace") try: payload = json.loads(text) if text.strip() else {} except json.JSONDecodeError: payload = {"raw": text} return GiteaResponse(classify_http_error(exc.code), exc.code, url, payload, str(exc)) except (urllib.error.URLError, TimeoutError, OSError) as exc: return GiteaResponse(GiteaRepoStatus.NETWORK_ERROR, None, url, {}, str(exc)) def infer_status_from_report(target: RepositoryTarget, report: MeshReport) -> GiteaRepoStatus: for summary in report.summaries: if summary.target.declared_name != target.declared_name: continue if summary.credential_error_count: return GiteaRepoStatus.UNAUTHORIZED if any(summary.hash_set): return GiteaRepoStatus.EXISTS if summary.missing_count == len(summary.observations): return GiteaRepoStatus.UNKNOWN return GiteaRepoStatus.UNKNOWN def plan_for_target( target: RepositoryTarget, *, endpoint: GiteaEndpoint, status: GiteaRepoStatus, auth: GiteaAuth, ) -> GiteaRepositoryPlan: actions: list[GiteaPlannedAction] = [] commands: list[str] = [] api_requests: list[str] = [] evidence: list[str] = [endpoint.clone_url] reason = "repositorio Gitea verificado" if not auth.available: actions.append(GiteaPlannedAction.REQUIRE_TOKEN) reason = "credencial Gitea ausente; nao e seguro declarar existencia/criacao remota" if status == GiteaRepoStatus.EXISTS: actions.append(GiteaPlannedAction.VERIFY_REPOSITORY) api_requests.append(f"GET {endpoint.api_repo_path}") elif status == GiteaRepoStatus.MISSING: actions.append(GiteaPlannedAction.CREATE_REPOSITORY) api_requests.append(f"POST /api/v1/orgs/{endpoint.owner}/repos") commands.append(f"curl -X POST {endpoint.base_url}/api/v1/orgs/{endpoint.owner}/repos -d '{{\"name\":\"{endpoint.repo}\"}}'") reason = "repositorio Gitea nao encontrado; criar somente com token e owner confirmados" elif status in {GiteaRepoStatus.UNAUTHORIZED, GiteaRepoStatus.FORBIDDEN}: actions.append(GiteaPlannedAction.REQUIRE_TOKEN) reason = "Gitea respondeu sem permissao suficiente" elif status == GiteaRepoStatus.NETWORK_ERROR: actions.append(GiteaPlannedAction.VERIFY_REPOSITORY) reason = "erro de rede impede verificar repositório central" else: actions.append(GiteaPlannedAction.VERIFY_REPOSITORY) reason = "estado remoto nao confirmado" if target.requires_nominal_reconciliation: actions.append(GiteaPlannedAction.REQUIRE_OWNER_DECISION) evidence.extend(target.notes) if target.canonical_name and target.canonical_name != endpoint.repo: actions.append(GiteaPlannedAction.RENAME_REPOSITORY) api_requests.append(f"PATCH {endpoint.api_repo_path} name={target.canonical_name}") reason += "; renome remoto depende de decisao institucional" actions = list(merge_unique(action.value for action in actions)) action_enums = tuple(GiteaPlannedAction(value) for value in actions) seed = { "target": target.declared_name, "status": status.value, "actions": [action.value for action in action_enums], "endpoint": endpoint.clone_url, } return GiteaRepositoryPlan( plan_id=f"gitea-repo-plan-{stable_digest(seed, 12)}", declared_name=target.declared_name, expected_local_name=target.expected_local_name, owner=endpoint.owner, repo=endpoint.repo, clone_url=endpoint.clone_url, status=status, actions=action_enums, reason=reason, commands=tuple(commands), api_requests=tuple(api_requests), evidence=tuple(evidence), ) def build_gitea_mesh_plan( report: MeshReport, *, base_url: str = "https://git.ami.app.br", auth: GiteaAuth | None = None, live_check: bool = False, ) -> GiteaMeshPlan: auth = auth or GiteaAuth() plans: list[GiteaRepositoryPlan] = [] for target in report.targets: endpoint = endpoint_for_target(target, base_url) if live_check and auth.available: response = request_gitea_repo(endpoint, auth) status = response.status evidence = (response.url, response.error) else: status = infer_status_from_report(target, report) evidence = () plan = plan_for_target(target, endpoint=endpoint, status=status, auth=auth) if evidence: plan = GiteaRepositoryPlan( plan_id=plan.plan_id, declared_name=plan.declared_name, expected_local_name=plan.expected_local_name, owner=plan.owner, repo=plan.repo, clone_url=plan.clone_url, status=plan.status, actions=plan.actions, reason=plan.reason, commands=plan.commands, api_requests=plan.api_requests, evidence=merge_unique((*plan.evidence, *evidence)), ) plans.append(plan) seed = {"report": report.report_id, "base": base_url, "plans": [plan.plan_id for plan in plans]} return GiteaMeshPlan( mesh_plan_id=f"gitea-mesh-plan-{stable_digest(seed, 12)}", generated_at=utc_now(), base_url=base_url.rstrip("/"), authenticated_as=auth.redacted_label(), repositories=tuple(plans), ) def gitea_plan_rows(plan: GiteaMeshPlan) -> list[list[str]]: rows = [["plan_id", "declared_name", "owner", "repo", "status", "blocked", "actions", "reason", "api_requests"]] for repo in plan.repositories: rows.append( [ repo.plan_id, repo.declared_name, repo.owner, repo.repo, repo.status.value, "yes" if repo.blocked else "no", " | ".join(action.value for action in repo.actions), repo.reason, " | ".join(repo.api_requests), ] ) return rows def gitea_plan_csv(plan: GiteaMeshPlan) -> str: return rows_to_csv(gitea_plan_rows(plan)) def gitea_plan_markdown(plan: GiteaMeshPlan) -> str: lines = [ "# Gitea Repository Mesh Plan", "", f"- mesh_plan_id: `{plan.mesh_plan_id}`", f"- generated_at: `{plan.generated_at}`", f"- base_url: `{plan.base_url}`", f"- authenticated_as: `{plan.authenticated_as}`", f"- exists: `{plan.exists_count}`", f"- missing: `{plan.missing_count}`", f"- blocked: `{plan.blocked_count}`", "", "## Repositorios", "", ] for repo in plan.repositories: lines.append(f"### {repo.declared_name}") lines.append("") lines.append(f"- clone_url: `{repo.clone_url}`") lines.append(f"- status: `{repo.status.value}`") lines.append(f"- blocked: `{repo.blocked}`") lines.append(f"- actions: {', '.join(action.value for action in repo.actions)}") lines.append(f"- reason: {repo.reason}") if repo.api_requests: lines.append("- api_requests:") for request in repo.api_requests: lines.append(f" - `{request}`") lines.append("") return "\n".join(lines).strip() + "\n" def write_json(path: Path, payload: object) -> Path: path.parent.mkdir(parents=True, exist_ok=True) path.write_text(json.dumps(as_plain_data(payload), ensure_ascii=False, indent=2, sort_keys=True), encoding="utf-8") return path def write_text(path: Path, text: str) -> Path: path.parent.mkdir(parents=True, exist_ok=True) path.write_text(text, encoding="utf-8") return path def write_gitea_plan_artifacts( plan: GiteaMeshPlan, project_root: Path, *, central_platform_folder: Path | None = None, ) -> tuple[GeneratedFile, ...]: write_json(project_root / "dados" / "repository-mesh-gitea-plan.json", plan) write_text(project_root / "matrizes" / "repository-mesh-gitea-plan.csv", gitea_plan_csv(plan)) write_text(project_root / "ecossistema" / "REPOSITORY-MESH-GITEA.md", gitea_plan_markdown(plan)) central_written: list[Path] = [] if central_platform_folder is not None: path = central_platform_folder / "reports" / "PENDENCIAS-CODEX__repository-mesh-gitea.md" try: write_text(path, gitea_plan_markdown(plan)) except OSError: pass else: central_written.append(path) records = [ GeneratedFile( path="dados/repository-mesh-gitea-plan.json", description="Plano Gitea da malha de repositorios.", function="repository mesh gitea plan", file_type="json", changed_by="mais_humana.repository_mesh_gitea", change_summary="Criado plano de verificacao/criacao/renome remoto.", relation_to_order="000_sincronizacao-dos-espelhos", ), GeneratedFile( path="matrizes/repository-mesh-gitea-plan.csv", description="Matriz do plano Gitea.", function="repository mesh gitea matrix", file_type="csv", changed_by="mais_humana.repository_mesh_gitea", change_summary="Gerada matriz de acoes Gitea.", relation_to_order="000_sincronizacao-dos-espelhos", ), GeneratedFile( path="ecossistema/REPOSITORY-MESH-GITEA.md", description="Relatorio humano do plano Gitea.", function="repository mesh gitea report", file_type="markdown", changed_by="mais_humana.repository_mesh_gitea", change_summary="Documentado plano Gitea sem credencial bruta.", relation_to_order="000_sincronizacao-dos-espelhos", ), ] for central_path in central_written: records.append( GeneratedFile( path=str(central_path), description="Pendencias Gitea registradas na central.", function="repository mesh central gitea", file_type="markdown", changed_by="mais_humana.repository_mesh_gitea", change_summary="Registradas pendencias de token/decisao para Gitea.", relation_to_order="000_sincronizacao-dos-espelhos", ) ) return tuple(records)