# Multilingual content audit and rewrite service (Wagtail/Django).
from __future__ import annotations
|
|
|
|
import logging
|
|
import re
|
|
from collections import Counter
|
|
from typing import Any
|
|
from urllib.error import HTTPError, URLError
|
|
from urllib.request import Request, urlopen
|
|
|
|
from django.core.exceptions import ValidationError
|
|
from django.utils import timezone
|
|
from wagtail.models import Page, Site
|
|
from wagtail.snippets.models import get_snippet_models
|
|
|
|
from ..agents import get_language_agent
|
|
from ..extractors.visible_text import extract_visible_rendered_text, normalize_text
|
|
from ..settings import audit_default_locales, rewrite_enabled
|
|
from ..types import dedupe_issues, format_issue, make_issue
|
|
from .rules.cta import validate_cta
|
|
from .rules.forms import validate_form_copy
|
|
from .rules.language import detect_language_mismatch
|
|
from .rules.patterns import (
|
|
GLOBAL_BAD_PATTERNS,
|
|
KNOWN_REPLACEMENTS,
|
|
LOCALE_FORBIDDEN,
|
|
validate_patterns,
|
|
)
|
|
from mandelstudio.models import LocaleAuditIssue, LocaleAuditRun
|
|
|
|
logger = logging.getLogger("mandelstudio.multilingual")
|
|
|
|
|
|
def expected_locale(instance: Any) -> str:
    """Return the language code the instance's content should be written in.

    Falls back to ``"nl"`` when the object has no locale or the locale has no
    usable ``language_code``.
    """
    locale_obj = getattr(instance, "locale", None)
    code = getattr(locale_obj, "language_code", None) if locale_obj is not None else None
    return code or "nl"
def iter_text_nodes(value: Any, path: str = ""):
    """Recursively yield ``(dotted_path, text)`` pairs for every string in *value*.

    Handles plain strings, lists, dicts, and StreamField-like objects exposing
    ``raw_data``; ``None`` and any other leaf type yield nothing.
    """
    if value is None:
        return
    if isinstance(value, str):
        yield path, value
    elif hasattr(value, "raw_data"):
        # StreamField value: descend into its raw block data under the same path.
        yield from iter_text_nodes(list(value.raw_data), path)
    elif isinstance(value, list):
        for idx, element in enumerate(value):
            yield from iter_text_nodes(element, f"{path}[{idx}]")
    elif isinstance(value, dict):
        for name, element in value.items():
            yield from iter_text_nodes(element, f"{path}.{name}" if path else str(name))
def extract_instance_text(instance: Any) -> list[tuple[str, str]]:
    """Collect ``(field_path, text)`` pairs from the audited fields of *instance*."""
    collected: list[tuple[str, str]] = []
    # Plain-string metadata fields: include only non-blank string values.
    for name in ("title", "seo_title", "search_description"):
        text = getattr(instance, name, None)
        if isinstance(text, str) and text.strip():
            collected.append((name, text))
    # Structured content fields: walk every nested string node.
    for name in ("body", "content", "footer", "mini_footer"):
        if hasattr(instance, name):
            collected.extend(iter_text_nodes(getattr(instance, name), name))
    return collected
def validate_text_nodes(locale_code: str, nodes: list[tuple[str, str]]):
    """Run every content rule over the given ``(field_path, text)`` pairs.

    Returns the deduplicated list of issues found for *locale_code*; nodes
    that normalize to empty text are skipped.
    """
    found = []
    for field_path, raw_text in nodes:
        text = normalize_text(raw_text)
        if not text:
            continue
        for rule in (validate_patterns, validate_cta, validate_form_copy):
            found.extend(rule(locale_code, field_path, text))
        # Language detection is unreliable on short fragments, so the
        # heuristic only runs on reasonably long text.
        if len(text) >= 80:
            mismatch = detect_language_mismatch(locale_code, text)
            if mismatch:
                found.append(make_issue("language_heuristic", field_path, mismatch["message"]))
    return dedupe_issues(found)
