134 lines
4.2 KiB
Python
134 lines
4.2 KiB
Python
from __future__ import annotations
|
|
|
|
from typing import Any
|
|
|
|
from django.core.management.base import BaseCommand, CommandError
|
|
from django.db import transaction
|
|
|
|
from wagtail.blocks import StreamValue
|
|
from wagtail.models import Locale, Page
|
|
|
|
CONTACT_SLUGS = {
|
|
"nl": "contact",
|
|
"en": "contact",
|
|
"de": "kontakt",
|
|
"fr": "contact",
|
|
"es": "contacto",
|
|
"it": "contatto",
|
|
"pt": "contato",
|
|
"ru": "контакт",
|
|
}
|
|
|
|
CONTACT_ANCHOR = "#contact-form"
|
|
CTA_BLOCK_TYPES = {"saas_hero_banner", "saas_cta_footer"}
|
|
|
|
|
|
def _with_contact_anchor(url: str | None) -> str | None:
|
|
if not url:
|
|
return url
|
|
if url.endswith(CONTACT_ANCHOR):
|
|
return url
|
|
return f"{url.split('#', 1)[0]}{CONTACT_ANCHOR}"
|
|
|
|
|
|
def _localized_page_url(source: Page, locale: Locale) -> str | None:
|
|
translated = (
|
|
Page.objects.filter(translation_key=source.translation_key, locale=locale)
|
|
.specific()
|
|
.first()
|
|
)
|
|
chosen = translated or source
|
|
return getattr(chosen, "url", None)
|
|
|
|
|
|
class Command(BaseCommand):
|
|
help = "Anchor contact page CTA buttons to the contact form across locales"
|
|
|
|
def add_arguments(self, parser):
|
|
parser.add_argument(
|
|
"--page-id",
|
|
type=int,
|
|
help="Optional Wagtail page id to fix.",
|
|
)
|
|
parser.add_argument(
|
|
"--apply",
|
|
action="store_true",
|
|
help="Persist and publish changes (default is dry-run).",
|
|
)
|
|
|
|
def handle(self, *args, **options):
|
|
apply_changes = options["apply"]
|
|
page_id = options.get("page_id")
|
|
|
|
with transaction.atomic():
|
|
if page_id:
|
|
page = Page.objects.filter(id=page_id).specific().first()
|
|
if page is None:
|
|
raise CommandError(f"Page id={page_id} not found")
|
|
self._fix_page(page, apply_changes=apply_changes)
|
|
else:
|
|
any_found = False
|
|
for page in self._iter_contact_pages():
|
|
any_found = True
|
|
self._fix_page(page, apply_changes=apply_changes)
|
|
if not any_found:
|
|
raise CommandError("Could not find any localized contact pages")
|
|
|
|
if not apply_changes:
|
|
raise CommandError(
|
|
"Dry-run complete. Re-run with --apply to persist changes."
|
|
)
|
|
|
|
def _iter_contact_pages(self):
|
|
yielded = False
|
|
for code, slug in CONTACT_SLUGS.items():
|
|
locale = Locale.objects.filter(language_code=code).first()
|
|
if locale is None:
|
|
self.stdout.write(f"SKIP {code}: locale not found")
|
|
continue
|
|
pages = list(Page.objects.filter(locale=locale, slug=slug).specific())
|
|
if not pages:
|
|
self.stdout.write(f"SKIP {code}: no contact page for slug={slug}")
|
|
continue
|
|
for page in pages:
|
|
yielded = True
|
|
yield page
|
|
if not yielded:
|
|
return
|
|
|
|
def _fix_page(self, page: Page, *, apply_changes: bool) -> None:
|
|
specific = page.specific
|
|
code = page.locale.language_code
|
|
|
|
if not hasattr(specific, "body"):
|
|
self.stdout.write(f"SKIP {code}: no body streamfield")
|
|
return
|
|
|
|
localized_contact_url = _with_contact_anchor(
|
|
_localized_page_url(page, page.locale)
|
|
)
|
|
body = specific.body
|
|
raw_data = list(body.raw_data)
|
|
changed = False
|
|
|
|
for block in raw_data:
|
|
if block.get("type") not in CTA_BLOCK_TYPES:
|
|
continue
|
|
value = block.get("value")
|
|
if not isinstance(value, dict):
|
|
continue
|
|
if value.get("primary_cta_url") != localized_contact_url:
|
|
value["primary_cta_url"] = localized_contact_url
|
|
block["value"] = value
|
|
changed = True
|
|
|
|
if not changed:
|
|
self.stdout.write(f"OK {code}: contact CTA already anchored")
|
|
return
|
|
|
|
self.stdout.write(f"CHG {code}: anchored contact CTA buttons")
|
|
specific.body = StreamValue(body.stream_block, raw_data, is_lazy=True)
|
|
if apply_changes:
|
|
rev = specific.save_revision()
|
|
rev.publish()
|