"""Multilingual content guard.

Validates the text of Wagtail pages and snippets against locale rules,
applies known string replacements, optionally rewrites copy through a
language agent, and records the findings of an audit run.
"""
from __future__ import annotations

import logging
import re
from collections import Counter
from http.client import HTTPException
from typing import Any
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen

from django.core.exceptions import ValidationError
from django.utils import timezone
from wagtail.models import Page, Site
from wagtail.snippets.models import get_snippet_models

from ..agents import get_language_agent
from ..extractors.visible_text import extract_visible_rendered_text, normalize_text
from ..settings import audit_default_locales, rewrite_enabled
from ..types import dedupe_issues, format_issue, make_issue
from .rules.cta import validate_cta
from .rules.forms import validate_form_copy
from .rules.language import detect_language_mismatch
from .rules.patterns import (
    GLOBAL_BAD_PATTERNS,
    KNOWN_REPLACEMENTS,
    LOCALE_FORBIDDEN,
    validate_patterns,
)
from mandelstudio.models import LocaleAuditIssue, LocaleAuditRun

logger = logging.getLogger("mandelstudio.multilingual")

# Plain string attributes inspected on every page/snippet.
_PLAIN_TEXT_FIELDS = ("title", "seo_title", "search_description")
# Attributes that may hold either a plain string or an object exposing
# ``raw_data`` (a nested list/dict structure).
_STRUCTURED_FIELDS = ("body", "content", "footer", "mini_footer")

# Synthetic status codes returned by ``fetch_rendered_text`` when no HTTP
# response was obtained at all.
_STATUS_UNRESOLVED_URL = 598
_STATUS_FETCH_FAILED = 599

# Splits rendered text after sentence punctuation or on runs of 2+ whitespace.
_RENDERED_SPLIT_RE = re.compile(r"(?<=[\.\!\?])\s+|\s{2,}")


def expected_locale(instance: Any) -> str:
    """Return ``instance.locale.language_code``, or ``"nl"`` when unavailable."""
    locale = getattr(instance, "locale", None)
    if locale is not None and getattr(locale, "language_code", None):
        return locale.language_code
    return "nl"


def iter_text_nodes(value: Any, path: str = ""):
    """Yield ``(path, text)`` for every string found in a nested value.

    Lists extend the path with ``[index]``, dicts with ``.key``; objects
    exposing ``raw_data`` are walked through ``list(value.raw_data)``.
    Values of any other type are ignored.
    """
    if value is None:
        return
    if isinstance(value, str):
        yield path, value
        return
    if hasattr(value, "raw_data"):
        yield from iter_text_nodes(list(value.raw_data), path)
        return
    if isinstance(value, list):
        for index, item in enumerate(value):
            yield from iter_text_nodes(item, f"{path}[{index}]")
        return
    if isinstance(value, dict):
        for key, item in value.items():
            child_path = f"{path}.{key}" if path else str(key)
            yield from iter_text_nodes(item, child_path)


def extract_instance_text(instance: Any) -> list[tuple[str, str]]:
    """Collect ``(field_path, text)`` pairs from the known text fields.

    Plain text fields are included only when they hold a non-blank string;
    structured fields are flattened with ``iter_text_nodes``.
    """
    nodes: list[tuple[str, str]] = []
    for field_name in _PLAIN_TEXT_FIELDS:
        value = getattr(instance, field_name, None)
        if isinstance(value, str) and value.strip():
            nodes.append((field_name, value))
    for field_name in _STRUCTURED_FIELDS:
        if hasattr(instance, field_name):
            nodes.extend(iter_text_nodes(getattr(instance, field_name), field_name))
    return nodes


