feat: establish the mais humana platform
505 src/mais_humana/scanner.py Normal file
@@ -0,0 +1,505 @@
"""Repository scanner for the human-centered platform.

The scanner is deliberately conservative. It extracts local evidence without
executing project code, without reading secrets, and without depending on a
particular package manager. The goal is not static-analysis perfection; the
goal is repeatable operational context for human reports.
"""

from __future__ import annotations

import json
import re
import subprocess
from dataclasses import dataclass, replace
from pathlib import Path
from typing import Iterable, Iterator, Sequence

from .catalog import CATEGORY_KEYWORDS, PLATFORMS, categories_for_text
from .models import (
    Evidence,
    EvidenceKind,
    FileMetric,
    PlatformDefinition,
    PlatformScan,
    ScriptCommand,
)


SKIP_DIRS = {
    ".git",
    ".test-tmp",
    ".hg",
    ".svn",
    "node_modules",
    "dist",
    "build",
    "coverage",
    ".next",
    ".nuxt",
    ".wrangler",
    ".turbo",
    ".cache",
    "vendor",
    "__pycache__",
    ".pytest_cache",
    ".mypy_cache",
}

TEXT_EXTENSIONS = {
    ".ts",
    ".tsx",
    ".js",
    ".mjs",
    ".cjs",
    ".py",
    ".java",
    ".json",
    ".md",
    ".mdx",
    ".yml",
    ".yaml",
    ".toml",
    ".txt",
    ".sql",
    ".html",
    ".css",
    ".scss",
    ".xml",
}

CODE_EXTENSIONS = {".ts", ".tsx", ".js", ".mjs", ".cjs", ".py", ".java"}

ROUTE_PATTERNS = (
    re.compile(r"\bapp\.(get|post|put|patch|delete)\s*\(\s*['\"]([^'\"]+)", re.I),
    re.compile(r"\brouter\.(get|post|put|patch|delete)\s*\(\s*['\"]([^'\"]+)", re.I),
    re.compile(r"\bnew\s+URLPattern\s*\(\s*['\"]([^'\"]+)", re.I),
    re.compile(r"\b(path|route)\s*:\s*['\"](/[^'\"]+)['\"]", re.I),
    re.compile(r"\bfetch\s*\(\s*['\"](https?://[^'\"]+|/[^'\"]+)['\"]", re.I),
)

SENSITIVE_FILE_PARTS = (
    ".env",
    "secret",
    "secrets",
    "private",
    "key.pem",
    "id_rsa",
    "credential",
    "credentials",
)


@dataclass(slots=True)
class ScanOptions:
    """Options for local scan depth and safety."""

    max_file_bytes: int = 420_000
    max_readme_chars: int = 4_000
    max_evidence_per_kind: int = 40
    include_markdown_metrics: bool = True
    include_json_metrics: bool = True


def is_sensitive_path(path: Path) -> bool:
    lowered = str(path).lower()
    return any(part in lowered for part in SENSITIVE_FILE_PARTS)


def should_skip_dir(path: Path) -> bool:
    return path.name in SKIP_DIRS


def is_probably_text(path: Path) -> bool:
    return path.suffix.lower() in TEXT_EXTENSIONS or path.name.lower() in {"package.json", "wrangler.toml"}


def safe_relative(path: Path, base: Path) -> str:
    try:
        return str(path.relative_to(base)).replace("\\", "/")
    except ValueError:
        return str(path).replace("\\", "/")
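

# A small doctest-style illustration of the path helpers above; the paths are
# hypothetical and assume a POSIX filesystem:
#
#     >>> safe_relative(Path("/repo/src/app.ts"), Path("/repo"))
#     'src/app.ts'
#     >>> safe_relative(Path("/elsewhere/app.ts"), Path("/repo"))
#     '/elsewhere/app.ts'
#     >>> is_sensitive_path(Path("/repo/.env.local"))
#     True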


def iter_files(root: Path) -> Iterator[Path]:
    if not root.exists():
        return
    stack = [root]
    while stack:
        current = stack.pop()
        try:
            entries = sorted(current.iterdir(), key=lambda item: item.name.lower())
        except OSError:
            continue
        for entry in entries:
            if entry.is_dir():
                if not should_skip_dir(entry):
                    stack.append(entry)
            elif entry.is_file():
                yield entry


def count_lines(path: Path, max_bytes: int) -> int:
    try:
        if path.stat().st_size > max_bytes:
            return 0
        with path.open("r", encoding="utf-8", errors="ignore") as handle:
            return sum(1 for _ in handle)
    except OSError:
        return 0


def read_text_limited(path: Path, max_bytes: int) -> str:
    try:
        if path.stat().st_size > max_bytes:
            return ""
        return path.read_text(encoding="utf-8", errors="ignore")
    except OSError:
        return ""


def run_git(repo: Path, *args: str) -> str | None:
    try:
        completed = subprocess.run(
            ["git", *args],
            cwd=str(repo),
            text=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.DEVNULL,
            timeout=12,
            check=False,
        )
    except (OSError, subprocess.SubprocessError):
        return None
    if completed.returncode != 0:
        return None
    return completed.stdout.strip() or None


def detect_git(repo: Path) -> tuple[bool, str | None, str | None, str | None]:
    git_present = (repo / ".git").exists()
    if not git_present:
        return False, None, None, None
    branch = run_git(repo, "rev-parse", "--abbrev-ref", "HEAD")
    head = run_git(repo, "rev-parse", "HEAD")
    remote = run_git(repo, "remote", "get-url", "origin")
    return True, branch, head, remote
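

# detect_git degrades gracefully: a directory without .git short-circuits to a
# tuple of negatives before any subprocess runs. Hypothetical example:
#
#     >>> detect_git(Path("/tmp/not-a-repo"))
#     (False, None, None, None)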


def script_intent(name: str, command: str) -> str:
    merged = f"{name} {command}".lower()
    if any(token in merged for token in ("test", "vitest", "pytest", "jest", "node --test")):
        return "test"
    if any(token in merged for token in ("build", "tsc", "vite build", "rollup", "webpack")):
        return "build"
    if any(token in merged for token in ("smoke", "health", "readiness")):
        return "validation"
    if any(token in merged for token in ("deploy", "wrangler deploy", "pages deploy")):
        return "deploy"
    if any(token in merged for token in ("generate", "contract", "schema")):
        return "generation"
    if any(token in merged for token in ("lint", "format", "eslint", "prettier")):
        return "quality"
    return "operation"


def load_package_scripts(repo: Path) -> tuple[ScriptCommand, ...]:
    package_path = repo / "package.json"
    if not package_path.exists() or is_sensitive_path(package_path):
        return ()
    try:
        data = json.loads(package_path.read_text(encoding="utf-8", errors="ignore"))
    except (OSError, json.JSONDecodeError):
        return ()
    scripts = data.get("scripts", {})
    if not isinstance(scripts, dict):
        return ()
    output: list[ScriptCommand] = []
    for name, command in sorted(scripts.items()):
        if isinstance(command, str):
            output.append(
                ScriptCommand(
                    name=name,
                    command=command,
                    source_file="package.json",
                    intent=script_intent(name, command),
                )
            )
    return tuple(output)
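

# Given a hypothetical package.json such as {"scripts": {"dev": "vite"}}, this
# yields one ScriptCommand(name="dev", command="vite",
# source_file="package.json", intent="operation"). Malformed JSON, a missing
# file, or a non-dict "scripts" entry all collapse to an empty tuple.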


def read_readme(repo: Path, max_chars: int) -> str:
    candidates = [
        repo / "README.md",
        repo / "readme.md",
        repo / "README.txt",
        repo / "docs" / "README.md",
    ]
    for candidate in candidates:
        if candidate.exists() and not is_sensitive_path(candidate):
            content = read_text_limited(candidate, max_chars * 8)
            return content[:max_chars].strip()
    return ""


def metric_for_file(path: Path, base: Path, options: ScanOptions) -> FileMetric | None:
    suffix = path.suffix.lower()
    if not is_probably_text(path):
        return None
    if suffix in {".md", ".mdx"} and not options.include_markdown_metrics:
        return None
    if suffix == ".json" and not options.include_json_metrics:
        return None
    if is_sensitive_path(path):
        return None
    try:
        size = path.stat().st_size
    except OSError:
        return None
    lines = count_lines(path, options.max_file_bytes)
    return FileMetric(
        path=safe_relative(path, base),
        extension=suffix or path.name.lower(),
        lines=lines,
        bytes_size=size,
    )


def add_limited(bucket: dict[EvidenceKind, list[Evidence]], evidence: Evidence, limit: int) -> None:
    items = bucket.setdefault(evidence.kind, [])
    if len(items) < limit:
        items.append(evidence)


def evidence_from_filename(path: Path, base: Path) -> Evidence | None:
    relative = safe_relative(path, base)
    lowered = relative.lower()
    name = path.name.lower()
    if is_sensitive_path(path):
        return None
    if name.startswith("readme"):
        return Evidence(EvidenceKind.README, relative, "Initial documentation found.", confidence=0.75, tags=("docs",))
    if "openapi" in lowered or "swagger" in lowered:
        return Evidence(EvidenceKind.OPENAPI, relative, "File suggesting an OpenAPI contract.", confidence=0.8)
    if "test" in lowered or "spec" in lowered:
        return Evidence(EvidenceKind.TEST, relative, "Test or specification file found.", confidence=0.72)
    if "wrangler" in name or name in {"package.json", "pyproject.toml", "tsconfig.json"}:
        return Evidence(EvidenceKind.CONFIG, relative, "Operational configuration found.", confidence=0.65)
    if "worker" in lowered or "cloudflare" in lowered:
        return Evidence(EvidenceKind.WORKER, relative, "Worker or Cloudflare indicator found.", confidence=0.6)
    if "screen" in lowered or "view" in lowered or "ui" in lowered:
        return Evidence(EvidenceKind.UI_SURFACE, relative, "Possible UI surface found.", confidence=0.55)
    if "mcp" in lowered or "tool" in lowered:
        return Evidence(EvidenceKind.MCP_TOOL, relative, "Possible MCP tool or surface found.", confidence=0.55)
    return None


def evidence_from_text(path: Path, base: Path, text: str, limit: int) -> tuple[Evidence, ...]:
    relative = safe_relative(path, base)
    output: list[Evidence] = []
    keywords = (
        "health",
        "readiness",
        "openapi",
        "audit",
        "trace",
        "rbac",
        "byok",
        "credentialref",
        "panelready",
        "samesource",
        "entitlement",
        "invoice",
        "incident",
        "support",
        "screen",
        "mcp",
    )
    line_index: dict[str, int] = {}
    for index, line in enumerate(text.splitlines(), start=1):
        if len(line_index) > 100:
            break
        normalized = line.lower()
        for key in keywords:
            if key in normalized and key not in line_index:
                line_index[key] = index
    for key, line in line_index.items():
        if len(output) >= limit:
            break
        kind = kind_for_keyword(key)
        output.append(
            Evidence(
                kind=kind,
                path=relative,
                line=line,
                summary=f"Text mentions '{key}', signaling a human or operational capability.",
                confidence=confidence_for_keyword(key),
                tags=tuple(category.value for category in categories_for_text(key)),
            )
        )
    for pattern in ROUTE_PATTERNS:
        for match in pattern.finditer(text):
            if len(output) >= limit:
                break
            route = match.group(match.lastindex or 1)
            if not route:
                continue
            output.append(
                Evidence(
                    kind=EvidenceKind.ROUTE,
                    path=relative,
                    summary=f"Route or HTTP call detected: {route}",
                    confidence=0.66,
                    tags=("route",),
                )
            )
        if len(output) >= limit:
            break
    return tuple(output)
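

# Route evidence is regex-level only; nothing is parsed or executed. For a
# hypothetical source line such as
#
#     app.get("/health", handler)
#
# the first ROUTE_PATTERNS entry captures "/health" and the function emits
# Evidence(kind=EvidenceKind.ROUTE, summary="Route or HTTP call detected: /health").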


def kind_for_keyword(keyword: str) -> EvidenceKind:
    keyword = keyword.lower()
    if keyword == "openapi":
        return EvidenceKind.OPENAPI
    if keyword in {"audit", "trace", "health", "readiness"}:
        return EvidenceKind.OBSERVABILITY
    if keyword in {"rbac", "credentialref", "byok"}:
        return EvidenceKind.SECURITY
    if keyword in {"panelready", "samesource", "screen"}:
        return EvidenceKind.UI_SURFACE
    if keyword == "mcp":
        return EvidenceKind.MCP_TOOL
    if keyword in {"entitlement", "invoice"}:
        return EvidenceKind.BUSINESS_RULE
    # "incident" and "support" intentionally fall through to UNKNOWN.
    return EvidenceKind.UNKNOWN


def confidence_for_keyword(keyword: str) -> float:
    strong = {"openapi", "panelready", "samesource", "credentialref", "rbac", "byok"}
    medium = {"health", "readiness", "audit", "trace", "entitlement", "invoice"}
    keyword = keyword.lower()
    if keyword in strong:
        return 0.78
    if keyword in medium:
        return 0.68
    return 0.55


def classify_warnings(scan: PlatformScan) -> tuple[str, ...]:
    warnings: list[str] = list(scan.warnings)
    if not scan.exists:
        warnings.append("real repository not found")
    if scan.exists and not scan.git_present:
        warnings.append("real repository exists without .git")
    if scan.exists and not scan.readme_excerpt:
        warnings.append("technical README not found")
    if scan.exists and scan.code_lines == 0:
        warnings.append("no TS/JS/Python/Java code lines found")
    if scan.exists and not scan.has_tests:
        warnings.append("no tests found by the local scan")
    if scan.exists and not scan.has_openapi:
        warnings.append("no OpenAPI contract found by the local scan")
    # dict.fromkeys deduplicates while preserving insertion order.
    return tuple(dict.fromkeys(warnings))


def scan_platform(root: Path, platform: PlatformDefinition, options: ScanOptions | None = None) -> PlatformScan:
    options = options or ScanOptions()
    repo = root / platform.repo_name
    exists = repo.exists()
    git_present, branch, head, remote = detect_git(repo) if exists else (False, None, None, None)
    readme = read_readme(repo, options.max_readme_chars) if exists else ""
    metrics: list[FileMetric] = []
    evidence_bucket: dict[EvidenceKind, list[Evidence]] = {}
    warnings: list[str] = []
    scripts = load_package_scripts(repo) if exists else ()
    for script in scripts:
        add_limited(
            evidence_bucket,
            Evidence(
                EvidenceKind.PACKAGE_SCRIPT,
                script.source_file,
                f"Script '{script.name}' with intent '{script.intent}'.",
                confidence=0.62,
                tags=(script.intent,),
            ),
            options.max_evidence_per_kind,
        )
    if exists:
        for file_path in iter_files(repo):
            metric = metric_for_file(file_path, repo, options)
            if metric is not None:
                metrics.append(metric)
            filename_evidence = evidence_from_filename(file_path, repo)
            if filename_evidence is not None:
                add_limited(evidence_bucket, filename_evidence, options.max_evidence_per_kind)
            if file_path.suffix.lower() in CODE_EXTENSIONS | {".md", ".json"} and not is_sensitive_path(file_path):
                text = read_text_limited(file_path, options.max_file_bytes)
                if text:
                    for item in evidence_from_text(file_path, repo, text, limit=6):
                        add_limited(evidence_bucket, item, options.max_evidence_per_kind)
    evidence: list[Evidence] = []
    for kind in sorted(evidence_bucket, key=lambda item: item.value):
        evidence.extend(evidence_bucket[kind])
    scan = PlatformScan(
        platform=platform,
        repo_path=str(repo),
        exists=exists,
        git_present=git_present,
        branch=branch,
        head=head,
        remote_origin=remote,
        readme_excerpt=readme,
        file_metrics=tuple(metrics),
        scripts=scripts,
        evidence=tuple(evidence),
        warnings=tuple(warnings),
    )
    # Re-issue the scan with the derived warnings folded in; dataclasses.replace
    # keeps every other field (including scanned_at) intact.
    return replace(scan, warnings=classify_warnings(scan))
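

# Typical use, assuming the platform repositories are checked out side by side
# under one root (the path and index here are illustrative):
#
#     >>> scan = scan_platform(Path.home() / "repos", PLATFORMS[0])
#     >>> scan.exists, len(scan.evidence), scan.warnings  # doctest: +SKIP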


def scan_ecosystem(root: Path, platforms: Sequence[PlatformDefinition] = PLATFORMS) -> tuple[PlatformScan, ...]:
    return tuple(scan_platform(root, platform) for platform in platforms)


def summarize_extensions(metrics: Iterable[FileMetric]) -> dict[str, int]:
    result: dict[str, int] = {}
    for metric in metrics:
        result[metric.extension] = result.get(metric.extension, 0) + metric.lines
    return dict(sorted(result.items(), key=lambda item: (-item[1], item[0])))


def detect_human_keywords(scan: PlatformScan) -> dict[str, int]:
    counts: dict[str, int] = {}
    for evidence in scan.evidence:
        text = f"{evidence.summary} {' '.join(evidence.tags)}".lower()
        for category, keywords in CATEGORY_KEYWORDS.items():
            if any(keyword.lower() in text for keyword in keywords):
                counts[category.value] = counts.get(category.value, 0) + 1
    return dict(sorted(counts.items(), key=lambda item: (-item[1], item[0])))


def list_candidate_roots(root: Path) -> tuple[str, ...]:
    if not root.exists():
        return ()
    output: list[str] = []
    for entry in sorted(root.iterdir(), key=lambda item: item.name.lower()):
        if entry.is_dir() and entry.name.startswith("tudo-para-ia-"):
            output.append(entry.name)
    return tuple(output)


def environment_summary(root: Path) -> dict[str, object]:
    return {
        "root": str(root),
        "root_exists": root.exists(),
        "candidate_repositories": list_candidate_roots(root),
        "platform_catalog_size": len(PLATFORMS),
        "skip_dirs": sorted(SKIP_DIRS),
    }
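

# A minimal smoke entry point (run as `python -m mais_humana.scanner`, since
# the module uses relative imports); the cwd default is illustrative, not a
# committed convention.
if __name__ == "__main__":
    print(json.dumps(environment_summary(Path.cwd()), indent=2, default=str))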