# Issue types eligible for AI rewrite previews; issues of any other type are
# only reported and never handed to the language agent for a suggestion.
REWRITE_REVIEW_TYPES = {
    "known_bad_pattern",
    "wrong_language_fragment",
    "rendered_bad_pattern",
    "rendered_wrong_language",
    "rewrite_candidate",
    "weak_marketing_copy",
    "foreign_ui_label",
    "generic_badge_label",
    "mixed_locale_heading",
    "cta_language_mismatch",
}
def validate_page(page: Page):
    """Validate the stored content of *page* against all locale rules."""
    nodes = extract_instance_text(page.specific)
    return validate_text_nodes(expected_locale(page), nodes)
def validate_snippet_instance(instance: Any):
    """Validate a snippet instance against the rules for its own locale."""
    locale_code = expected_locale(instance)
    nodes = extract_instance_text(instance)
    return validate_text_nodes(locale_code, nodes)
def validate_posted_snippet(locale_code: str, payload: dict[str, Any]):
    """Validate raw posted payload values before they are persisted.

    Only string values in *payload* are checked; non-string values are ignored.
    """
    string_fields = [(field, text) for field, text in payload.items() if isinstance(text, str)]
    return validate_text_nodes(locale_code, string_fields)
def _replace_known_strings(value: Any, locale_code: str):
    """Recursively apply ``KNOWN_REPLACEMENTS`` for *locale_code* to *value*.

    Returns ``(new_value, changes, changed)`` where ``changes`` is a list of
    ``{"bad": ..., "replacement": ...}`` dicts, one per substitution applied.
    Lists and dicts are rebuilt; other non-string leaves pass through as-is.
    """
    if isinstance(value, str):
        result = value
        applied = []
        for bad_fragment, per_locale in KNOWN_REPLACEMENTS.items():
            # Only substitute when a replacement exists for this locale.
            good = per_locale.get(locale_code)
            if good and bad_fragment in result:
                result = result.replace(bad_fragment, good)
                applied.append({"bad": bad_fragment, "replacement": good})
        return result, applied, result != value
    if isinstance(value, list):
        rebuilt = []
        applied = []
        any_changed = False
        for element in value:
            new_element, element_changes, element_changed = _replace_known_strings(element, locale_code)
            rebuilt.append(new_element)
            applied.extend(element_changes)
            any_changed = any_changed or element_changed
        return rebuilt, applied, any_changed
    if isinstance(value, dict):
        rebuilt = {}
        applied = []
        any_changed = False
        for key, element in value.items():
            new_element, element_changes, element_changed = _replace_known_strings(element, locale_code)
            rebuilt[key] = new_element
            applied.extend(element_changes)
            any_changed = any_changed or element_changed
        return rebuilt, applied, any_changed
    return value, [], False
def apply_known_replacements(instance: Any, locale_code: str):
    """Apply the deterministic KNOWN_REPLACEMENTS fixes to *instance* and save.

    Scans the plain metadata fields (title, seo_title, search_description) and
    the structured content fields (body, content, footer, mini_footer). When
    anything changed, pages get a new revision (published only if the page is
    live); any other instance is saved directly.

    Returns the list of change dicts, each tagged with the field it touched;
    an empty list means nothing changed and nothing was saved.
    """
    changes = []
    for field_name in ["title", "seo_title", "search_description"]:
        value = getattr(instance, field_name, None)
        if not isinstance(value, str):
            continue
        new_value, field_changes, changed = _replace_known_strings(value, locale_code)
        if changed:
            setattr(instance, field_name, new_value)
            changes.extend({"field": field_name, **change} for change in field_changes)

    for field_name in ["body", "content", "footer", "mini_footer"]:
        if not hasattr(instance, field_name):
            continue
        field_value = getattr(instance, field_name)
        if hasattr(field_value, "raw_data"):
            # StreamField: rewrite a copy of the raw block data, assign it back.
            new_raw, field_changes, changed = _replace_known_strings(list(field_value.raw_data), locale_code)
            if changed:
                setattr(instance, field_name, new_raw)
                changes.extend({"field": field_name, **change} for change in field_changes)
        elif isinstance(field_value, str):
            new_value, field_changes, changed = _replace_known_strings(field_value, locale_code)
            if changed:
                setattr(instance, field_name, new_value)
                changes.extend({"field": field_name, **change} for change in field_changes)

    if not changes:
        return []
    if isinstance(instance, Page):
        # Pages go through Wagtail's revision workflow; only live pages are
        # republished automatically.
        revision = instance.save_revision()
        if instance.live:
            revision.publish()
        return changes
    instance.save()
    return changes
