Add multilingual audit CI pipeline + extract mandelblog_content_guard
This commit is contained in:
187
mandelblog_content_guard/agents/base.py
Normal file
187
mandelblog_content_guard/agents/base.py
Normal file
@@ -0,0 +1,187 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import re
|
||||
from collections import defaultdict
|
||||
from typing import Any
|
||||
|
||||
from django.utils.module_loading import import_string
|
||||
|
||||
from ..settings import get_rewrite_backend
|
||||
|
||||
|
||||
class BaseLanguageAgent:
    """Base class for locale-specific website-copy cleanup/rewrite agents.

    Subclasses customize the class attributes below (locale, tone,
    vocabulary maps, CTA defaults) and may override ``post_cleanup_text``
    for language-specific post-processing. An optional rewrite backend
    (resolved from settings via ``get_rewrite_backend``) can further
    rewrite cleaned text through an external service.
    """

    # Target locale and desired voice; subclasses override these.
    locale = "nl"
    tone = "business"
    preferred_formality = "neutral"
    # lowercased keyword substring -> canonical call-to-action text
    cta_defaults: dict[str, str] = {}
    # unconditional source -> target replacements applied to every string
    vocabulary_map: dict[str, str] = {}
    # field-path token -> extra replacements, applied only when the token
    # occurs in the (lowercased) field path
    contextual_vocabulary_map: dict[str, dict[str, str]] = {}
    # (pattern, replacement template) pairs that strip translator
    # commentary such as "translation from German: ..." and keep only the
    # quoted copy captured in the ``quote`` group.
    cleanup_patterns: tuple[tuple[re.Pattern[str], str], ...] = (
        (
            re.compile(
                r"""^.*?\bis\s+(?:German|Spanish|French|Italian|Portuguese|Dutch),\s+not\s+Dutch.*?(?::\s*|\"\.\s*)(?P<quote>.+?)\"?\.?\s*$""",
                re.IGNORECASE,
            ),
            "{quote}",
        ),
        (
            re.compile(
                r"""^.*?\btranslation\s+from\s+.*?(?::\s*|\"\.\s*)(?P<quote>.+?)\"?\.?\s*$""",
                re.IGNORECASE,
            ),
            "{quote}",
        ),
        (
            re.compile(
                r"""^.*?\btraducid[oa]\s+al\s+.*?(?::\s*|\"\.\s*)(?P<quote>.+?)\"?\.?\s*$""",
                re.IGNORECASE,
            ),
            "{quote}",
        ),
        (
            re.compile(
                r"""^.*?\bперевод\s+с\s+.*?(?::\s*|\"\.\s*)(?P<quote>.+?)\"?\.?\s*$""",
                re.IGNORECASE,
            ),
            "{quote}",
        ),
        (
            re.compile(
                r"""^\s*La\s+entrada\s+\"?(?P<quote>.+?)\"?\s+está\s+en\s+alemán.*$""",
                re.IGNORECASE,
            ),
            "{quote}",
        ),
    )

    def __init__(self) -> None:
        self.backend = self._load_backend()

    def _load_backend(self):
        """Import and return the configured rewrite backend, or ``None``
        when no backend path is configured."""
        backend_path = get_rewrite_backend()
        if not backend_path:
            return None
        return import_string(backend_path)

    def backend_prompt(self, field_path: str, text: str) -> str:
        """Build the instruction prompt sent to the rewrite backend."""
        return (
            f"Rewrite the following {self.locale} website copy for a small-business "
            f"website in a natural, professional, sales-driven tone. Preserve meaning, "
            f"remove translation artifacts, keep it concise, and do not add commentary.\n"
            f"Field: {field_path}\n"
            f"Locale: {self.locale}\n"
            f"Tone: {self.tone}\n"
            f"Formality: {self.preferred_formality}\n"
            f"Text: {text}"
        )

    def _contextual_replacements(self, field_path: str) -> dict[str, str]:
        """Merge all contextual maps whose token occurs in *field_path*.

        Matching is case-insensitive on the field path; later tokens
        override earlier ones on key collisions.
        """
        lowered = field_path.lower()
        replacements: dict[str, str] = {}
        for token, mapping in self.contextual_vocabulary_map.items():
            if token in lowered:
                replacements.update(mapping)
        return replacements

    def post_cleanup_text(self, text: str, field_path: str = "") -> str:
        """Hook for subclasses; the base implementation is the identity."""
        return text

    def _apply_replacements(self, text: str, replacements: dict[str, str]) -> str:
        """Apply *replacements* to *text*.

        Single-word sources are replaced only at word boundaries (so
        ``foo`` does not rewrite inside ``foobar``); multi-word phrases are
        replaced verbatim. Longer sources are applied first so a phrase is
        not clobbered by one of its substrings.
        """
        cleaned = text
        phrase_replacements = {}
        token_replacements = {}
        for source, target in replacements.items():
            if not source:
                continue  # an empty source would match everywhere
            if re.fullmatch(r"[\wÀ-ÿ-]+", source, flags=re.UNICODE):
                token_replacements[source] = target
            else:
                phrase_replacements[source] = target

        for source, target in sorted(phrase_replacements.items(), key=lambda item: len(item[0]), reverse=True):
            cleaned = cleaned.replace(source, target)

        for source, target in sorted(token_replacements.items(), key=lambda item: len(item[0]), reverse=True):
            # Manual word-boundary lookarounds: \b treats "-" as a boundary,
            # which would split hyphenated words.
            pattern = re.compile(rf"(?<![\wÀ-ÿ-]){re.escape(source)}(?![\wÀ-ÿ-])", re.UNICODE)
            cleaned = pattern.sub(target, cleaned)
        return cleaned

    def cleanup_text(self, text: str, field_path: str = "") -> str:
        """Strip translator commentary, apply vocabulary replacements, run
        the subclass hook, and collapse whitespace."""
        cleaned = text.strip()
        for pattern, replacement in self.cleanup_patterns:
            match = pattern.match(cleaned)
            if not match:
                continue
            cleaned = replacement.format(**match.groupdict()).strip()
        cleaned = self._apply_replacements(cleaned, self.vocabulary_map)
        cleaned = self._apply_replacements(cleaned, self._contextual_replacements(field_path))
        cleaned = self.post_cleanup_text(cleaned, field_path=field_path)
        return re.sub(r"\s+", " ", cleaned).strip()

    def normalize_cta(self, text: str, field_path: str = "") -> str:
        """Clean *text* and swap it for a canonical CTA when any
        ``cta_defaults`` keyword occurs in the cleaned, lowercased text."""
        normalized = self.cleanup_text(text, field_path=field_path)
        lowered = normalized.lower()
        for keyword, replacement in self.cta_defaults.items():
            if keyword in lowered:
                return replacement
        return normalized

    def rewrite(self, text: str, field_path: str = "", issues: list[Any] | None = None) -> str:
        """Clean *text* and, when a backend is configured, rewrite it.

        *issues* is accepted for interface compatibility with callers that
        pass detected content issues; cleanup is applied exactly once
        regardless (see note below).
        """
        lowered_path = field_path.lower()
        if any(token in lowered_path for token in ("cta", "button", "link_text", "submit")):
            # normalize_cta performs its own cleanup_text pass, so it is
            # given the raw text: replacement maps are not guaranteed to be
            # idempotent (a->b plus b->c would chain on a second pass), so
            # cleanup must run exactly once.
            cleaned = self.normalize_cta(text, field_path=field_path)
        else:
            # Issue-flagged text previously received a redundant second
            # cleanup_text pass; a single pass is both sufficient and safe.
            cleaned = self.cleanup_text(text, field_path=field_path)
        if self.backend:
            rewritten = self.backend(
                locale=self.locale,
                field_path=field_path,
                text=cleaned,
                prompt=self.backend_prompt(field_path, cleaned),
            )
            # Only accept a non-empty string from the backend; anything
            # else falls back to the locally cleaned text.
            if isinstance(rewritten, str) and rewritten.strip():
                cleaned = rewritten.strip()
        return cleaned

    def process_block(self, block_data: Any, field_path: str = "", issue_map: dict[str, list[Any]] | None = None):
        """Recursively clean/rewrite every string inside *block_data*.

        Dicts and lists are walked with dotted / indexed field paths
        (``a.b[0].c``). Returns ``(new_value, changed)`` where *changed*
        reports whether anything differs from the input.
        """
        issue_map = issue_map or {}
        if isinstance(block_data, dict):
            changed = False
            output = {}
            for key, value in block_data.items():
                child_path = f"{field_path}.{key}" if field_path else str(key)
                new_value, child_changed = self.process_block(value, child_path, issue_map)
                output[key] = new_value
                changed = changed or child_changed
            return output, changed
        if isinstance(block_data, list):
            changed = False
            output = []
            for index, value in enumerate(block_data):
                child_path = f"{field_path}[{index}]"
                new_value, child_changed = self.process_block(value, child_path, issue_map)
                output.append(new_value)
                changed = changed or child_changed
            return output, changed
        if isinstance(block_data, str):
            issues = issue_map.get(field_path, [])
            # Lowercase the path so the token check agrees with rewrite(),
            # which also matches case-insensitively.
            lowered_path = field_path.lower()
            needs_rewrite = bool(issues) or any(
                token in lowered_path for token in ("cta", "button", "label", "placeholder", "help_text")
            )
            if not needs_rewrite:
                # Pass field_path so contextual vocabulary replacements
                # apply here too, not only on the rewrite path.
                cleaned = self.cleanup_text(block_data, field_path=field_path)
                return cleaned, cleaned != block_data
            rewritten = self.rewrite(block_data, field_path=field_path, issues=issues)
            return rewritten, rewritten != block_data
        # Non-container, non-string leaves (numbers, None, ...) pass through.
        return block_data, False

    def build_issue_map(self, issues: list[Any]) -> dict[str, list[Any]]:
        """Group *issues* by their ``field_path``; issues without a field
        path are dropped (they cannot be attributed to a block field)."""
        issue_map: dict[str, list[Any]] = defaultdict(list)
        for issue in issues:
            if issue.field_path:
                issue_map[issue.field_path].append(issue)
        return issue_map
|
||||
Reference in New Issue
Block a user