Files

453 lines
18 KiB
Python

from __future__ import annotations
import logging
import re
from collections import Counter
from typing import Any
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen
from django.core.exceptions import ValidationError
from django.utils import timezone
from wagtail.models import Page, Site
from wagtail.snippets.models import get_snippet_models
from ..agents import get_language_agent
from ..extractors.visible_text import extract_visible_rendered_text, normalize_text
from ..settings import audit_default_locales, rewrite_enabled
from ..types import dedupe_issues, format_issue, make_issue
from .rules.cta import validate_cta
from .rules.forms import validate_form_copy
from .rules.language import detect_language_mismatch
from .rules.patterns import (
GLOBAL_BAD_PATTERNS,
KNOWN_REPLACEMENTS,
LOCALE_FORBIDDEN,
validate_patterns,
)
from mandelstudio.models import LocaleAuditIssue, LocaleAuditRun
# Module-level logger; it uses a fixed channel name rather than ``__name__``.
logger = logging.getLogger("mandelstudio.multilingual")
def expected_locale(instance: Any) -> str:
    """Return the language code of ``instance.locale``, defaulting to ``"nl"``.

    The ``"nl"`` fallback is used when the instance has no ``locale``
    attribute, when that attribute is ``None``, or when its
    ``language_code`` is missing or empty.
    """
    locale = getattr(instance, "locale", None)
    language_code = None if locale is None else getattr(locale, "language_code", None)
    return language_code if language_code else "nl"
def iter_text_nodes(value: Any, path: str = ""):
    """Recursively yield ``(path, text)`` pairs for every string inside ``value``.

    Strings are yielded as-is under the current ``path``. Objects exposing
    ``raw_data`` are walked through ``list(value.raw_data)``. List items
    extend the path with ``[index]`` and dict entries with ``.key`` (or just
    the key when the path is still empty). ``None`` and any other type
    produce nothing.
    """
    if value is None:
        return
    if isinstance(value, str):
        yield path, value
    elif hasattr(value, "raw_data"):
        yield from iter_text_nodes(list(value.raw_data), path)
    elif isinstance(value, list):
        for position, element in enumerate(value):
            yield from iter_text_nodes(element, f"{path}[{position}]")
    elif isinstance(value, dict):
        for name, element in value.items():
            nested_path = str(name) if not path else f"{path}.{name}"
            yield from iter_text_nodes(element, nested_path)
def extract_instance_text(instance: Any) -> list[tuple[str, str]]:
    """Collect ``(field_path, text)`` pairs from an instance's text fields.

    The plain fields ``title``, ``seo_title`` and ``search_description`` are
    included only when they hold a non-blank string. The fields ``body``,
    ``content``, ``footer`` and ``mini_footer`` are walked recursively with
    ``iter_text_nodes`` whenever the attribute exists on the instance.
    """
    collected: list[tuple[str, str]] = []
    for name in ("title", "seo_title", "search_description"):
        text = getattr(instance, name, None)
        if isinstance(text, str) and text.strip():
            collected.append((name, text))
    for name in ("body", "content", "footer", "mini_footer"):
        if hasattr(instance, name):
            collected.extend(iter_text_nodes(getattr(instance, name), name))
    return collected
def validate_text_nodes(locale_code: str, nodes: list[tuple[str, str]]):
    """Run every text rule over ``nodes`` and return the de-duplicated issues.

    Each node's text is normalized first; nodes that normalize to an empty
    value are skipped. The pattern, CTA and form-copy rules run on every
    remaining node. The language-mismatch heuristic only runs on text of at
    least 80 characters.
    """
    found = []
    for field_path, raw_text in nodes:
        text = normalize_text(raw_text)
        if not text:
            continue
        for rule in (validate_patterns, validate_cta, validate_form_copy):
            found.extend(rule(locale_code, field_path, text))
        if len(text) < 80:
            continue
        mismatch = detect_language_mismatch(locale_code, text)
        if mismatch:
            found.append(make_issue("language_heuristic", field_path, mismatch["message"]))
    return dedupe_issues(found)