def rewrite_with_agent(instance: Any, locale_code: str, issues, *, dry_run: bool = False):
    """Ask the locale's language agent to rewrite the fields flagged in *issues*.

    No-op (returns ``[]``) when rewriting is disabled in settings. With
    ``dry_run=True`` the instance attributes are still mutated in memory but
    nothing is saved. Returns a list of change dicts describing each rewrite.
    """
    if not rewrite_enabled():
        return []
    agent = get_language_agent(locale_code)
    # Issues grouped per field path so each field is rewritten with its own context.
    issue_map = agent.build_issue_map(issues)
    changes = []

    for field_name in ["title", "seo_title", "search_description"]:
        value = getattr(instance, field_name, None)
        if not isinstance(value, str):
            continue
        field_issues = issue_map.get(field_name, [])
        rewritten = agent.rewrite(value, field_path=field_name, issues=field_issues)
        if rewritten != value:
            setattr(instance, field_name, rewritten)
            changes.append({"field": field_name, "before": value, "after": rewritten, "method": "agent"})

    for field_name in ["body", "content", "footer", "mini_footer"]:
        if not hasattr(instance, field_name):
            continue
        field_value = getattr(instance, field_name)
        if hasattr(field_value, "raw_data"):
            # StreamField: the agent processes the raw block data wholesale;
            # no before/after is recorded for block content.
            rewritten, changed = agent.process_block(list(field_value.raw_data), field_name, issue_map)
            if changed:
                setattr(instance, field_name, rewritten)
                changes.append({"field": field_name, "method": "agent"})
        elif isinstance(field_value, str):
            rewritten = agent.rewrite(field_value, field_path=field_name, issues=issue_map.get(field_name, []))
            if rewritten != field_value:
                setattr(instance, field_name, rewritten)
                changes.append({"field": field_name, "before": field_value, "after": rewritten, "method": "agent"})

    if not changes or dry_run:
        return changes
    if isinstance(instance, Page):
        # Same persistence rules as apply_known_replacements: revision for
        # pages (published when live), plain save otherwise.
        revision = instance.save_revision()
        if instance.live:
            revision.publish()
        return changes
    instance.save()
    return changes
def enumerate_public_pages(locale_codes: list[str] | None = None, url_filters: list[str] | None = None):
    """Return ``{locale_code: [pages]}`` of live, public pages per locale.

    Pages are restricted to the locale's translated site-root subtree (when a
    translated root exists) and, when *url_filters* is given, to pages whose
    URL matches one of the filters exactly. Locales default to the configured
    audit locales.
    """
    result = {}
    site = Site.objects.order_by("id").first()
    site_root = getattr(site, "root_page", None)
    normalized_filters = set(url_filters or [])
    for locale_code in (locale_codes or audit_default_locales()):
        locale_root_path = None
        if site_root is not None:
            # Find this locale's translation of the site root (same
            # translation_key); fall back to the default root if none exists.
            translated_root = (
                Page.objects.filter(
                    translation_key=site_root.translation_key,
                    locale__language_code=locale_code,
                )
                .specific()
                .first()
            )
            chosen_root = translated_root or site_root
            locale_root_path = getattr(chosen_root, "path", None)
        qs = (
            Page.objects.filter(locale__language_code=locale_code)
            .live()
            .public()
            .specific()
            .order_by("path")
        )
        pages = []
        for page in qs:
            page_url = getattr(page, "url", None)
            if not page_url:
                # Pages without a routable URL cannot be fetched/audited.
                continue
            if locale_root_path and not page.path.startswith(locale_root_path):
                # Outside this locale's subtree (Wagtail materialized-path prefix).
                continue
            if normalized_filters and page_url not in normalized_filters:
                continue
            pages.append(page)
        result[locale_code] = pages
    return result
