from __future__ import annotations import re from collections import defaultdict from typing import Any from django.utils.module_loading import import_string from ..settings import get_rewrite_backend class BaseLanguageAgent: locale = "nl" tone = "business" preferred_formality = "neutral" cta_defaults: dict[str, str] = {} vocabulary_map: dict[str, str] = {} contextual_vocabulary_map: dict[str, dict[str, str]] = {} cleanup_patterns: tuple[tuple[re.Pattern[str], str], ...] = ( ( re.compile( r"""^.*?\bis\s+(?:German|Spanish|French|Italian|Portuguese|Dutch),\s+not\s+Dutch.*?(?::\s*|\"\.\s*)(?P.+?)\"?\.?\s*$""", re.IGNORECASE, ), "{quote}", ), ( re.compile( r"""^.*?\btranslation\s+from\s+.*?(?::\s*|\"\.\s*)(?P.+?)\"?\.?\s*$""", re.IGNORECASE, ), "{quote}", ), ( re.compile( r"""^.*?\btraducid[oa]\s+al\s+.*?(?::\s*|\"\.\s*)(?P.+?)\"?\.?\s*$""", re.IGNORECASE, ), "{quote}", ), ( re.compile( r"""^.*?\bперевод\s+с\s+.*?(?::\s*|\"\.\s*)(?P.+?)\"?\.?\s*$""", re.IGNORECASE, ), "{quote}", ), ( re.compile( r"""^\s*La\s+entrada\s+\"?(?P.+?)\"?\s+está\s+en\s+alemán.*$""", re.IGNORECASE, ), "{quote}", ), ) def __init__(self) -> None: self.backend = self._load_backend() def _load_backend(self): backend_path = get_rewrite_backend() if not backend_path: return None return import_string(backend_path) def backend_prompt(self, field_path: str, text: str) -> str: return ( f"Rewrite the following {self.locale} website copy for a small-business " f"website in a natural, professional, sales-driven tone. Preserve meaning, " f"remove translation artifacts, keep it concise, and do not add commentary.\n" f"Field: {field_path}\n" f"Locale: {self.locale}\n" f"Tone: {self.tone}\n" f"Formality: {self.preferred_formality}\n" f"Text: {text}" ) def _contextual_replacements(self, field_path: str) -> dict[str, str]: lowered = field_path.lower() replacements: dict[str, str] = {} for token, mapping in self.contextual_vocabulary_map.items(): if token in lowered: replacements.update(mapping) return replacements def post_cleanup_text(self, text: str, field_path: str = "") -> str: return text def _apply_replacements(self, text: str, replacements: dict[str, str]) -> str: cleaned = text phrase_replacements = {} token_replacements = {} for source, target in replacements.items(): if not source: continue if re.fullmatch(r"[\wÀ-ÿ-]+", source, flags=re.UNICODE): token_replacements[source] = target else: phrase_replacements[source] = target for source, target in sorted(phrase_replacements.items(), key=lambda item: len(item[0]), reverse=True): cleaned = cleaned.replace(source, target) for source, target in sorted(token_replacements.items(), key=lambda item: len(item[0]), reverse=True): pattern = re.compile(rf"(? str: cleaned = text.strip() for pattern, replacement in self.cleanup_patterns: match = pattern.match(cleaned) if not match: continue cleaned = replacement.format(**match.groupdict()).strip() cleaned = self._apply_replacements(cleaned, self.vocabulary_map) cleaned = self._apply_replacements(cleaned, self._contextual_replacements(field_path)) cleaned = self.post_cleanup_text(cleaned, field_path=field_path) return re.sub(r"\s+", " ", cleaned).strip() def normalize_cta(self, text: str, field_path: str = "") -> str: normalized = self.cleanup_text(text, field_path=field_path) lowered = normalized.lower() for keyword, replacement in self.cta_defaults.items(): if keyword in lowered: return replacement return normalized def rewrite(self, text: str, field_path: str = "", issues: list[Any] | None = None) -> str: cleaned = self.cleanup_text(text, field_path=field_path) lowered_path = field_path.lower() if any(token in lowered_path for token in ("cta", "button", "link_text", "submit")): cleaned = self.normalize_cta(cleaned, field_path=field_path) elif issues and any( issue.issue_type in {"generic_badge_label", "foreign_ui_label", "weak_marketing_copy", "mixed_locale_heading"} for issue in issues ): cleaned = self.cleanup_text(cleaned, field_path=field_path) if self.backend: rewritten = self.backend( locale=self.locale, field_path=field_path, text=cleaned, prompt=self.backend_prompt(field_path, cleaned), ) if isinstance(rewritten, str) and rewritten.strip(): cleaned = rewritten.strip() return cleaned def process_block(self, block_data: Any, field_path: str = "", issue_map: dict[str, list[Any]] | None = None): issue_map = issue_map or {} if isinstance(block_data, dict): changed = False output = {} for key, value in block_data.items(): child_path = f"{field_path}.{key}" if field_path else str(key) new_value, child_changed = self.process_block(value, child_path, issue_map) output[key] = new_value changed = changed or child_changed return output, changed if isinstance(block_data, list): changed = False output = [] for index, value in enumerate(block_data): child_path = f"{field_path}[{index}]" new_value, child_changed = self.process_block(value, child_path, issue_map) output.append(new_value) changed = changed or child_changed return output, changed if isinstance(block_data, str): issues = issue_map.get(field_path, []) needs_rewrite = bool(issues) or any( token in field_path for token in ("cta", "button", "label", "placeholder", "help_text") ) if not needs_rewrite: cleaned = self.cleanup_text(block_data) return cleaned, cleaned != block_data rewritten = self.rewrite(block_data, field_path=field_path, issues=issues) return rewritten, rewritten != block_data return block_data, False def build_issue_map(self, issues: list[Any]) -> dict[str, list[Any]]: issue_map: dict[str, list[Any]] = defaultdict(list) for issue in issues: if issue.field_path: issue_map[issue.field_path].append(issue) return issue_map