# Issue types for which annotate_rewrite_previews() requests an agent-written
# preview when the issue has no replacement yet.
REWRITE_REVIEW_TYPES = {
    # Pattern and wrong-language findings, from source text and rendered output.
    "known_bad_pattern",
    "rendered_bad_pattern",
    "wrong_language_fragment",
    "rendered_wrong_language",
    # Copy-quality findings.
    "rewrite_candidate",
    "weak_marketing_copy",
    # Label, heading and CTA findings.
    "foreign_ui_label",
    "generic_badge_label",
    "mixed_locale_heading",
    "cta_language_mismatch",
}
def validate_page(page: Page):
    """Validate a page's text against the rules for its locale.

    The locale is read from ``page`` as passed in, while the text nodes are
    extracted from ``page.specific``.
    """
    locale_code = expected_locale(page)
    text_nodes = extract_instance_text(page.specific)
    return validate_text_nodes(locale_code, text_nodes)
def validate_snippet_instance(instance: Any):
    """Validate a snippet instance's text against the rules for its locale."""
    locale_code = expected_locale(instance)
    text_nodes = extract_instance_text(instance)
    return validate_text_nodes(locale_code, text_nodes)
def validate_posted_snippet(locale_code: str, payload: dict[str, Any]):
    """Validate the string values of a posted payload for ``locale_code``.

    Only top-level values that are strings are checked; the payload key is
    used as the field path. Non-string values are ignored.
    """
    text_nodes = []
    for field_name, submitted in payload.items():
        if isinstance(submitted, str):
            text_nodes.append((field_name, submitted))
    return validate_text_nodes(locale_code, text_nodes)
def _replace_known_strings(value: Any, locale_code: str):
changes = []
if isinstance(value, str):
new = value
for bad, replacements in KNOWN_REPLACEMENTS.items():
replacement = replacements.get(locale_code)
if replacement and bad in new:
new = new.replace(bad, replacement)
changes.append({"bad": bad, "replacement": replacement})
return new, changes, new != value
if isinstance(value, list):
out = []
changed = False
for item in value:
new_item, item_changes, item_changed = _replace_known_strings(item, locale_code)
out.append(new_item)
changes.extend(item_changes)
changed = changed or item_changed
return out, changes, changed
if isinstance(value, dict):
out = {}
changed = False
for key, item in value.items():
new_item, item_changes, item_changed = _replace_known_strings(item, locale_code)
out[key] = new_item
changes.extend(item_changes)
changed = changed or item_changed
return out, changes, changed
return value, changes, False
def apply_known_replacements(instance: Any, locale_code: str):
    """Apply the known string replacements to ``instance`` and persist them.

    The plain fields ``title``, ``seo_title`` and ``search_description`` are
    processed when they hold strings. The fields ``body``, ``content``,
    ``footer`` and ``mini_footer`` are processed either through their
    ``raw_data`` or as a plain string. When anything changed, a ``Page``
    gets a new revision (published if the page is live) and any other
    instance is saved.

    Returns a list of ``{"field", "bad", "replacement"}`` dicts, or an empty
    list when nothing was replaced (in which case nothing is saved).
    """
    applied = []

    def _store(field_name, new_value, field_changes):
        # Write the replaced value back and tag each change with its field.
        setattr(instance, field_name, new_value)
        applied.extend({"field": field_name, **change} for change in field_changes)

    for field_name in ("title", "seo_title", "search_description"):
        current = getattr(instance, field_name, None)
        if isinstance(current, str):
            new_value, field_changes, changed = _replace_known_strings(current, locale_code)
            if changed:
                _store(field_name, new_value, field_changes)

    for field_name in ("body", "content", "footer", "mini_footer"):
        if not hasattr(instance, field_name):
            continue
        current = getattr(instance, field_name)
        if hasattr(current, "raw_data"):
            source = list(current.raw_data)
        elif isinstance(current, str):
            source = current
        else:
            continue
        new_value, field_changes, changed = _replace_known_strings(source, locale_code)
        if changed:
            _store(field_name, new_value, field_changes)

    if not applied:
        return []
    if isinstance(instance, Page):
        revision = instance.save_revision()
        if instance.live:
            revision.publish()
    else:
        instance.save()
    return applied