def fetch_rendered_text(page: Page):
    """Fetch *page* over HTTP and return ``(status_code, visible_text)``.

    Synthetic status codes: 598 when the page URL or the site root_url is
    missing, 599 for network-level failures (URLError). HTTP error responses
    keep their real status code and their body is still text-extracted so
    error pages can be audited too.
    """
    page_url = getattr(page, "url", None)
    if not page_url:
        return 598, "missing page URL"
    if str(page_url).startswith("http"):
        full_url = page_url
    else:
        # Relative URL: resolve against the page's site (or the first site).
        try:
            site = page.get_site()
        except Site.DoesNotExist:
            site = None
        site = site or Site.objects.order_by("id").first()
        if site is None or not getattr(site, "root_url", None):
            return 598, "missing site root_url"
        full_url = f"{site.root_url}{page_url}"
    request = Request(full_url, headers={"User-Agent": "mandelstudio-audit/1.0"})
    try:
        with urlopen(request, timeout=30) as response:
            status = response.getcode()
            body = response.read().decode("utf-8", errors="replace")
    except HTTPError as exc:
        # Keep the real HTTP status; the error page body is still processed.
        status = exc.code
        body = exc.read().decode("utf-8", errors="replace")
    except URLError as exc:
        status = 599
        body = str(exc)
    text = extract_visible_rendered_text(body)
    return status, text
def iter_rendered_lines(rendered_text: str) -> list[str]:
    """Split rendered page text into normalized sentence-ish chunks.

    Splits after sentence-ending punctuation or on runs of two or more
    whitespace characters, dropping chunks that normalize to nothing.
    """
    chunks = re.split(r"(?<=[\.\!\?])\s+|\s{2,}", rendered_text)
    normalized_chunks = (normalize_text(chunk) for chunk in chunks)
    return [line for line in normalized_chunks if line]
def validate_rendered_output(locale_code: str, rendered_text: str, status_code: int):
    """Validate the rendered (HTTP-fetched) text of a page.

    Records a ``render_status`` issue for non-200 responses, runs the pattern
    rules per rendered line (annotating each issue with the offending line and
    an occurrence count), then scans the full text for globally-bad and
    locale-forbidden fragments. Returns deduplicated issues.
    """
    issues = []
    if status_code != 200:
        issues.append(make_issue("render_status", "rendered", str(status_code)))
    # Counts how often each (issue_type, bad_value) pair occurs across lines;
    # the count is written back onto every issue afterwards.
    source_counter = Counter()
    for line in iter_rendered_lines(rendered_text):
        line_issues = validate_patterns(locale_code, "rendered", line)
        for issue in line_issues:
            # Point the issue at the full offending line, not just the fragment.
            issue.bad_value = line
            issue.extra = {**(issue.extra or {}), "source": "rendered"}
            source_counter[(issue.issue_type, issue.bad_value)] += 1
        issues.extend(line_issues)
    for issue in issues:
        if issue.extra is not None:
            issue.extra["count"] = source_counter.get((issue.issue_type, issue.bad_value), 1)
    for fragment in GLOBAL_BAD_PATTERNS:
        if fragment in rendered_text:
            issue = make_issue("rendered_bad_pattern", "rendered", fragment, KNOWN_REPLACEMENTS.get(fragment, {}).get(locale_code, ""))
            issue.extra = {"source": "rendered", "count": 1}
            issues.append(issue)
    for fragment in LOCALE_FORBIDDEN.get(locale_code, ()):
        if fragment in rendered_text:
            issue = make_issue("rendered_wrong_language", "rendered", fragment, KNOWN_REPLACEMENTS.get(fragment, {}).get(locale_code, ""))
            issue.extra = {"source": "rendered", "count": 1}
            issues.append(issue)
    return dedupe_issues(issues)