def validate_text_nodes(locale_code: str, nodes: list[tuple[str, str]]):
    """Run every text rule over ``nodes`` and return the de-duplicated issues.

    Each node is normalized first; nodes that normalize to an empty value are
    skipped. The language-mismatch heuristic only runs on texts of at least
    80 normalized characters.
    """
    issues = []
    for field_path, raw_text in nodes:
        normalized = normalize_text(raw_text)
        if not normalized:
            continue
        issues.extend(validate_patterns(locale_code, field_path, normalized))
        issues.extend(validate_cta(locale_code, field_path, normalized))
        issues.extend(validate_form_copy(locale_code, field_path, normalized))
        if len(normalized) >= 80:
            mismatch = detect_language_mismatch(locale_code, normalized)
            if mismatch:
                issues.append(make_issue("language_heuristic", field_path, mismatch["message"]))
    return dedupe_issues(issues)


# Issue types for which ``annotate_rewrite_previews`` asks the agent for a
# suggested replacement.
REWRITE_REVIEW_TYPES = {
    "known_bad_pattern",
    "wrong_language_fragment",
    "rendered_bad_pattern",
    "rendered_wrong_language",
    "rewrite_candidate",
    "weak_marketing_copy",
    "foreign_ui_label",
    "generic_badge_label",
    "mixed_locale_heading",
    "cta_language_mismatch",
}


def validate_page(page: Page):
    """Validate the text of ``page.specific`` against the page's locale."""
    return validate_text_nodes(expected_locale(page), extract_instance_text(page.specific))


def validate_snippet_instance(instance: Any):
    """Validate the text of a snippet instance against its own locale."""
    return validate_text_nodes(expected_locale(instance), extract_instance_text(instance))


def validate_posted_snippet(locale_code: str, payload: dict[str, Any]):
    """Validate the string values of a posted payload; other values are ignored."""
    nodes = [(key, value) for key, value in payload.items() if isinstance(value, str)]
    return validate_text_nodes(locale_code, nodes)


def _replace_known_strings(value: Any, locale_code: str):
    """Recursively apply ``KNOWN_REPLACEMENTS`` for ``locale_code``.

    Returns ``(new_value, changes, changed)``. Lists and dicts are rebuilt
    rather than mutated; values of other types are returned untouched.
    """
    changes = []
    if isinstance(value, str):
        new = value
        for bad, replacements in KNOWN_REPLACEMENTS.items():
            replacement = replacements.get(locale_code)
            if replacement and bad in new:
                new = new.replace(bad, replacement)
                changes.append({"bad": bad, "replacement": replacement})
        return new, changes, new != value
    if isinstance(value, list):
        out = []
        changed = False
        for item in value:
            new_item, item_changes, item_changed = _replace_known_strings(item, locale_code)
            out.append(new_item)
            changes.extend(item_changes)
            changed = changed or item_changed
        return out, changes, changed
    if isinstance(value, dict):
        out = {}
        changed = False
        for key, item in value.items():
            new_item, item_changes, item_changed = _replace_known_strings(item, locale_code)
            out[key] = new_item
            changes.extend(item_changes)
            changed = changed or item_changed
        return out, changes, changed
    return value, changes, False


def _persist_instance(instance: Any) -> None:
    """Save ``instance``; pages get a new revision, published when the page is live."""
    if isinstance(instance, Page):
        revision = instance.save_revision()
        if instance.live:
            revision.publish()
        return
    instance.save()


def apply_known_replacements(instance: Any, locale_code: str):
    """Apply known replacements to ``instance`` and persist it when changed.

    Returns a list of ``{"field", "bad", "replacement"}`` dicts, or ``[]``
    when nothing changed (in which case nothing is saved).
    """
    # NOTE(review): this function has no dry-run mode, so
    # ``audit_locales(fix=True, dry_run=True)`` still persists these
    # replacements -- confirm whether that is intended.
    changes = []
    for field_name in _PLAIN_TEXT_FIELDS:
        value = getattr(instance, field_name, None)
        if not isinstance(value, str):
            continue
        new_value, field_changes, changed = _replace_known_strings(value, locale_code)
        if changed:
            setattr(instance, field_name, new_value)
            changes.extend({"field": field_name, **change} for change in field_changes)

    for field_name in _STRUCTURED_FIELDS:
        if not hasattr(instance, field_name):
            continue
        field_value = getattr(instance, field_name)
        if hasattr(field_value, "raw_data"):
            new_raw, field_changes, changed = _replace_known_strings(
                list(field_value.raw_data), locale_code
            )
            if changed:
                setattr(instance, field_name, new_raw)
                changes.extend({"field": field_name, **change} for change in field_changes)
        elif isinstance(field_value, str):
            new_value, field_changes, changed = _replace_known_strings(field_value, locale_code)
            if changed:
                setattr(instance, field_name, new_value)
                changes.extend({"field": field_name, **change} for change in field_changes)

    if not changes:
        return []
    _persist_instance(instance)
    return changes