def rewrite_with_agent(instance: Any, locale_code: str, issues, *, dry_run: bool = False):
    """Rewrite an instance's text fields with the locale's language agent.

    Returns an empty list immediately when rewriting is disabled. Otherwise
    each plain string field is passed to ``agent.rewrite`` and each field
    exposing ``raw_data`` to ``agent.process_block``; changed values are set
    on the instance. With ``dry_run`` the instance attributes are still
    updated in memory but nothing is saved. Without it, a ``Page`` gets a
    new revision (published if live) and any other instance is saved.

    Returns the list of change records (each has ``"field"`` and
    ``"method"``; plain-string rewrites also carry ``"before"``/``"after"``).
    """
    if not rewrite_enabled():
        return []

    agent = get_language_agent(locale_code)
    issue_map = agent.build_issue_map(issues)
    changes = []

    def _rewrite_plain(field_name, original_text):
        # Rewrite a single string field and record before/after when it changed.
        rewritten = agent.rewrite(
            original_text,
            field_path=field_name,
            issues=issue_map.get(field_name, []),
        )
        if rewritten == original_text:
            return
        setattr(instance, field_name, rewritten)
        changes.append({"field": field_name, "before": original_text, "after": rewritten, "method": "agent"})

    for field_name in ("title", "seo_title", "search_description"):
        current = getattr(instance, field_name, None)
        if isinstance(current, str):
            _rewrite_plain(field_name, current)

    for field_name in ("body", "content", "footer", "mini_footer"):
        if not hasattr(instance, field_name):
            continue
        current = getattr(instance, field_name)
        if hasattr(current, "raw_data"):
            rewritten, changed = agent.process_block(list(current.raw_data), field_name, issue_map)
            if changed:
                setattr(instance, field_name, rewritten)
                changes.append({"field": field_name, "method": "agent"})
        elif isinstance(current, str):
            _rewrite_plain(field_name, current)

    if dry_run or not changes:
        return changes
    if isinstance(instance, Page):
        revision = instance.save_revision()
        if instance.live:
            revision.publish()
    else:
        instance.save()
    return changes
def enumerate_public_pages(locale_codes: list[str] | None = None, url_filters: list[str] | None = None):
    """Return ``{locale_code: [pages]}`` for live, public pages per locale.

    Locales default to ``audit_default_locales()`` when none are given. For
    each locale the pages are ordered by ``path`` and kept only when they
    have a URL, sit under the locale's root path (the translation of the
    first site's root page, or that root page itself when no translation
    exists), and -- if ``url_filters`` is non-empty -- have a URL that is
    exactly one of the filter values.
    """
    first_site = Site.objects.order_by("id").first()
    site_root = getattr(first_site, "root_page", None)
    wanted_urls = set(url_filters or [])

    pages_by_locale = {}
    for locale_code in (locale_codes or audit_default_locales()):
        root_path = None
        if site_root is not None:
            localized_root = (
                Page.objects.filter(
                    translation_key=site_root.translation_key,
                    locale__language_code=locale_code,
                )
                .specific()
                .first()
            )
            root_path = getattr(localized_root or site_root, "path", None)

        candidates = (
            Page.objects.filter(locale__language_code=locale_code)
            .live()
            .public()
            .specific()
            .order_by("path")
        )
        selected = []
        for page in candidates:
            page_url = getattr(page, "url", None)
            if (
                page_url
                and (not root_path or page.path.startswith(root_path))
                and (not wanted_urls or page_url in wanted_urls)
            ):
                selected.append(page)
        pages_by_locale[locale_code] = selected
    return pages_by_locale
