172 lines
5.7 KiB
Python
172 lines
5.7 KiB
Python
"""Secret and sensitive-text checks for generated human artifacts."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import re
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
from typing import Iterable, Sequence
|
|
|
|
from .models import as_plain_data
|
|
|
|
|
|
# Regex patterns that look like leaked secrets, keyed by a stable pattern id.
# scan_text_for_secrets() reports at most one finding per pattern per line.
SECRET_PATTERNS: tuple[tuple[str, re.Pattern[str]], ...] = (
    # Cloudflare API tokens carrying the "cfat_" prefix.
    ("cloudflare_cfat_token", re.compile(r"\bcfat_[A-Za-z0-9_\-]{20,}\b")),
    # "token = ...", "api_key: ..." style assignments of long opaque values.
    ("generic_token_assignment", re.compile(r"(?i)\b(token|secret|password|api[_-]?key)\s*[:=]\s*['\"]?[A-Za-z0-9_\-]{16,}")),
    # HTTP "Authorization: Bearer <token>" values.
    ("bearer_token", re.compile(r"(?i)\bbearer\s+[A-Za-z0-9_\-\.]{20,}")),
    (
        "cloudflare_token_assignment",
        re.compile(r"(?i)\b(cloudflare[_-]?(api[_-]?)?token|cf[_-]?token)\b\s*[:=]\s*['\"]?[A-Za-z0-9_\-]{24,}"),
    ),
    # PEM private-key headers (RSA, EC, OPENSSH, ...).
    ("private_key", re.compile(r"-----BEGIN [A-Z ]*PRIVATE KEY-----")),
    # Database/cache connection URLs, which typically embed credentials.
    # Fix: the original used r"[^\\s]+" — inside a raw string that is a class
    # excluding a backslash and the LETTER "s", so matches were cut short at
    # the first "s" (e.g. only "postgres://u" of "postgres://user:...").
    ("connection_string", re.compile(r"(?i)\b(postgres|mysql|mongodb|redis)://\S+")),
)
|
|
|
|
# Matches a whole line that assigns an opaque reference identifier, e.g.
# `credentialRef: cf/prod/token-1` or `"traceId": "abc-123"`.  Such references
# are safe to publish, so is_allowlisted() treats matching lines as non-secret.
# (Verbose/ignore-case regex; comments must stay OUTSIDE the pattern string,
# since "#" inside a verbose pattern would alter the compiled regex.)
OPAQUE_REFERENCE_RE = re.compile(
    r"""(?ix)
    ^\s*
    ["']?(credentialRef|secretRef|tokenRef|sourcePayloadHash|sourceRecordsHash|traceId|auditId)["']?
    \s*[:=]\s*
    ["']?[A-Za-z0-9][A-Za-z0-9._:/\-]{2,160}["']?
    [,;]?\s*$
    """
)
|
|
|
|
# Phrases (Portuguese) signalling that a line *talks about* secret handling
# rather than containing a secret; consulted by is_allowlisted().
SAFE_TEXT_TERMS = {
    "redaction",
    "sem segredo",
    "nao vazar",
    "referencia opaca",
}
|
|
|
|
|
|
@dataclass(slots=True)
class RedactionFinding:
    """A single secret-shaped match found in a generated text artifact."""

    path: str            # file in which the match occurred
    pattern_id: str      # id from SECRET_PATTERNS that matched
    line: int            # 1-based line number of the match
    sample: str          # masked excerpt (never the raw secret value)
    severity: str        # "critical" or "warning"
    recommendation: str  # remediation hint for the artifact author

    def to_dict(self) -> dict[str, object]:
        """Return a plain-dict view of the finding (for JSON-style output)."""
        return as_plain_data(self)
|
|
|
|
|
|
@dataclass(slots=True)
class RedactionReport:
    """Aggregate result of scanning generated artifacts for secrets."""

    scanned_files: int                      # number of text files inspected
    findings: tuple[RedactionFinding, ...]  # all matches, in scan order
    passed: bool                            # True when no findings were produced

    def to_dict(self) -> dict[str, object]:
        """Return a plain-dict view of the report (for JSON-style output)."""
        return as_plain_data(self)
|
|
|
|
|
|
def mask_secret_sample(pattern_id: str, sample: str) -> str:
    """Return a redacted placeholder for *sample* that keeps only safe context.

    The placeholder preserves enough shape (key name or prefix plus the
    original length) to make a finding actionable without reproducing the
    secret itself.

    Args:
        pattern_id: id from SECRET_PATTERNS that produced the match.
        sample: the raw matched text.

    Returns:
        A masked string such as ``token=[redacted:24]``.
    """
    cleaned = sample.strip()
    if not cleaned:
        return "[redacted:0]"
    if pattern_id == "cloudflare_cfat_token":
        return f"cfat_[redacted:{len(cleaned)}]"
    if pattern_id == "bearer_token":
        return f"Bearer [redacted:{len(cleaned)}]"
    # Fix: connection strings must be handled BEFORE the generic "key=value" /
    # "key: value" branches — a URL always contains ":" (and often "="), so
    # with the original ordering this branch was unreachable and URLs were
    # masked by splitting on the first colon instead.
    if pattern_id == "connection_string" or "://" in cleaned:
        return f"[connection-string-redacted:{len(cleaned)}]"
    if "=" in cleaned:
        key = cleaned.split("=", 1)[0].strip()
        return f"{key}=[redacted:{len(cleaned)}]"
    if ":" in cleaned:
        key = cleaned.split(":", 1)[0].strip()
        return f"{key}: [redacted:{len(cleaned)}]"
    if len(cleaned) <= 8:
        return "[redacted]"
    return f"{cleaned[:4]}[redacted:{len(cleaned)}]"
|
|
|
|
|
|
def redact_sensitive_text(text: str) -> str:
    """Return *text* with known secret-shaped values replaced by redacted markers.

    Every pattern in SECRET_PATTERNS is applied in order; each match is
    rewritten via mask_secret_sample() so that key names / prefixes and the
    original length are preserved but the secret value itself is not.
    """
    redacted = text
    for pattern_id, pattern in SECRET_PATTERNS:
        # Bind the loop variable as a default argument so the replacement
        # callback does not rely on late-binding closure capture (the
        # flake8-bugbear B023 footgun in the original lambda).
        def _mask(match: re.Match[str], _pid: str = pattern_id) -> str:
            return mask_secret_sample(_pid, match.group(0))

        redacted = pattern.sub(_mask, redacted)
    return redacted
|
|
|
|
|
|
def is_allowlisted(line: str) -> bool:
    """Decide whether *line* may be skipped by the secret scan.

    Opaque reference assignments and prose about secret handling are safe;
    anything containing a token-looking prefix is never allowlisted.
    """
    folded = line.lower()
    # Hard override: token prefixes are never safe, whatever else the line says.
    for marker in ("cfat_", "bearer "):
        if marker in folded:
            return False
    # Lines assigning opaque references (credentialRef, traceId, ...) are safe.
    if OPAQUE_REFERENCE_RE.match(line) is not None:
        return True
    # Otherwise the line must both mention a known safe phrase and trigger
    # none of the secret patterns.
    mentions_safe_term = any(term.lower() in folded for term in SAFE_TEXT_TERMS)
    if not mentions_safe_term:
        return False
    return all(pattern.search(line) is None for _, pattern in SECRET_PATTERNS)
|
|
|
|
|
|
def scan_text_for_secrets(path: str, text: str) -> tuple[RedactionFinding, ...]:
    """Scan *text* line by line and return findings for secret-shaped values.

    Allowlisted lines are skipped entirely; for every other line each pattern
    in SECRET_PATTERNS contributes at most one finding (its first match on
    that line). Samples are truncated and masked before being stored.
    """
    collected: list[RedactionFinding] = []
    for lineno, raw_line in enumerate(text.splitlines(), start=1):
        if is_allowlisted(raw_line):
            continue
        for pattern_id, pattern in SECRET_PATTERNS:
            hit = pattern.search(raw_line)
            if hit is None:
                continue
            excerpt = hit.group(0)
            # Cap the excerpt before masking so the stored sample stays short.
            if len(excerpt) > 90:
                excerpt = f"{excerpt[:87]}..."
            is_critical = pattern_id in {"private_key", "connection_string"}
            collected.append(
                RedactionFinding(
                    path=path,
                    pattern_id=pattern_id,
                    line=lineno,
                    sample=mask_secret_sample(pattern_id, excerpt),
                    severity="critical" if is_critical else "warning",
                    recommendation="Substituir valor sensivel por referencia opaca e registrar apenas credentialRef/secretRef.",
                )
            )
    return tuple(collected)
|
|
|
|
|
|
def iter_text_files(root: Path, suffixes: Sequence[str] = (".md", ".json", ".csv", ".html", ".txt")) -> Iterable[Path]:
    """Yield files under *root* whose suffix is wanted, skipping excluded dirs.

    Args:
        root: directory to walk recursively; a missing root yields nothing.
        suffixes: accepted suffixes, compared against the lowercased file suffix.

    Yields:
        Paths of candidate text files, excluding anything inside VCS metadata,
        caches, build outputs, or vendored dependency directories.
    """
    if not root.exists():
        return
    # Hoist loop invariants: the exclusion set was previously rebuilt for
    # every file, and suffix membership scanned the sequence linearly.
    wanted = set(suffixes)
    excluded_parts = {".git", ".test-tmp", "__pycache__", "node_modules", "dist", "build"}
    for path in root.rglob("*"):
        # Check path parts before is_file() so excluded trees cost no stat().
        if excluded_parts.intersection(path.parts):
            continue
        if path.is_file() and path.suffix.lower() in wanted:
            yield path
|
|
|
|
|
|
def scan_generated_artifacts(root: Path) -> RedactionReport:
    """Walk *root*, scan every text artifact, and aggregate the results."""
    all_findings: list[RedactionFinding] = []
    scanned = 0
    for artifact in iter_text_files(root):
        scanned += 1
        try:
            content = artifact.read_text(encoding="utf-8", errors="ignore")
        except OSError:
            # Unreadable files still count as scanned; skip them silently.
            continue
        all_findings.extend(scan_text_for_secrets(str(artifact), content))
    return RedactionReport(
        scanned_files=scanned,
        findings=tuple(all_findings),
        passed=not all_findings,
    )
|
|
|
|
|
|
def redaction_markdown(report: RedactionReport) -> str:
    """Render *report* as a human-readable Markdown summary (Portuguese)."""
    parts: list[str] = [
        "# Redaction Check Mais Humana",
        "",
        f"- arquivos varridos: `{report.scanned_files}`",
        f"- passou: `{report.passed}`",
        f"- achados: `{len(report.findings)}`",
        "",
    ]
    if not report.findings:
        parts.append("Nenhum segredo aparente encontrado nos artefatos textuais gerados.")
    else:
        parts.extend(["## Achados", ""])
        parts.extend(
            f"- `{item.severity}` {item.path}:{item.line} "
            f"({item.pattern_id}) - {item.recommendation}"
            for item in report.findings
        )
    return "\n".join(parts).strip() + "\n"
|