def rewrite_with_agent(instance: Any, locale_code: str, issues, *, dry_run: bool = False):
    """Rewrite the text fields of ``instance`` through the language agent.

    Returns ``[]`` immediately when rewriting is disabled. Otherwise returns
    one change dict per rewritten field (``before``/``after`` are included
    for plain string values only). The instance is persisted unless
    ``dry_run`` is true or nothing changed.
    """
    if not rewrite_enabled():
        return []
    agent = get_language_agent(locale_code)
    issue_map = agent.build_issue_map(issues)
    changes = []

    # NOTE(review): rewritten values are assigned to the in-memory instance
    # even when ``dry_run`` is true; only persistence is skipped.
    for field_name in _PLAIN_TEXT_FIELDS:
        value = getattr(instance, field_name, None)
        if not isinstance(value, str):
            continue
        field_issues = issue_map.get(field_name, [])
        rewritten = agent.rewrite(value, field_path=field_name, issues=field_issues)
        if rewritten != value:
            setattr(instance, field_name, rewritten)
            changes.append(
                {"field": field_name, "before": value, "after": rewritten, "method": "agent"}
            )

    for field_name in _STRUCTURED_FIELDS:
        if not hasattr(instance, field_name):
            continue
        field_value = getattr(instance, field_name)
        if hasattr(field_value, "raw_data"):
            rewritten, changed = agent.process_block(
                list(field_value.raw_data), field_name, issue_map
            )
            if changed:
                setattr(instance, field_name, rewritten)
                changes.append({"field": field_name, "method": "agent"})
        elif isinstance(field_value, str):
            rewritten = agent.rewrite(
                field_value, field_path=field_name, issues=issue_map.get(field_name, [])
            )
            if rewritten != field_value:
                setattr(instance, field_name, rewritten)
                changes.append(
                    {
                        "field": field_name,
                        "before": field_value,
                        "after": rewritten,
                        "method": "agent",
                    }
                )

    if not changes or dry_run:
        return changes
    _persist_instance(instance)
    return changes


def enumerate_public_pages(
    locale_codes: list[str] | None = None,
    url_filters: list[str] | None = None,
):
    """Return ``{locale_code: [pages]}`` of live, public pages per locale.

    Pages are restricted to the tree below the locale's translation of the
    first site's root page (falling back to the root page itself), must have
    a URL, and -- when ``url_filters`` is given -- must have a URL contained
    in it. Falls back to ``audit_default_locales()`` when ``locale_codes`` is
    empty.
    """
    result = {}
    site = Site.objects.order_by("id").first()
    site_root = getattr(site, "root_page", None)
    normalized_filters = set(url_filters or [])

    for locale_code in locale_codes or audit_default_locales():
        locale_root_path = None
        if site_root is not None:
            translated_root = (
                Page.objects.filter(
                    translation_key=site_root.translation_key,
                    locale__language_code=locale_code,
                )
                .specific()
                .first()
            )
            chosen_root = translated_root or site_root
            locale_root_path = getattr(chosen_root, "path", None)

        qs = (
            Page.objects.filter(locale__language_code=locale_code)
            .live()
            .public()
            .specific()
            .order_by("path")
        )
        pages = []
        for page in qs:
            page_url = getattr(page, "url", None)
            if not page_url:
                continue
            if locale_root_path and not page.path.startswith(locale_root_path):
                continue
            if normalized_filters and page_url not in normalized_filters:
                continue
            pages.append(page)
        result[locale_code] = pages
    return result