def fetch_rendered_text(page: Page):
    """Fetch a page over HTTP and return ``(status_code, visible_text)``.

    The URL is ``page.url`` when it already starts with ``http``; otherwise
    it is prefixed with the ``root_url`` of the page's site (falling back to
    the first site by id).

    Synthetic status codes:
      * ``598`` -- no request was made because the page URL or the site
        ``root_url`` is missing; the text is a short explanation.
      * ``599`` -- the request failed at the transport level (unreachable
        host, timeout, dropped or truncated response); the text is derived
        from the exception message.

    For HTTP error responses the real status code is returned together with
    the visible text of the error body.
    """
    # Local import: only needed for the transport-failure handler below.
    from http.client import HTTPException

    page_url = getattr(page, "url", None)
    if not page_url:
        return 598, "missing page URL"
    if str(page_url).startswith("http"):
        full_url = page_url
    else:
        try:
            site = page.get_site()
        except Site.DoesNotExist:
            site = None
        site = site or Site.objects.order_by("id").first()
        if site is None or not getattr(site, "root_url", None):
            return 598, "missing site root_url"
        full_url = f"{site.root_url}{page_url}"
    request = Request(full_url, headers={"User-Agent": "mandelstudio-audit/1.0"})
    try:
        with urlopen(request, timeout=30) as response:
            status = response.getcode()
            body = response.read().decode("utf-8", errors="replace")
    except HTTPError as exc:
        # Must come first: HTTPError is a URLError (and therefore OSError) subclass.
        status = exc.code
        body = exc.read().decode("utf-8", errors="replace")
    except (OSError, HTTPException) as exc:
        # URLError is an OSError, so this still covers the original case.
        # urlopen only wraps failures raised while *sending* the request;
        # timeouts and resets while waiting for or reading the response
        # surface as plain OSError subclasses, and malformed or truncated
        # responses as HTTPException. One bad page must not abort the audit.
        status = 599
        body = str(exc)
    text = extract_visible_rendered_text(body)
    return status, text
def iter_rendered_lines(rendered_text: str) -> list[str]:
    """Split rendered text into normalized, non-empty chunks.

    The text is split on whitespace that follows ``.``, ``!`` or ``?`` and
    on any run of two or more whitespace characters. Each chunk is
    normalized, and chunks that normalize to an empty value are dropped.
    """
    pieces = re.split(r"(?<=[\.\!\?])\s+|\s{2,}", rendered_text)
    cleaned = (normalize_text(piece) for piece in pieces)
    return [line for line in cleaned if line]
def validate_rendered_output(locale_code: str, rendered_text: str, status_code: int):
    """Validate the rendered text of a page and return de-duplicated issues.

    Produces, in order:
      * a ``render_status`` issue when ``status_code`` is not 200;
      * pattern issues per rendered line, with ``bad_value`` set to the
        whole line and ``extra`` tagged with ``source`` and an occurrence
        ``count``;
      * ``rendered_bad_pattern`` issues for global bad fragments and
        ``rendered_wrong_language`` issues for locale-forbidden fragments
        found anywhere in the rendered text.
    """
    issues = []
    if status_code != 200:
        issues.append(make_issue("render_status", "rendered", str(status_code)))

    occurrences = Counter()
    for line in iter_rendered_lines(rendered_text):
        line_issues = validate_patterns(locale_code, "rendered", line)
        for issue in line_issues:
            # Report the whole rendered line rather than the matched fragment.
            issue.bad_value = line
            issue.extra = {**(issue.extra or {}), "source": "rendered"}
            occurrences[(issue.issue_type, issue.bad_value)] += 1
        issues.extend(line_issues)

    # Attach how often each (type, line) pair occurred; issues without an
    # ``extra`` dict are left alone.
    for issue in issues:
        if issue.extra is not None:
            issue.extra["count"] = occurrences.get((issue.issue_type, issue.bad_value), 1)

    fragment_checks = (
        ("rendered_bad_pattern", GLOBAL_BAD_PATTERNS),
        ("rendered_wrong_language", LOCALE_FORBIDDEN.get(locale_code, ())),
    )
    for issue_type, fragments in fragment_checks:
        for fragment in fragments:
            if fragment not in rendered_text:
                continue
            suggestion = KNOWN_REPLACEMENTS.get(fragment, {}).get(locale_code, "")
            issue = make_issue(issue_type, "rendered", fragment, suggestion)
            issue.extra = {"source": "rendered", "count": 1}
            issues.append(issue)
    return dedupe_issues(issues)