def annotate_rewrite_previews(locale_code: str, issues):
    """Attach agent-generated rewrite suggestions to eligible issues.

    Only issues whose type is in REWRITE_REVIEW_TYPES and which have no
    replacement yet are previewed; a successful preview sets the issue's
    replacement and flags ``review_candidate`` in ``extra``. Mutates and
    returns *issues*.
    """
    agent = get_language_agent(locale_code)
    for issue in issues:
        eligible = issue.issue_type in REWRITE_REVIEW_TYPES and not issue.replacement
        if not eligible:
            continue
        suggestion = agent.rewrite(issue.bad_value, field_path=issue.field_path, issues=[issue])
        if suggestion and suggestion != issue.bad_value:
            issue.replacement = suggestion
            issue.extra = {**(issue.extra or {}), "review_candidate": True}
    return issues
def validate_instance_or_raise(instance: Any):
    """Validate *instance*; raise ValidationError when any blocking issue exists.

    Returns the (possibly non-empty) issue list when nothing blocks.
    """
    if isinstance(instance, Page):
        issues = validate_page(instance)
    else:
        issues = validate_snippet_instance(instance)
    blocking = [issue for issue in issues if issue.blocks]
    if blocking:
        raise ValidationError({"content_guard": [format_issue(issue) for issue in blocking]})
    return issues
def validate_ai_text_or_raise(locale_code: str, field_path: str, value: str):
    """Validate a single AI-produced text; raise on any blocking issue.

    Returns the (possibly non-empty) issue list when nothing blocks.
    """
    issues = validate_text_nodes(locale_code, [(field_path, value)])
    blocking = [issue for issue in issues if issue.blocks]
    if blocking:
        raise ValidationError({"content_guard": [format_issue(issue) for issue in blocking]})
    return issues
def record_issues(run: LocaleAuditRun, locale_code: str, obj: Any, issues, *, fixed: bool = False) -> None:
    """Persist one LocaleAuditIssue row per issue for *obj* under *run*.

    ``fixed=True`` marks the recorded issues as already remediated. Objects
    without a pk/url/title are recorded with best-effort fallbacks.
    """
    # Hoist the object-level fields out of the loop; they are loop-invariant.
    # Fall back to str(obj) when the title attribute is missing OR None/empty —
    # the previous getattr default only covered the missing-attribute case and
    # slicing a None title raised TypeError.
    title = (getattr(obj, "title", None) or str(obj))[:255]
    url = getattr(obj, "url", "") or ""
    object_id = getattr(obj, "pk", None)
    object_type = obj.__class__.__name__
    for issue in issues:
        LocaleAuditIssue.objects.create(
            run=run,
            locale_code=locale_code,
            object_id=object_id,
            object_type=object_type,
            url=url,
            title=title,
            severity=issue.severity,
            issue_type=issue.issue_type,
            field_path=issue.field_path,
            bad_value=issue.bad_value,
            replacement=issue.replacement,
            fixed=fixed,
            extra=issue.extra or {},
        )
