Add multilingual audit CI pipeline + extract mandelblog_content_guard
This commit is contained in:
452
mandelblog_content_guard/validators/multilingual.py
Normal file
452
mandelblog_content_guard/validators/multilingual.py
Normal file
@@ -0,0 +1,452 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import re
|
||||
from collections import Counter
|
||||
from typing import Any
|
||||
from urllib.error import HTTPError, URLError
|
||||
from urllib.request import Request, urlopen
|
||||
|
||||
from django.core.exceptions import ValidationError
|
||||
from django.utils import timezone
|
||||
from wagtail.models import Page, Site
|
||||
from wagtail.snippets.models import get_snippet_models
|
||||
|
||||
from ..agents import get_language_agent
|
||||
from ..extractors.visible_text import extract_visible_rendered_text, normalize_text
|
||||
from ..settings import audit_default_locales, rewrite_enabled
|
||||
from ..types import dedupe_issues, format_issue, make_issue
|
||||
from .rules.cta import validate_cta
|
||||
from .rules.forms import validate_form_copy
|
||||
from .rules.language import detect_language_mismatch
|
||||
from .rules.patterns import (
|
||||
GLOBAL_BAD_PATTERNS,
|
||||
KNOWN_REPLACEMENTS,
|
||||
LOCALE_FORBIDDEN,
|
||||
validate_patterns,
|
||||
)
|
||||
from mandelstudio.models import LocaleAuditIssue, LocaleAuditRun
|
||||
|
||||
logger = logging.getLogger("mandelstudio.multilingual")
|
||||
|
||||
|
||||
def expected_locale(instance: Any) -> str:
|
||||
locale = getattr(instance, "locale", None)
|
||||
if locale is not None and getattr(locale, "language_code", None):
|
||||
return locale.language_code
|
||||
return "nl"
|
||||
|
||||
|
||||
def iter_text_nodes(value: Any, path: str = ""):
|
||||
if value is None:
|
||||
return
|
||||
if isinstance(value, str):
|
||||
yield path, value
|
||||
return
|
||||
if hasattr(value, "raw_data"):
|
||||
yield from iter_text_nodes(list(value.raw_data), path)
|
||||
return
|
||||
if isinstance(value, list):
|
||||
for index, item in enumerate(value):
|
||||
yield from iter_text_nodes(item, f"{path}[{index}]")
|
||||
return
|
||||
if isinstance(value, dict):
|
||||
for key, item in value.items():
|
||||
child_path = f"{path}.{key}" if path else str(key)
|
||||
yield from iter_text_nodes(item, child_path)
|
||||
|
||||
|
||||
def extract_instance_text(instance: Any) -> list[tuple[str, str]]:
|
||||
nodes: list[tuple[str, str]] = []
|
||||
for field_name in ["title", "seo_title", "search_description"]:
|
||||
value = getattr(instance, field_name, None)
|
||||
if isinstance(value, str) and value.strip():
|
||||
nodes.append((field_name, value))
|
||||
for field_name in ["body", "content", "footer", "mini_footer"]:
|
||||
if hasattr(instance, field_name):
|
||||
nodes.extend(list(iter_text_nodes(getattr(instance, field_name), field_name)))
|
||||
return nodes
|
||||
|
||||
|
||||
def validate_text_nodes(locale_code: str, nodes: list[tuple[str, str]]):
|
||||
issues = []
|
||||
for field_path, raw_text in nodes:
|
||||
normalized = normalize_text(raw_text)
|
||||
if not normalized:
|
||||
continue
|
||||
issues.extend(validate_patterns(locale_code, field_path, normalized))
|
||||
issues.extend(validate_cta(locale_code, field_path, normalized))
|
||||
issues.extend(validate_form_copy(locale_code, field_path, normalized))
|
||||
if len(normalized) >= 80:
|
||||
mismatch = detect_language_mismatch(locale_code, normalized)
|
||||
if mismatch:
|
||||
issues.append(make_issue("language_heuristic", field_path, mismatch["message"]))
|
||||
return dedupe_issues(issues)
|
||||
|
||||
|
||||
REWRITE_REVIEW_TYPES = {
|
||||
"known_bad_pattern",
|
||||
"wrong_language_fragment",
|
||||
"rendered_bad_pattern",
|
||||
"rendered_wrong_language",
|
||||
"rewrite_candidate",
|
||||
"weak_marketing_copy",
|
||||
"foreign_ui_label",
|
||||
"generic_badge_label",
|
||||
"mixed_locale_heading",
|
||||
"cta_language_mismatch",
|
||||
}
|
||||
|
||||
|
||||
def validate_page(page: Page):
|
||||
return validate_text_nodes(expected_locale(page), extract_instance_text(page.specific))
|
||||
|
||||
|
||||
def validate_snippet_instance(instance: Any):
|
||||
return validate_text_nodes(expected_locale(instance), extract_instance_text(instance))
|
||||
|
||||
|
||||
def validate_posted_snippet(locale_code: str, payload: dict[str, Any]):
|
||||
nodes = [(key, value) for key, value in payload.items() if isinstance(value, str)]
|
||||
return validate_text_nodes(locale_code, nodes)
|
||||
|
||||
|
||||
def _replace_known_strings(value: Any, locale_code: str):
|
||||
changes = []
|
||||
if isinstance(value, str):
|
||||
new = value
|
||||
for bad, replacements in KNOWN_REPLACEMENTS.items():
|
||||
replacement = replacements.get(locale_code)
|
||||
if replacement and bad in new:
|
||||
new = new.replace(bad, replacement)
|
||||
changes.append({"bad": bad, "replacement": replacement})
|
||||
return new, changes, new != value
|
||||
if isinstance(value, list):
|
||||
out = []
|
||||
changed = False
|
||||
for item in value:
|
||||
new_item, item_changes, item_changed = _replace_known_strings(item, locale_code)
|
||||
out.append(new_item)
|
||||
changes.extend(item_changes)
|
||||
changed = changed or item_changed
|
||||
return out, changes, changed
|
||||
if isinstance(value, dict):
|
||||
out = {}
|
||||
changed = False
|
||||
for key, item in value.items():
|
||||
new_item, item_changes, item_changed = _replace_known_strings(item, locale_code)
|
||||
out[key] = new_item
|
||||
changes.extend(item_changes)
|
||||
changed = changed or item_changed
|
||||
return out, changes, changed
|
||||
return value, changes, False
|
||||
|
||||
|
||||
def apply_known_replacements(instance: Any, locale_code: str):
|
||||
changes = []
|
||||
for field_name in ["title", "seo_title", "search_description"]:
|
||||
value = getattr(instance, field_name, None)
|
||||
if not isinstance(value, str):
|
||||
continue
|
||||
new_value, field_changes, changed = _replace_known_strings(value, locale_code)
|
||||
if changed:
|
||||
setattr(instance, field_name, new_value)
|
||||
changes.extend({"field": field_name, **change} for change in field_changes)
|
||||
|
||||
for field_name in ["body", "content", "footer", "mini_footer"]:
|
||||
if not hasattr(instance, field_name):
|
||||
continue
|
||||
field_value = getattr(instance, field_name)
|
||||
if hasattr(field_value, "raw_data"):
|
||||
new_raw, field_changes, changed = _replace_known_strings(list(field_value.raw_data), locale_code)
|
||||
if changed:
|
||||
setattr(instance, field_name, new_raw)
|
||||
changes.extend({"field": field_name, **change} for change in field_changes)
|
||||
elif isinstance(field_value, str):
|
||||
new_value, field_changes, changed = _replace_known_strings(field_value, locale_code)
|
||||
if changed:
|
||||
setattr(instance, field_name, new_value)
|
||||
changes.extend({"field": field_name, **change} for change in field_changes)
|
||||
|
||||
if not changes:
|
||||
return []
|
||||
if isinstance(instance, Page):
|
||||
revision = instance.save_revision()
|
||||
if instance.live:
|
||||
revision.publish()
|
||||
return changes
|
||||
instance.save()
|
||||
return changes
|
||||
|
||||
|
||||
def rewrite_with_agent(instance: Any, locale_code: str, issues, *, dry_run: bool = False):
|
||||
if not rewrite_enabled():
|
||||
return []
|
||||
agent = get_language_agent(locale_code)
|
||||
issue_map = agent.build_issue_map(issues)
|
||||
changes = []
|
||||
|
||||
for field_name in ["title", "seo_title", "search_description"]:
|
||||
value = getattr(instance, field_name, None)
|
||||
if not isinstance(value, str):
|
||||
continue
|
||||
field_issues = issue_map.get(field_name, [])
|
||||
rewritten = agent.rewrite(value, field_path=field_name, issues=field_issues)
|
||||
if rewritten != value:
|
||||
setattr(instance, field_name, rewritten)
|
||||
changes.append({"field": field_name, "before": value, "after": rewritten, "method": "agent"})
|
||||
|
||||
for field_name in ["body", "content", "footer", "mini_footer"]:
|
||||
if not hasattr(instance, field_name):
|
||||
continue
|
||||
field_value = getattr(instance, field_name)
|
||||
if hasattr(field_value, "raw_data"):
|
||||
rewritten, changed = agent.process_block(list(field_value.raw_data), field_name, issue_map)
|
||||
if changed:
|
||||
setattr(instance, field_name, rewritten)
|
||||
changes.append({"field": field_name, "method": "agent"})
|
||||
elif isinstance(field_value, str):
|
||||
rewritten = agent.rewrite(field_value, field_path=field_name, issues=issue_map.get(field_name, []))
|
||||
if rewritten != field_value:
|
||||
setattr(instance, field_name, rewritten)
|
||||
changes.append({"field": field_name, "before": field_value, "after": rewritten, "method": "agent"})
|
||||
|
||||
if not changes or dry_run:
|
||||
return changes
|
||||
if isinstance(instance, Page):
|
||||
revision = instance.save_revision()
|
||||
if instance.live:
|
||||
revision.publish()
|
||||
return changes
|
||||
instance.save()
|
||||
return changes
|
||||
|
||||
|
||||
def enumerate_public_pages(locale_codes: list[str] | None = None, url_filters: list[str] | None = None):
|
||||
result = {}
|
||||
site = Site.objects.order_by("id").first()
|
||||
site_root = getattr(site, "root_page", None)
|
||||
normalized_filters = set(url_filters or [])
|
||||
for locale_code in (locale_codes or audit_default_locales()):
|
||||
locale_root_path = None
|
||||
if site_root is not None:
|
||||
translated_root = (
|
||||
Page.objects.filter(
|
||||
translation_key=site_root.translation_key,
|
||||
locale__language_code=locale_code,
|
||||
)
|
||||
.specific()
|
||||
.first()
|
||||
)
|
||||
chosen_root = translated_root or site_root
|
||||
locale_root_path = getattr(chosen_root, "path", None)
|
||||
qs = (
|
||||
Page.objects.filter(locale__language_code=locale_code)
|
||||
.live()
|
||||
.public()
|
||||
.specific()
|
||||
.order_by("path")
|
||||
)
|
||||
pages = []
|
||||
for page in qs:
|
||||
page_url = getattr(page, "url", None)
|
||||
if not page_url:
|
||||
continue
|
||||
if locale_root_path and not page.path.startswith(locale_root_path):
|
||||
continue
|
||||
if normalized_filters and page_url not in normalized_filters:
|
||||
continue
|
||||
pages.append(page)
|
||||
result[locale_code] = pages
|
||||
return result
|
||||
|
||||
|
||||
def fetch_rendered_text(page: Page):
|
||||
page_url = getattr(page, "url", None)
|
||||
if not page_url:
|
||||
return 598, "missing page URL"
|
||||
if str(page_url).startswith("http"):
|
||||
full_url = page_url
|
||||
else:
|
||||
try:
|
||||
site = page.get_site()
|
||||
except Site.DoesNotExist:
|
||||
site = None
|
||||
site = site or Site.objects.order_by("id").first()
|
||||
if site is None or not getattr(site, "root_url", None):
|
||||
return 598, "missing site root_url"
|
||||
full_url = f"{site.root_url}{page_url}"
|
||||
request = Request(full_url, headers={"User-Agent": "mandelstudio-audit/1.0"})
|
||||
try:
|
||||
with urlopen(request, timeout=30) as response:
|
||||
status = response.getcode()
|
||||
body = response.read().decode("utf-8", errors="replace")
|
||||
except HTTPError as exc:
|
||||
status = exc.code
|
||||
body = exc.read().decode("utf-8", errors="replace")
|
||||
except URLError as exc:
|
||||
status = 599
|
||||
body = str(exc)
|
||||
text = extract_visible_rendered_text(body)
|
||||
return status, text
|
||||
|
||||
|
||||
def iter_rendered_lines(rendered_text: str) -> list[str]:
|
||||
lines = []
|
||||
for chunk in re.split(r"(?<=[\.\!\?])\s+|\s{2,}", rendered_text):
|
||||
normalized = normalize_text(chunk)
|
||||
if normalized:
|
||||
lines.append(normalized)
|
||||
return lines
|
||||
|
||||
|
||||
def validate_rendered_output(locale_code: str, rendered_text: str, status_code: int):
|
||||
issues = []
|
||||
if status_code != 200:
|
||||
issues.append(make_issue("render_status", "rendered", str(status_code)))
|
||||
source_counter = Counter()
|
||||
for line in iter_rendered_lines(rendered_text):
|
||||
line_issues = validate_patterns(locale_code, "rendered", line)
|
||||
for issue in line_issues:
|
||||
issue.bad_value = line
|
||||
issue.extra = {**(issue.extra or {}), "source": "rendered"}
|
||||
source_counter[(issue.issue_type, issue.bad_value)] += 1
|
||||
issues.extend(line_issues)
|
||||
for issue in issues:
|
||||
if issue.extra is not None:
|
||||
issue.extra["count"] = source_counter.get((issue.issue_type, issue.bad_value), 1)
|
||||
for fragment in GLOBAL_BAD_PATTERNS:
|
||||
if fragment in rendered_text:
|
||||
issue = make_issue("rendered_bad_pattern", "rendered", fragment, KNOWN_REPLACEMENTS.get(fragment, {}).get(locale_code, ""))
|
||||
issue.extra = {"source": "rendered", "count": 1}
|
||||
issues.append(issue)
|
||||
for fragment in LOCALE_FORBIDDEN.get(locale_code, ()):
|
||||
if fragment in rendered_text:
|
||||
issue = make_issue("rendered_wrong_language", "rendered", fragment, KNOWN_REPLACEMENTS.get(fragment, {}).get(locale_code, ""))
|
||||
issue.extra = {"source": "rendered", "count": 1}
|
||||
issues.append(issue)
|
||||
return dedupe_issues(issues)
|
||||
|
||||
|
||||
def annotate_rewrite_previews(locale_code: str, issues):
|
||||
agent = get_language_agent(locale_code)
|
||||
for issue in issues:
|
||||
if issue.issue_type not in REWRITE_REVIEW_TYPES:
|
||||
continue
|
||||
if issue.replacement:
|
||||
continue
|
||||
preview = agent.rewrite(issue.bad_value, field_path=issue.field_path, issues=[issue])
|
||||
if preview and preview != issue.bad_value:
|
||||
issue.replacement = preview
|
||||
issue.extra = {**(issue.extra or {}), "review_candidate": True}
|
||||
return issues
|
||||
|
||||
|
||||
def validate_instance_or_raise(instance: Any):
|
||||
issues = validate_page(instance) if isinstance(instance, Page) else validate_snippet_instance(instance)
|
||||
blocking = [issue for issue in issues if issue.blocks]
|
||||
if not blocking:
|
||||
return issues
|
||||
raise ValidationError({"content_guard": [format_issue(issue) for issue in blocking]})
|
||||
|
||||
|
||||
def validate_ai_text_or_raise(locale_code: str, field_path: str, value: str):
|
||||
issues = validate_text_nodes(locale_code, [(field_path, value)])
|
||||
blocking = [issue for issue in issues if issue.blocks]
|
||||
if not blocking:
|
||||
return issues
|
||||
raise ValidationError({"content_guard": [format_issue(issue) for issue in blocking]})
|
||||
|
||||
|
||||
def record_issues(run: LocaleAuditRun, locale_code: str, obj: Any, issues, *, fixed: bool = False) -> None:
|
||||
for issue in issues:
|
||||
LocaleAuditIssue.objects.create(
|
||||
run=run,
|
||||
locale_code=locale_code,
|
||||
object_id=getattr(obj, "pk", None),
|
||||
object_type=obj.__class__.__name__,
|
||||
url=getattr(obj, "url", "") or "",
|
||||
title=getattr(obj, "title", str(obj))[:255],
|
||||
severity=issue.severity,
|
||||
issue_type=issue.issue_type,
|
||||
field_path=issue.field_path,
|
||||
bad_value=issue.bad_value,
|
||||
replacement=issue.replacement,
|
||||
fixed=fixed,
|
||||
extra=issue.extra or {},
|
||||
)
|
||||
|
||||
|
||||
def audit_locales(locale_codes: list[str], fix: bool = False, rewrite: bool = False, dry_run: bool = False, url_filters: list[str] | None = None) -> LocaleAuditRun:
|
||||
run = LocaleAuditRun.objects.create(locale_codes=locale_codes, fix_enabled=fix or rewrite)
|
||||
pages_by_locale = enumerate_public_pages(locale_codes, url_filters=url_filters)
|
||||
summary: dict[str, Any] = {}
|
||||
total_checked = 0
|
||||
total_issues = 0
|
||||
pages_with_issues = 0
|
||||
|
||||
for locale_code, pages in pages_by_locale.items():
|
||||
locale_summary = {"total_urls_checked": len(pages), "issues_found": 0, "issues_fixed": 0, "remaining_issues": 0, "by_severity": {"block": 0, "warn": 0, "log": 0}}
|
||||
for page in pages:
|
||||
total_checked += 1
|
||||
status_code, rendered = fetch_rendered_text(page)
|
||||
issues = dedupe_issues(validate_page(page) + validate_rendered_output(locale_code, rendered, status_code))
|
||||
if rewrite:
|
||||
issues = annotate_rewrite_previews(locale_code, issues)
|
||||
initial_issue_count = len(issues)
|
||||
fixed_changes = []
|
||||
if issues and fix:
|
||||
fixed_changes = apply_known_replacements(page.specific, locale_code)
|
||||
if fixed_changes:
|
||||
record_issues(run, locale_code, page, issues, fixed=True)
|
||||
status_code, rendered = fetch_rendered_text(page.specific)
|
||||
issues = dedupe_issues(validate_page(page.specific) + validate_rendered_output(locale_code, rendered, status_code))
|
||||
if rewrite:
|
||||
issues = annotate_rewrite_previews(locale_code, issues)
|
||||
if issues and rewrite:
|
||||
rewrite_changes = rewrite_with_agent(page.specific, locale_code, issues, dry_run=dry_run)
|
||||
if rewrite_changes:
|
||||
record_issues(run, locale_code, page, issues, fixed=not dry_run)
|
||||
if not dry_run:
|
||||
status_code, rendered = fetch_rendered_text(page.specific)
|
||||
issues = dedupe_issues(validate_page(page.specific) + validate_rendered_output(locale_code, rendered, status_code))
|
||||
issues = annotate_rewrite_previews(locale_code, issues)
|
||||
if issues:
|
||||
pages_with_issues += 1
|
||||
record_issues(run, locale_code, page, issues)
|
||||
locale_summary["issues_found"] += initial_issue_count
|
||||
locale_summary["issues_fixed"] += initial_issue_count - len(issues)
|
||||
locale_summary["remaining_issues"] += len(issues)
|
||||
for issue in issues:
|
||||
locale_summary["by_severity"][issue.severity] = locale_summary["by_severity"].get(issue.severity, 0) + 1
|
||||
total_issues += initial_issue_count
|
||||
summary[locale_code] = locale_summary
|
||||
|
||||
snippet_summary = {}
|
||||
for model in get_snippet_models():
|
||||
count = 0
|
||||
for instance in model.objects.all():
|
||||
issues = validate_snippet_instance(instance)
|
||||
if rewrite:
|
||||
issues = annotate_rewrite_previews(expected_locale(instance), issues)
|
||||
if issues and rewrite:
|
||||
rewrite_changes = rewrite_with_agent(instance, expected_locale(instance), issues, dry_run=dry_run)
|
||||
if rewrite_changes and not dry_run:
|
||||
issues = validate_snippet_instance(instance)
|
||||
if not issues:
|
||||
continue
|
||||
count += len(issues)
|
||||
record_issues(run, expected_locale(instance), instance, issues)
|
||||
if count:
|
||||
snippet_summary[model.__name__] = count
|
||||
total_issues += count
|
||||
summary["snippets"] = snippet_summary
|
||||
|
||||
run.total_urls_checked = total_checked
|
||||
run.issues_found = total_issues
|
||||
run.pages_with_issues = pages_with_issues
|
||||
run.summary = summary
|
||||
run.finished_at = timezone.now()
|
||||
run.save(update_fields=["total_urls_checked", "issues_found", "pages_with_issues", "summary", "finished_at"])
|
||||
logger.info("Completed multilingual audit run %s", run.pk)
|
||||
return run
|
||||
Reference in New Issue
Block a user