def annotate_rewrite_previews(locale_code: str, issues):
    """Fill in agent-written replacement previews on reviewable issues.

    Only issues whose type is in ``REWRITE_REVIEW_TYPES`` and that have no
    replacement yet are sent to the agent. When the agent returns a
    non-empty text that differs from ``bad_value``, it is stored as the
    issue's ``replacement`` and ``extra["review_candidate"]`` is set.
    The issues are modified in place and the same collection is returned.
    """
    agent = get_language_agent(locale_code)
    for issue in issues:
        needs_preview = issue.issue_type in REWRITE_REVIEW_TYPES and not issue.replacement
        if not needs_preview:
            continue
        preview = agent.rewrite(issue.bad_value, field_path=issue.field_path, issues=[issue])
        if not preview or preview == issue.bad_value:
            continue
        issue.replacement = preview
        issue.extra = {**(issue.extra or {}), "review_candidate": True}
    return issues
def validate_instance_or_raise(instance: Any):
    """Validate a page or snippet and raise when blocking issues exist.

    Returns all issues when none of them blocks.

    Raises:
        ValidationError: with the formatted blocking issues under the
            ``"content_guard"`` key.
    """
    validator = validate_page if isinstance(instance, Page) else validate_snippet_instance
    issues = validator(instance)
    blocking_messages = [format_issue(issue) for issue in issues if issue.blocks]
    if blocking_messages:
        raise ValidationError({"content_guard": blocking_messages})
    return issues
def validate_ai_text_or_raise(locale_code: str, field_path: str, value: str):
    """Validate a single text value and raise when blocking issues exist.

    Returns all issues when none of them blocks.

    Raises:
        ValidationError: with the formatted blocking issues under the
            ``"content_guard"`` key.
    """
    single_node = [(field_path, value)]
    issues = validate_text_nodes(locale_code, single_node)
    blocking_messages = [format_issue(issue) for issue in issues if issue.blocks]
    if blocking_messages:
        raise ValidationError({"content_guard": blocking_messages})
    return issues
def record_issues(run: LocaleAuditRun, locale_code: str, obj: Any, issues, *, fixed: bool = False) -> None:
    """Persist one ``LocaleAuditIssue`` row per issue for ``obj``.

    Args:
        run: The audit run the rows belong to.
        locale_code: Locale the issues were found under.
        obj: The audited page or snippet. Its ``pk``, class name, ``url``
            and ``title`` are stored on every row.
        issues: Iterable of issue objects providing ``severity``,
            ``issue_type``, ``field_path``, ``bad_value``, ``replacement``
            and ``extra``.
        fixed: Stored as the ``fixed`` flag on every row.
    """
    # Fall back to str(obj) when the title attribute is absent *or* None.
    # Slicing a None title used to raise TypeError. str() also guards
    # against non-string titles.
    title = getattr(obj, "title", None)
    if title is None:
        title = str(obj)

    # Per-object values are identical for every issue, so compute them once
    # instead of re-reading attributes such as ``obj.url`` per issue.
    shared_fields = {
        "run": run,
        "locale_code": locale_code,
        "object_id": getattr(obj, "pk", None),
        "object_type": obj.__class__.__name__,
        "url": getattr(obj, "url", "") or "",
        "title": str(title)[:255],
        "fixed": fixed,
    }
    for issue in issues:
        LocaleAuditIssue.objects.create(
            **shared_fields,
            severity=issue.severity,
            issue_type=issue.issue_type,
            field_path=issue.field_path,
            bad_value=issue.bad_value,
            replacement=issue.replacement,
            extra=issue.extra or {},
        )