def fetch_rendered_text(page: Page):
    """Fetch ``page`` over HTTP and return ``(status_code, visible_text)``.

    Returns status 598 with a short message when no absolute URL can be
    built. HTTP error responses keep their status code and their body is
    still processed. Any transport-level failure (``URLError``, timeouts,
    connection errors, malformed HTTP responses) yields status 599 with the
    exception text as body, so one unreachable page cannot abort an audit.
    """
    page_url = getattr(page, "url", None)
    if not page_url:
        return _STATUS_UNRESOLVED_URL, "missing page URL"

    if str(page_url).startswith("http"):
        full_url = page_url
    else:
        try:
            site = page.get_site()
        except Site.DoesNotExist:
            site = None
        site = site or Site.objects.order_by("id").first()
        if site is None or not getattr(site, "root_url", None):
            return _STATUS_UNRESOLVED_URL, "missing site root_url"
        full_url = f"{site.root_url}{page_url}"

    request = Request(full_url, headers={"User-Agent": "mandelstudio-audit/1.0"})
    try:
        with urlopen(request, timeout=30) as response:
            status = response.getcode()
            body = response.read().decode("utf-8", errors="replace")
    except HTTPError as exc:
        status = exc.code
        body = exc.read().decode("utf-8", errors="replace")
    except (URLError, HTTPException, OSError) as exc:
        # urlopen wraps connection errors in URLError, but a timeout or reset
        # while reading the body raises OSError/TimeoutError directly, and
        # malformed responses raise http.client.HTTPException.
        status = _STATUS_FETCH_FAILED
        body = str(exc)
    text = extract_visible_rendered_text(body)
    return status, text


def iter_rendered_lines(rendered_text: str) -> list[str]:
    """Split rendered text into normalized, non-empty sentence-like chunks."""
    normalized_chunks = (
        normalize_text(chunk) for chunk in _RENDERED_SPLIT_RE.split(rendered_text)
    )
    return [chunk for chunk in normalized_chunks if chunk]


def validate_rendered_output(locale_code: str, rendered_text: str, status_code: int):
    """Validate rendered page text and return the de-duplicated issues.

    Adds a ``render_status`` issue for any status other than 200, runs the
    pattern rules over every rendered line, and flags each global or
    locale-forbidden fragment that occurs anywhere in the text.
    """
    issues = []
    if status_code != 200:
        issues.append(make_issue("render_status", "rendered", str(status_code)))

    source_counter = Counter()
    for line in iter_rendered_lines(rendered_text):
        line_issues = validate_patterns(locale_code, "rendered", line)
        for issue in line_issues:
            # Report the whole rendered line as the offending value.
            issue.bad_value = line
            issue.extra = {**(issue.extra or {}), "source": "rendered"}
            source_counter[(issue.issue_type, issue.bad_value)] += 1
        issues.extend(line_issues)

    # Attach how often each (type, line) pair occurred; issues whose ``extra``
    # is None are left untouched.
    for issue in issues:
        if issue.extra is not None:
            issue.extra["count"] = source_counter.get((issue.issue_type, issue.bad_value), 1)

    for fragment in GLOBAL_BAD_PATTERNS:
        if fragment in rendered_text:
            issue = make_issue(
                "rendered_bad_pattern",
                "rendered",
                fragment,
                KNOWN_REPLACEMENTS.get(fragment, {}).get(locale_code, ""),
            )
            issue.extra = {"source": "rendered", "count": 1}
            issues.append(issue)

    for fragment in LOCALE_FORBIDDEN.get(locale_code, ()):
        if fragment in rendered_text:
            issue = make_issue(
                "rendered_wrong_language",
                "rendered",
                fragment,
                KNOWN_REPLACEMENTS.get(fragment, {}).get(locale_code, ""),
            )
            issue.extra = {"source": "rendered", "count": 1}
            issues.append(issue)

    return dedupe_issues(issues)


