"""Repository scanner for the human-centered platform. The scanner is deliberately conservative. It extracts local evidence without executing project code, without reading secrets, and without depending on a particular package manager. The goal is not static analysis perfection; the goal is repeatable operational context for human reports. """ from __future__ import annotations import json import os import re import subprocess from dataclasses import dataclass from pathlib import Path from typing import Iterable, Iterator, Sequence from .catalog import CATEGORY_KEYWORDS, PLATFORMS, categories_for_text from .models import Evidence, EvidenceKind, FileMetric, PlatformDefinition, PlatformScan, ScriptCommand SKIP_DIRS = { ".git", ".test-tmp", ".hg", ".svn", "node_modules", "dist", "build", "coverage", ".next", ".nuxt", ".wrangler", ".turbo", ".cache", "vendor", "__pycache__", ".pytest_cache", ".mypy_cache", } TEXT_EXTENSIONS = { ".ts", ".tsx", ".js", ".mjs", ".cjs", ".py", ".java", ".json", ".md", ".mdx", ".yml", ".yaml", ".toml", ".txt", ".sql", ".html", ".css", ".scss", ".xml", } CODE_EXTENSIONS = {".ts", ".tsx", ".js", ".mjs", ".cjs", ".py", ".java"} ROUTE_PATTERNS = ( re.compile(r"\bapp\.(get|post|put|patch|delete)\s*\(\s*['\"]([^'\"]+)", re.I), re.compile(r"\brouter\.(get|post|put|patch|delete)\s*\(\s*['\"]([^'\"]+)", re.I), re.compile(r"\bnew\s+URLPattern\s*\(\s*['\"]([^'\"]+)", re.I), re.compile(r"\b(path|route)\s*:\s*['\"](/[^'\"]+)['\"]", re.I), re.compile(r"\bfetch\s*\(\s*['\"](https?://[^'\"]+|/[^'\"]+)['\"]", re.I), ) SENSITIVE_FILE_PARTS = ( ".env", "secret", "secrets", "private", "key.pem", "id_rsa", "credential", "credentials", ) @dataclass(slots=True) class ScanOptions: """Options for local scan depth and safety.""" max_file_bytes: int = 420_000 max_readme_chars: int = 4_000 max_evidence_per_kind: int = 40 include_markdown_metrics: bool = True include_json_metrics: bool = True def is_sensitive_path(path: Path) -> bool: lowered = str(path).lower() return any(part in lowered for part in SENSITIVE_FILE_PARTS) def should_skip_dir(path: Path) -> bool: return path.name in SKIP_DIRS def is_probably_text(path: Path) -> bool: return path.suffix.lower() in TEXT_EXTENSIONS or path.name.lower() in {"package.json", "wrangler.toml"} def safe_relative(path: Path, base: Path) -> str: try: return str(path.relative_to(base)).replace("\\", "/") except ValueError: return str(path).replace("\\", "/") def iter_files(root: Path) -> Iterator[Path]: if not root.exists(): return stack = [root] while stack: current = stack.pop() try: entries = sorted(current.iterdir(), key=lambda item: item.name.lower()) except OSError: continue for entry in entries: if entry.is_dir(): if not should_skip_dir(entry): stack.append(entry) elif entry.is_file(): yield entry def count_lines(path: Path, max_bytes: int) -> int: try: if path.stat().st_size > max_bytes: return 0 with path.open("r", encoding="utf-8", errors="ignore") as handle: return sum(1 for _ in handle) except OSError: return 0 def read_text_limited(path: Path, max_bytes: int) -> str: try: if path.stat().st_size > max_bytes: return "" return path.read_text(encoding="utf-8", errors="ignore") except OSError: return "" def run_git(repo: Path, *args: str) -> str | None: try: completed = subprocess.run( ["git", *args], cwd=str(repo), text=True, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, timeout=12, check=False, ) except (OSError, subprocess.SubprocessError): return None if completed.returncode != 0: return None return completed.stdout.strip() or None def detect_git(repo: Path) -> tuple[bool, str | None, str | None, str | None]: git_present = (repo / ".git").exists() if not git_present: return False, None, None, None branch = run_git(repo, "rev-parse", "--abbrev-ref", "HEAD") head = run_git(repo, "rev-parse", "HEAD") remote = run_git(repo, "remote", "get-url", "origin") return True, branch, head, remote def script_intent(name: str, command: str) -> str: merged = f"{name} {command}".lower() if any(token in merged for token in ("test", "vitest", "pytest", "jest", "node --test")): return "test" if any(token in merged for token in ("build", "tsc", "vite build", "rollup", "webpack")): return "build" if any(token in merged for token in ("smoke", "health", "readiness")): return "validation" if any(token in merged for token in ("deploy", "wrangler deploy", "pages deploy")): return "deploy" if any(token in merged for token in ("generate", "contract", "schema")): return "generation" if any(token in merged for token in ("lint", "format", "eslint", "prettier")): return "quality" return "operation" def load_package_scripts(repo: Path) -> tuple[ScriptCommand, ...]: package_path = repo / "package.json" if not package_path.exists() or is_sensitive_path(package_path): return () try: data = json.loads(package_path.read_text(encoding="utf-8", errors="ignore")) except (OSError, json.JSONDecodeError): return () scripts = data.get("scripts", {}) if not isinstance(scripts, dict): return () output: list[ScriptCommand] = [] for name, command in sorted(scripts.items()): if isinstance(command, str): output.append( ScriptCommand( name=name, command=command, source_file="package.json", intent=script_intent(name, command), ) ) return tuple(output) def read_readme(repo: Path, max_chars: int) -> str: candidates = [ repo / "README.md", repo / "readme.md", repo / "README.txt", repo / "docs" / "README.md", ] for candidate in candidates: if candidate.exists() and not is_sensitive_path(candidate): content = read_text_limited(candidate, max_chars * 8) return content[:max_chars].strip() return "" def metric_for_file(path: Path, base: Path, options: ScanOptions) -> FileMetric | None: suffix = path.suffix.lower() if suffix not in TEXT_EXTENSIONS and path.name.lower() not in {"package.json", "wrangler.toml"}: return None if suffix in {".md", ".mdx"} and not options.include_markdown_metrics: return None if suffix == ".json" and not options.include_json_metrics: return None if is_sensitive_path(path): return None try: size = path.stat().st_size except OSError: return None lines = count_lines(path, options.max_file_bytes) return FileMetric(path=safe_relative(path, base), extension=suffix or path.name.lower(), lines=lines, bytes_size=size) def add_limited(bucket: dict[EvidenceKind, list[Evidence]], evidence: Evidence, limit: int) -> None: items = bucket.setdefault(evidence.kind, []) if len(items) < limit: items.append(evidence) def evidence_from_filename(path: Path, base: Path) -> Evidence | None: relative = safe_relative(path, base) lowered = relative.lower() name = path.name.lower() if is_sensitive_path(path): return None if name.startswith("readme"): return Evidence(EvidenceKind.README, relative, "Documentacao inicial encontrada.", confidence=0.75, tags=("docs",)) if "openapi" in lowered or "swagger" in lowered: return Evidence(EvidenceKind.OPENAPI, relative, "Arquivo com indicio de contrato OpenAPI.", confidence=0.8) if "test" in lowered or "spec" in lowered: return Evidence(EvidenceKind.TEST, relative, "Arquivo de teste ou especificacao encontrado.", confidence=0.72) if "wrangler" in name or name in {"package.json", "pyproject.toml", "tsconfig.json"}: return Evidence(EvidenceKind.CONFIG, relative, "Configuracao operacional encontrada.", confidence=0.65) if "worker" in lowered or "cloudflare" in lowered: return Evidence(EvidenceKind.WORKER, relative, "Indicador de Worker ou Cloudflare encontrado.", confidence=0.6) if "screen" in lowered or "view" in lowered or "ui" in lowered: return Evidence(EvidenceKind.UI_SURFACE, relative, "Possivel superficie visual encontrada.", confidence=0.55) if "mcp" in lowered or "tool" in lowered: return Evidence(EvidenceKind.MCP_TOOL, relative, "Possivel tool ou superficie MCP encontrada.", confidence=0.55) return None def evidence_from_text(path: Path, base: Path, text: str, limit: int) -> tuple[Evidence, ...]: relative = safe_relative(path, base) lowered = text.lower() output: list[Evidence] = [] line_index: dict[str, int] = {} for index, line in enumerate(text.splitlines(), start=1): if len(line_index) > 100: break normalized = line.lower() for key in ( "health", "readiness", "openapi", "audit", "trace", "rbac", "byok", "credentialref", "panelready", "samesource", "entitlement", "invoice", "incident", "support", "screen", "mcp", ): if key in normalized and key not in line_index: line_index[key] = index for key, line in line_index.items(): if len(output) >= limit: break kind = kind_for_keyword(key) output.append( Evidence( kind=kind, path=relative, line=line, summary=f"Texto menciona '{key}', sinalizando capacidade humana ou operacional.", confidence=confidence_for_keyword(key), tags=tuple(category.value for category in categories_for_text(key)), ) ) for pattern in ROUTE_PATTERNS: for match in pattern.finditer(text): if len(output) >= limit: break route = match.group(match.lastindex or 1) if not route: continue output.append( Evidence( kind=EvidenceKind.ROUTE, path=relative, summary=f"Rota ou chamada HTTP detectada: {route}", confidence=0.66, tags=("route",), ) ) if len(output) >= limit: break return tuple(output) def kind_for_keyword(keyword: str) -> EvidenceKind: keyword = keyword.lower() if keyword in {"openapi"}: return EvidenceKind.OPENAPI if keyword in {"audit", "trace"}: return EvidenceKind.OBSERVABILITY if keyword in {"rbac", "credentialref", "byok"}: return EvidenceKind.SECURITY if keyword in {"panelready", "samesource", "screen"}: return EvidenceKind.UI_SURFACE if keyword in {"mcp"}: return EvidenceKind.MCP_TOOL if keyword in {"entitlement", "invoice"}: return EvidenceKind.BUSINESS_RULE if keyword in {"incident", "support"}: return EvidenceKind.UNKNOWN if keyword in {"health", "readiness"}: return EvidenceKind.OBSERVABILITY return EvidenceKind.UNKNOWN def confidence_for_keyword(keyword: str) -> float: strong = {"openapi", "panelready", "samesource", "credentialref", "rbac", "byok"} medium = {"health", "readiness", "audit", "trace", "entitlement", "invoice"} if keyword.lower() in strong: return 0.78 if keyword.lower() in medium: return 0.68 return 0.55 def classify_warnings(scan: PlatformScan) -> tuple[str, ...]: warnings: list[str] = list(scan.warnings) if not scan.exists: warnings.append("repositorio real nao encontrado") if scan.exists and not scan.git_present: warnings.append("repositorio real existe sem .git") if scan.exists and not scan.readme_excerpt: warnings.append("README tecnico nao encontrado") if scan.exists and scan.code_lines == 0: warnings.append("nenhuma linha de codigo TS/JS/Python/Java encontrada") if scan.exists and not scan.has_tests: warnings.append("testes nao encontrados por varredura local") if scan.exists and not scan.has_openapi: warnings.append("contrato OpenAPI nao encontrado por varredura local") return tuple(dict.fromkeys(warnings)) def scan_platform(root: Path, platform: PlatformDefinition, options: ScanOptions | None = None) -> PlatformScan: options = options or ScanOptions() repo = root / platform.repo_name exists = repo.exists() git_present, branch, head, remote = detect_git(repo) if exists else (False, None, None, None) readme = read_readme(repo, options.max_readme_chars) if exists else "" metrics: list[FileMetric] = [] evidence_bucket: dict[EvidenceKind, list[Evidence]] = {} warnings: list[str] = [] scripts = load_package_scripts(repo) if exists else () for script in scripts: add_limited( evidence_bucket, Evidence( EvidenceKind.PACKAGE_SCRIPT, script.source_file, f"Script '{script.name}' com intencao '{script.intent}'.", confidence=0.62, tags=(script.intent,), ), options.max_evidence_per_kind, ) if exists: for file_path in iter_files(repo): metric = metric_for_file(file_path, repo, options) if metric is not None: metrics.append(metric) filename_evidence = evidence_from_filename(file_path, repo) if filename_evidence is not None: add_limited(evidence_bucket, filename_evidence, options.max_evidence_per_kind) if file_path.suffix.lower() in CODE_EXTENSIONS | {".md", ".json"} and not is_sensitive_path(file_path): text = read_text_limited(file_path, options.max_file_bytes) if text: for item in evidence_from_text(file_path, repo, text, limit=6): add_limited(evidence_bucket, item, options.max_evidence_per_kind) evidence: list[Evidence] = [] for kind in sorted(evidence_bucket, key=lambda item: item.value): evidence.extend(evidence_bucket[kind]) scan = PlatformScan( platform=platform, repo_path=str(repo), exists=exists, git_present=git_present, branch=branch, head=head, remote_origin=remote, readme_excerpt=readme, file_metrics=tuple(metrics), scripts=scripts, evidence=tuple(evidence), warnings=tuple(warnings), ) return PlatformScan( platform=scan.platform, repo_path=scan.repo_path, exists=scan.exists, git_present=scan.git_present, branch=scan.branch, head=scan.head, remote_origin=scan.remote_origin, readme_excerpt=scan.readme_excerpt, file_metrics=scan.file_metrics, scripts=scan.scripts, evidence=scan.evidence, warnings=classify_warnings(scan), scanned_at=scan.scanned_at, ) def scan_ecosystem(root: Path, platforms: Sequence[PlatformDefinition] = PLATFORMS) -> tuple[PlatformScan, ...]: return tuple(scan_platform(root, platform) for platform in platforms) def summarize_extensions(metrics: Iterable[FileMetric]) -> dict[str, int]: result: dict[str, int] = {} for metric in metrics: result[metric.extension] = result.get(metric.extension, 0) + metric.lines return dict(sorted(result.items(), key=lambda item: (-item[1], item[0]))) def detect_human_keywords(scan: PlatformScan) -> dict[str, int]: counts: dict[str, int] = {} for evidence in scan.evidence: text = f"{evidence.summary} {' '.join(evidence.tags)}".lower() for category, keywords in CATEGORY_KEYWORDS.items(): if any(keyword.lower() in text for keyword in keywords): counts[category.value] = counts.get(category.value, 0) + 1 return dict(sorted(counts.items(), key=lambda item: (-item[1], item[0]))) def list_candidate_roots(root: Path) -> tuple[str, ...]: if not root.exists(): return () output: list[str] = [] for entry in sorted(root.iterdir(), key=lambda item: item.name.lower()): if entry.is_dir() and entry.name.startswith("tudo-para-ia-"): output.append(entry.name) return tuple(output) def environment_summary(root: Path) -> dict[str, object]: return { "root": str(root), "root_exists": root.exists(), "candidate_repositories": list_candidate_roots(root), "platform_catalog_size": len(PLATFORMS), "skip_dirs": sorted(SKIP_DIRS), }