def audit_locales(locale_codes: list[str], fix: bool = False, rewrite: bool = False, dry_run: bool = False, url_filters: list[str] | None = None) -> LocaleAuditRun:
    """Audit public pages and snippets, optionally fixing or rewriting them.

    Creates a ``LocaleAuditRun``, validates every enumerated page (source
    text plus rendered output) and every instance of every snippet model,
    records the issues found, and stores per-locale and snippet summaries on
    the run before returning it.

    Args:
        locale_codes: Locales whose pages are audited.
        fix: Apply the known string replacements to pages that have issues.
        rewrite: Annotate issues with agent previews and run agent rewrites.
        dry_run: Passed to ``rewrite_with_agent``; when set, rewrites are not
            re-validated and the recorded issues are not marked fixed.
        url_filters: Optional URLs restricting which pages are audited.

    Returns:
        The finished ``LocaleAuditRun``.
    """
    run = LocaleAuditRun.objects.create(locale_codes=locale_codes, fix_enabled=fix or rewrite)
    pages_by_locale = enumerate_public_pages(locale_codes, url_filters=url_filters)
    summary: dict[str, Any] = {}
    total_checked = 0
    total_issues = 0
    pages_with_issues = 0
    for locale_code, pages in pages_by_locale.items():
        locale_summary = {"total_urls_checked": len(pages), "issues_found": 0, "issues_fixed": 0, "remaining_issues": 0, "by_severity": {"block": 0, "warn": 0, "log": 0}}
        for page in pages:
            total_checked += 1
            # Initial pass: source-level checks plus checks on the rendered page.
            status_code, rendered = fetch_rendered_text(page)
            issues = dedupe_issues(validate_page(page) + validate_rendered_output(locale_code, rendered, status_code))
            if rewrite:
                issues = annotate_rewrite_previews(locale_code, issues)
            # Baseline for the "found" and "fixed" counters below.
            initial_issue_count = len(issues)
            fixed_changes = []
            if issues and fix:
                # NOTE(review): dry_run is not consulted here, so known
                # replacements are saved even in a dry run -- confirm intended.
                fixed_changes = apply_known_replacements(page.specific, locale_code)
                if fixed_changes:
                    # The whole pre-fix issue list is recorded as fixed, then
                    # the page is re-fetched and re-validated.
                    record_issues(run, locale_code, page, issues, fixed=True)
                    status_code, rendered = fetch_rendered_text(page.specific)
                    issues = dedupe_issues(validate_page(page.specific) + validate_rendered_output(locale_code, rendered, status_code))
                    if rewrite:
                        issues = annotate_rewrite_previews(locale_code, issues)
            if issues and rewrite:
                rewrite_changes = rewrite_with_agent(page.specific, locale_code, issues, dry_run=dry_run)
                if rewrite_changes:
                    record_issues(run, locale_code, page, issues, fixed=not dry_run)
                    if not dry_run:
                        # Only a real (saved) rewrite warrants re-validation.
                        status_code, rendered = fetch_rendered_text(page.specific)
                        issues = dedupe_issues(validate_page(page.specific) + validate_rendered_output(locale_code, rendered, status_code))
                        issues = annotate_rewrite_previews(locale_code, issues)
            if issues:
                # Whatever is still open after fixing/rewriting is recorded as unfixed.
                pages_with_issues += 1
                record_issues(run, locale_code, page, issues)
            locale_summary["issues_found"] += initial_issue_count
            # NOTE(review): this difference is negative if re-validation
            # returns more issues than the initial pass -- confirm acceptable.
            locale_summary["issues_fixed"] += initial_issue_count - len(issues)
            locale_summary["remaining_issues"] += len(issues)
            for issue in issues:
                locale_summary["by_severity"][issue.severity] = locale_summary["by_severity"].get(issue.severity, 0) + 1
            total_issues += initial_issue_count
        summary[locale_code] = locale_summary
    snippet_summary = {}
    # NOTE(review): snippets are read with model.objects.all(), i.e. not
    # restricted to locale_codes, and ``fix`` is not applied to them -- confirm.
    for model in get_snippet_models():
        count = 0
        for instance in model.objects.all():
            issues = validate_snippet_instance(instance)
            if rewrite:
                issues = annotate_rewrite_previews(expected_locale(instance), issues)
            if issues and rewrite:
                rewrite_changes = rewrite_with_agent(instance, expected_locale(instance), issues, dry_run=dry_run)
                if rewrite_changes and not dry_run:
                    issues = validate_snippet_instance(instance)
            if not issues:
                continue
            count += len(issues)
            record_issues(run, expected_locale(instance), instance, issues)
        if count:
            snippet_summary[model.__name__] = count
            total_issues += count
    summary["snippets"] = snippet_summary
    run.total_urls_checked = total_checked
    run.issues_found = total_issues
    run.pages_with_issues = pages_with_issues
    run.summary = summary
    run.finished_at = timezone.now()
    # Persist only the fields set above.
    run.save(update_fields=["total_urls_checked", "issues_found", "pages_with_issues", "summary", "finished_at"])
    logger.info("Completed multilingual audit run %s", run.pk)
    return run