def annotate_rewrite_previews(locale_code: str, issues):
    """Fill in an agent-suggested replacement on reviewable issues.

    Only issues whose type is in ``REWRITE_REVIEW_TYPES`` and that have no
    replacement yet are sent to the agent. Issues are modified in place and
    the same collection is returned.
    """
    agent = get_language_agent(locale_code)
    for issue in issues:
        if issue.issue_type not in REWRITE_REVIEW_TYPES:
            continue
        if issue.replacement:
            continue
        preview = agent.rewrite(issue.bad_value, field_path=issue.field_path, issues=[issue])
        if preview and preview != issue.bad_value:
            issue.replacement = preview
            issue.extra = {**(issue.extra or {}), "review_candidate": True}
    return issues


def _raise_if_blocking(issues):
    """Return ``issues``, or raise ``ValidationError`` when any issue blocks."""
    blocking = [issue for issue in issues if issue.blocks]
    if not blocking:
        return issues
    raise ValidationError({"content_guard": [format_issue(issue) for issue in blocking]})


def validate_instance_or_raise(instance: Any):
    """Validate a page or snippet.

    Returns the issues found, or raises ``ValidationError`` under the
    ``content_guard`` key when at least one issue is blocking.
    """
    if isinstance(instance, Page):
        issues = validate_page(instance)
    else:
        issues = validate_snippet_instance(instance)
    return _raise_if_blocking(issues)


def validate_ai_text_or_raise(locale_code: str, field_path: str, value: str):
    """Validate one generated text value.

    Returns the issues found, or raises ``ValidationError`` under the
    ``content_guard`` key when at least one issue is blocking.
    """
    issues = validate_text_nodes(locale_code, [(field_path, value)])
    return _raise_if_blocking(issues)


def record_issues(
    run: LocaleAuditRun,
    locale_code: str,
    obj: Any,
    issues,
    *,
    fixed: bool = False,
) -> None:
    """Create one ``LocaleAuditIssue`` row per issue for ``obj``."""
    # These values are identical for every row, so compute them once.
    title = getattr(obj, "title", None)
    if title is None:
        # Objects without a title, or with ``title=None``, fall back to str().
        title = str(obj)
    title = str(title)[:255]
    object_id = getattr(obj, "pk", None)
    object_type = obj.__class__.__name__
    url = getattr(obj, "url", "") or ""

    for issue in issues:
        LocaleAuditIssue.objects.create(
            run=run,
            locale_code=locale_code,
            object_id=object_id,
            object_type=object_type,
            url=url,
            title=title,
            severity=issue.severity,
            issue_type=issue.issue_type,
            field_path=issue.field_path,
            bad_value=issue.bad_value,
            replacement=issue.replacement,
            fixed=fixed,
            extra=issue.extra or {},
        )


def _collect_page_issues(page: Any, locale_code: str):
    """Fetch the rendered page and merge source-level and rendered issues."""
    status_code, rendered = fetch_rendered_text(page)
    return dedupe_issues(
        validate_page(page) + validate_rendered_output(locale_code, rendered, status_code)
    )


def _audit_page(
    run: LocaleAuditRun,
    locale_code: str,
    page: Any,
    *,
    fix: bool,
    rewrite: bool,
    dry_run: bool,
):
    """Audit one page and return ``(initial_issue_count, remaining_issues)``."""
    issues = _collect_page_issues(page, locale_code)
    if rewrite:
        issues = annotate_rewrite_previews(locale_code, issues)
    initial_issue_count = len(issues)

    if issues and fix:
        fixed_changes = apply_known_replacements(page.specific, locale_code)
        if fixed_changes:
            # NOTE(review): every pre-fix issue is recorded as fixed here,
            # including ones that may still be reported by the re-validation
            # below -- confirm this is the intended bookkeeping.
            record_issues(run, locale_code, page, issues, fixed=True)
            issues = _collect_page_issues(page.specific, locale_code)
            if rewrite:
                issues = annotate_rewrite_previews(locale_code, issues)

    if issues and rewrite:
        rewrite_changes = rewrite_with_agent(
            page.specific, locale_code, issues, dry_run=dry_run
        )
        if rewrite_changes:
            record_issues(run, locale_code, page, issues, fixed=not dry_run)
            if not dry_run:
                issues = _collect_page_issues(page.specific, locale_code)
                issues = annotate_rewrite_previews(locale_code, issues)

    if issues:
        record_issues(run, locale_code, page, issues)
    return initial_issue_count, issues