def audit_locales(locale_codes: list[str], fix: bool = False, rewrite: bool = False, dry_run: bool = False, url_filters: list[str] | None = None) -> LocaleAuditRun:
    """Run a full multilingual audit over public pages and all snippets.

    For each page: validate stored + rendered content, then optionally apply
    deterministic fixes (*fix*) and agent rewrites (*rewrite*), re-validating
    after each pass. ``dry_run`` previews rewrites without saving. All issues
    and per-locale summary stats are persisted on the returned LocaleAuditRun.
    """
    run = LocaleAuditRun.objects.create(locale_codes=locale_codes, fix_enabled=fix or rewrite)
    pages_by_locale = enumerate_public_pages(locale_codes, url_filters=url_filters)
    summary: dict[str, Any] = {}
    total_checked = 0
    total_issues = 0
    pages_with_issues = 0

    for locale_code, pages in pages_by_locale.items():
        locale_summary = {"total_urls_checked": len(pages), "issues_found": 0, "issues_fixed": 0, "remaining_issues": 0, "by_severity": {"block": 0, "warn": 0, "log": 0}}
        for page in pages:
            total_checked += 1
            status_code, rendered = fetch_rendered_text(page)
            issues = dedupe_issues(validate_page(page) + validate_rendered_output(locale_code, rendered, status_code))
            if rewrite:
                issues = annotate_rewrite_previews(locale_code, issues)
            initial_issue_count = len(issues)
            fixed_changes = []
            if issues and fix:
                # Pass 1: deterministic replacements; re-fetch and re-validate
                # only when something was actually changed.
                fixed_changes = apply_known_replacements(page.specific, locale_code)
                if fixed_changes:
                    record_issues(run, locale_code, page, issues, fixed=True)
                    status_code, rendered = fetch_rendered_text(page.specific)
                    issues = dedupe_issues(validate_page(page.specific) + validate_rendered_output(locale_code, rendered, status_code))
                    if rewrite:
                        issues = annotate_rewrite_previews(locale_code, issues)
            if issues and rewrite:
                # Pass 2: agent rewrites for whatever is still flagged. In
                # dry-run mode changes are recorded but nothing is saved.
                rewrite_changes = rewrite_with_agent(page.specific, locale_code, issues, dry_run=dry_run)
                if rewrite_changes:
                    record_issues(run, locale_code, page, issues, fixed=not dry_run)
                    if not dry_run:
                        status_code, rendered = fetch_rendered_text(page.specific)
                        issues = dedupe_issues(validate_page(page.specific) + validate_rendered_output(locale_code, rendered, status_code))
                        issues = annotate_rewrite_previews(locale_code, issues)
            if issues:
                pages_with_issues += 1
                record_issues(run, locale_code, page, issues)
            locale_summary["issues_found"] += initial_issue_count
            # "Fixed" is the delta between the first and the final validation.
            locale_summary["issues_fixed"] += initial_issue_count - len(issues)
            locale_summary["remaining_issues"] += len(issues)
            for issue in issues:
                locale_summary["by_severity"][issue.severity] = locale_summary["by_severity"].get(issue.severity, 0) + 1
            total_issues += initial_issue_count
        summary[locale_code] = locale_summary

    # Snippets: validate (and optionally rewrite) every instance of every
    # registered snippet model, keyed by its own locale.
    snippet_summary = {}
    for model in get_snippet_models():
        count = 0
        for instance in model.objects.all():
            issues = validate_snippet_instance(instance)
            if rewrite:
                issues = annotate_rewrite_previews(expected_locale(instance), issues)
            if issues and rewrite:
                rewrite_changes = rewrite_with_agent(instance, expected_locale(instance), issues, dry_run=dry_run)
                if rewrite_changes and not dry_run:
                    issues = validate_snippet_instance(instance)
            if not issues:
                continue
            count += len(issues)
            record_issues(run, expected_locale(instance), instance, issues)
        if count:
            snippet_summary[model.__name__] = count
            total_issues += count
    summary["snippets"] = snippet_summary

    run.total_urls_checked = total_checked
    run.issues_found = total_issues
    run.pages_with_issues = pages_with_issues
    run.summary = summary
    run.finished_at = timezone.now()
    run.save(update_fields=["total_urls_checked", "issues_found", "pages_with_issues", "summary", "finished_at"])
    logger.info("Completed multilingual audit run %s", run.pk)
    return run