Files
mandelstudio/mandelblog_content_guard/agents/base.py

188 lines
7.4 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from __future__ import annotations
import re
from collections import defaultdict
from typing import Any
from django.utils.module_loading import import_string
from ..settings import get_rewrite_backend
class BaseLanguageAgent:
    """Base agent that cleans up and rewrites website copy for one locale.

    The class-level attributes below configure the agent. The defaults are
    empty maps, so the base class only strips translation-commentary
    artifacts and normalises whitespace. ``post_cleanup_text`` returns its
    input unchanged here and is presumably meant to be overridden by
    locale-specific subclasses.
    """

    locale = "nl"
    tone = "business"
    preferred_formality = "neutral"
    # Lower-case keyword -> full CTA text returned when the keyword occurs in
    # a cleaned CTA string (see ``normalize_cta``).
    cta_defaults: dict[str, str] = {}
    # Source word/phrase -> replacement, applied to every cleaned text.
    vocabulary_map: dict[str, str] = {}
    # Field-path token -> replacement map applied only when the token occurs
    # in the lower-cased field path (see ``_contextual_replacements``).
    contextual_vocabulary_map: dict[str, dict[str, str]] = {}
    # (pattern, template) pairs. When a pattern matches the whole text, the
    # text is replaced by the template formatted with the named groups.
    cleanup_patterns: tuple[tuple[re.Pattern[str], str], ...] = (
        (
            re.compile(
                r"""^.*?\bis\s+(?:German|Spanish|French|Italian|Portuguese|Dutch),\s+not\s+Dutch.*?(?::\s*|\"\.\s*)(?P<quote>.+?)\"?\.?\s*$""",
                re.IGNORECASE,
            ),
            "{quote}",
        ),
        (
            re.compile(
                r"""^.*?\btranslation\s+from\s+.*?(?::\s*|\"\.\s*)(?P<quote>.+?)\"?\.?\s*$""",
                re.IGNORECASE,
            ),
            "{quote}",
        ),
        (
            re.compile(
                r"""^.*?\btraducid[oa]\s+al\s+.*?(?::\s*|\"\.\s*)(?P<quote>.+?)\"?\.?\s*$""",
                re.IGNORECASE,
            ),
            "{quote}",
        ),
        (
            re.compile(
                r"""^.*?\bперевод\s+с\s+.*?(?::\s*|\"\.\s*)(?P<quote>.+?)\"?\.?\s*$""",
                re.IGNORECASE,
            ),
            "{quote}",
        ),
        (
            re.compile(
                r"""^\s*La\s+entrada\s+\"?(?P<quote>.+?)\"?\s+está\s+en\s+alemán.*$""",
                re.IGNORECASE,
            ),
            "{quote}",
        ),
    )

    def __init__(self) -> None:
        """Resolve the optional rewrite backend once per instance."""
        self.backend = self._load_backend()

    def _load_backend(self):
        """Return the object named by the rewrite-backend setting, or ``None``.

        The dotted path comes from ``get_rewrite_backend()`` and is imported
        with Django's ``import_string``; a falsy path means "no backend".
        """
        backend_path = get_rewrite_backend()
        if not backend_path:
            return None
        return import_string(backend_path)

    def backend_prompt(self, field_path: str, text: str) -> str:
        """Build the instruction prompt handed to the rewrite backend."""
        return (
            f"Rewrite the following {self.locale} website copy for a small-business "
            f"website in a natural, professional, sales-driven tone. Preserve meaning, "
            f"remove translation artifacts, keep it concise, and do not add commentary.\n"
            f"Field: {field_path}\n"
            f"Locale: {self.locale}\n"
            f"Tone: {self.tone}\n"
            f"Formality: {self.preferred_formality}\n"
            f"Text: {text}"
        )

    def _contextual_replacements(self, field_path: str) -> dict[str, str]:
        """Merge every contextual map whose token occurs in ``field_path``.

        Matching is a case-insensitive substring test on the path. When
        several maps define the same source, the later map (in dict order)
        wins.
        """
        lowered = field_path.lower()
        replacements: dict[str, str] = {}
        for token, mapping in self.contextual_vocabulary_map.items():
            if token in lowered:
                replacements.update(mapping)
        return replacements

    def post_cleanup_text(self, text: str, field_path: str = "") -> str:
        """Final step of ``cleanup_text``; the base version is a no-op."""
        return text

    def _apply_replacements(self, text: str, replacements: dict[str, str]) -> str:
        """Apply ``replacements`` (source -> target) to ``text``.

        Sources made only of word characters and hyphens are replaced as
        whole tokens (not inside longer words); every other source is
        replaced as a plain substring. Within each group, longer sources are
        handled first so a short source cannot pre-empt a longer one. Empty
        sources are ignored.
        """
        cleaned = text
        phrase_replacements: dict[str, str] = {}
        token_replacements: dict[str, str] = {}
        for source, target in replacements.items():
            if not source:
                continue
            if re.fullmatch(r"[\wÀ-ÿ-]+", source, flags=re.UNICODE):
                token_replacements[source] = target
            else:
                phrase_replacements[source] = target
        for source, target in sorted(
            phrase_replacements.items(), key=lambda item: len(item[0]), reverse=True
        ):
            cleaned = cleaned.replace(source, target)
        for source, target in sorted(
            token_replacements.items(), key=lambda item: len(item[0]), reverse=True
        ):
            pattern = re.compile(rf"(?<![\wÀ-ÿ-]){re.escape(source)}(?![\wÀ-ÿ-])", re.UNICODE)
            # Use a callable so the target is inserted literally. A string
            # replacement is treated as a template by ``re.sub``: a target
            # with a backslash (e.g. "\d" or "\1") would raise or be expanded.
            cleaned = pattern.sub(lambda _match, _target=target: _target, cleaned)
        return cleaned

    def cleanup_text(self, text: str, field_path: str = "") -> str:
        """Strip translation commentary, apply vocabulary, collapse whitespace.

        Steps, in order: trim; apply each matching ``cleanup_patterns`` entry;
        apply ``vocabulary_map``; apply the contextual maps selected by
        ``field_path``; call ``post_cleanup_text``; collapse whitespace runs
        to single spaces and trim.
        """
        cleaned = text.strip()
        for pattern, replacement in self.cleanup_patterns:
            match = pattern.match(cleaned)
            if not match:
                continue
            cleaned = replacement.format(**match.groupdict()).strip()
        cleaned = self._apply_replacements(cleaned, self.vocabulary_map)
        cleaned = self._apply_replacements(cleaned, self._contextual_replacements(field_path))
        cleaned = self.post_cleanup_text(cleaned, field_path=field_path)
        return re.sub(r"\s+", " ", cleaned).strip()

    def normalize_cta(self, text: str, field_path: str = "") -> str:
        """Clean ``text`` and map it onto a default CTA when a keyword matches.

        Returns the replacement of the first ``cta_defaults`` keyword found
        (as a substring) in the lower-cased cleaned text, otherwise the
        cleaned text itself.
        """
        normalized = self.cleanup_text(text, field_path=field_path)
        lowered = normalized.lower()
        for keyword, replacement in self.cta_defaults.items():
            if keyword in lowered:
                return replacement
        return normalized

    def rewrite(self, text: str, field_path: str = "", issues: list[Any] | None = None) -> str:
        """Return cleaned (and, if a backend is configured, rewritten) text.

        Args:
            text: Raw field text.
            field_path: Path of the field; paths containing "cta", "button",
                "link_text" or "submit" (case-insensitive) are normalised
                as CTAs.
            issues: Issues reported for this field. Accepted for interface
                compatibility; the base implementation does not inspect them.

        Returns:
            The cleaned text, replaced by the backend's output when that
            output is a non-blank string.
        """
        lowered_path = field_path.lower()
        # Cleanup runs exactly once per call. Replacement maps need not be
        # idempotent (e.g. "contact" -> "contact opnemen"), so a second pass
        # over already-cleaned text would duplicate words.
        if any(token in lowered_path for token in ("cta", "button", "link_text", "submit")):
            cleaned = self.normalize_cta(text, field_path=field_path)
        else:
            cleaned = self.cleanup_text(text, field_path=field_path)
        if self.backend:
            rewritten = self.backend(
                locale=self.locale,
                field_path=field_path,
                text=cleaned,
                prompt=self.backend_prompt(field_path, cleaned),
            )
            # Ignore non-string or blank backend output and keep the cleanup result.
            if isinstance(rewritten, str) and rewritten.strip():
                cleaned = rewritten.strip()
        return cleaned

    def process_block(
        self,
        block_data: Any,
        field_path: str = "",
        issue_map: dict[str, list[Any]] | None = None,
    ) -> tuple[Any, bool]:
        """Recursively clean or rewrite every string inside ``block_data``.

        Dict children get the path ``parent.key`` (or just ``key`` at the
        root); list children get ``parent[index]``. Strings are rewritten
        when ``issue_map`` lists issues for their path or when the path
        contains a UI-text token, and are otherwise only cleaned. Values of
        any other type are returned untouched.

        Returns:
            ``(new_value, changed)``, where ``changed`` is True when any
            string differs from its original.
        """
        issue_map = issue_map or {}
        if isinstance(block_data, dict):
            changed = False
            output = {}
            for key, value in block_data.items():
                child_path = f"{field_path}.{key}" if field_path else str(key)
                new_value, child_changed = self.process_block(value, child_path, issue_map)
                output[key] = new_value
                changed = changed or child_changed
            return output, changed
        if isinstance(block_data, list):
            changed = False
            output = []
            for index, value in enumerate(block_data):
                child_path = f"{field_path}[{index}]"
                new_value, child_changed = self.process_block(value, child_path, issue_map)
                output.append(new_value)
                changed = changed or child_changed
            return output, changed
        if isinstance(block_data, str):
            issues = issue_map.get(field_path, [])
            # Match tokens case-insensitively, consistent with ``rewrite`` and
            # ``_contextual_replacements``, which both lower-case the path.
            lowered_path = field_path.lower()
            needs_rewrite = bool(issues) or any(
                token in lowered_path
                for token in ("cta", "button", "label", "placeholder", "help_text")
            )
            if not needs_rewrite:
                # NOTE(review): field_path is not forwarded here, so contextual
                # vocabulary is never applied on this branch - confirm intended.
                cleaned = self.cleanup_text(block_data)
                return cleaned, cleaned != block_data
            rewritten = self.rewrite(block_data, field_path=field_path, issues=issues)
            return rewritten, rewritten != block_data
        return block_data, False

    def build_issue_map(self, issues: list[Any]) -> dict[str, list[Any]]:
        """Group ``issues`` by their ``field_path``; issues with a falsy path are skipped."""
        issue_map: dict[str, list[Any]] = defaultdict(list)
        for issue in issues:
            if issue.field_path:
                issue_map[issue.field_path].append(issue)
        return issue_map