def _audit_snippets(run: LocaleAuditRun, *, rewrite: bool, dry_run: bool) -> dict[str, int]:
    """Audit every registered snippet; return issue counts per model name."""
    snippet_summary: dict[str, int] = {}
    for model in get_snippet_models():
        count = 0
        for instance in model.objects.all():
            locale_code = expected_locale(instance)
            issues = validate_snippet_instance(instance)
            if rewrite:
                issues = annotate_rewrite_previews(locale_code, issues)
            if issues and rewrite:
                rewrite_changes = rewrite_with_agent(
                    instance, locale_code, issues, dry_run=dry_run
                )
                if rewrite_changes and not dry_run:
                    issues = validate_snippet_instance(instance)
            if not issues:
                continue
            count += len(issues)
            record_issues(run, locale_code, instance, issues)
        if count:
            snippet_summary[model.__name__] = count
    return snippet_summary


def audit_locales(
    locale_codes: list[str],
    fix: bool = False,
    rewrite: bool = False,
    dry_run: bool = False,
    url_filters: list[str] | None = None,
) -> LocaleAuditRun:
    """Audit all public pages and snippets and return the finished run.

    Args:
        locale_codes: Locales to audit; an empty value means
            ``audit_default_locales()``.
        fix: Apply known string replacements to pages with issues.
        rewrite: Annotate issues with agent previews and rewrite content
            through the language agent.
        dry_run: Passed to ``rewrite_with_agent`` so agent rewrites are not
            persisted.
        url_filters: When given, only pages whose URL is listed are audited.

    Returns:
        The saved ``LocaleAuditRun`` with totals and a per-locale summary.
    """
    # Resolve defaults once so the run records the locales actually audited.
    resolved_locales = list(locale_codes or audit_default_locales())
    run = LocaleAuditRun.objects.create(
        locale_codes=resolved_locales, fix_enabled=fix or rewrite
    )
    pages_by_locale = enumerate_public_pages(resolved_locales, url_filters=url_filters)

    summary: dict[str, Any] = {}
    total_checked = 0
    total_issues = 0
    pages_with_issues = 0

    for locale_code, pages in pages_by_locale.items():
        by_severity = {"block": 0, "warn": 0, "log": 0}
        locale_summary = {
            "total_urls_checked": len(pages),
            "issues_found": 0,
            "issues_fixed": 0,
            "remaining_issues": 0,
            "by_severity": by_severity,
        }
        for page in pages:
            total_checked += 1
            initial_issue_count, issues = _audit_page(
                run, locale_code, page, fix=fix, rewrite=rewrite, dry_run=dry_run
            )
            if issues:
                pages_with_issues += 1
            locale_summary["issues_found"] += initial_issue_count
            # Re-validation can report more issues than the first pass (for
            # example when the re-fetch fails); never count that as negative.
            locale_summary["issues_fixed"] += max(0, initial_issue_count - len(issues))
            locale_summary["remaining_issues"] += len(issues)
            for issue in issues:
                by_severity[issue.severity] = by_severity.get(issue.severity, 0) + 1
            total_issues += initial_issue_count
        summary[locale_code] = locale_summary

    snippet_summary = _audit_snippets(run, rewrite=rewrite, dry_run=dry_run)
    total_issues += sum(snippet_summary.values())
    summary["snippets"] = snippet_summary

    run.total_urls_checked = total_checked
    run.issues_found = total_issues
    run.pages_with_issues = pages_with_issues
    run.summary = summary
    run.finished_at = timezone.now()
    run.save(
        update_fields=[
            "total_urls_checked",
            "issues_found",
            "pages_with_issues",
            "summary",
            "finished_at",
        ]
    )
    logger.info("Completed multilingual audit run %s", run.pk)
    return run