307 lines
10 KiB
Python
307 lines
10 KiB
Python
from __future__ import annotations
|
|
|
|
import json
|
|
from datetime import UTC, datetime
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
from django.core.management.base import BaseCommand, CommandError
|
|
from django.db import transaction
|
|
|
|
from wagtail.models import Page
|
|
|
|
|
|
def _load_json_rows(path: Path) -> list[dict[str, Any]]:
|
|
try:
|
|
payload = json.loads(path.read_text(encoding="utf-8"))
|
|
except FileNotFoundError as exc:
|
|
raise CommandError(f"Input file not found: {path}") from exc
|
|
except json.JSONDecodeError as exc:
|
|
raise CommandError(f"Invalid JSON in {path}: {exc}") from exc
|
|
|
|
if not isinstance(payload, list):
|
|
raise CommandError(f"Expected a JSON list in {path}")
|
|
|
|
rows: list[dict[str, Any]] = []
|
|
for idx, row in enumerate(payload, start=1):
|
|
if not isinstance(row, dict):
|
|
raise CommandError(f"Row {idx} in {path} is not a JSON object")
|
|
rows.append(row)
|
|
return rows
|
|
|
|
|
|
def _is_safe_apply_row(row: dict[str, Any]) -> bool:
|
|
return bool(row.get("apply_now")) and row.get("recommended_action") == "apply_and_publish"
|
|
|
|
|
|
def _timestamp() -> str:
|
|
return datetime.now(UTC).strftime("%Y%m%dT%H%M%SZ")
|
|
|
|
|
|
def _snapshot_path_for(input_path: Path) -> Path:
    """Build the snapshot file path that sits next to *input_path*.

    The result keeps the input's directory and uses the name
    ``<input stem>_preapply_snapshot_<timestamp>.json``, where the timestamp
    comes from ``_timestamp()``.
    """
    stem = input_path.stem
    snapshot_name = f"{stem}_preapply_snapshot_{_timestamp()}.json"
    return input_path.with_name(snapshot_name)
|
|
|
|
|
|
def _snapshot_entry(page: Page) -> dict[str, Any]:
|
|
specific = page.specific
|
|
return {
|
|
"page_id": specific.id,
|
|
"title": specific.title,
|
|
"slug": specific.slug,
|
|
"live": bool(specific.live),
|
|
"seo_title": getattr(specific, "seo_title", "") or "",
|
|
"search_description": getattr(specific, "search_description", "") or "",
|
|
"latest_revision_id": getattr(specific, "latest_revision_id", None),
|
|
"live_revision_id": getattr(specific, "live_revision_id", None),
|
|
}
|
|
|
|
|
|
class Command(BaseCommand):
    """Preview, apply, or roll back SEO metadata changes on Wagtail pages.

    Two sources are supported, selected in ``handle``:

    * ``--input FILE``: a JSON matrix of proposed ``seo_title`` /
      ``search_description`` values. Rows are only acted on when
      ``_is_safe_apply_row`` accepts them.
    * ``--rollback-from FILE``: a snapshot previously written by this command,
      whose values are restored.

    In both cases nothing is written unless ``--apply`` is given; otherwise
    the command only reports what it would do.
    """

    help = "Apply or roll back priority SEO metadata updates from a JSON matrix"

    def add_arguments(self, parser):
        """Register the command-line options on *parser*."""
        parser.add_argument(
            "--input",
            help="Path to the JSON apply matrix.",
        )
        parser.add_argument(
            "--dry-run",
            action="store_true",
            help="Preview changes only (default when --apply is not set).",
        )
        parser.add_argument(
            "--apply",
            action="store_true",
            help="Persist approved SEO metadata changes.",
        )
        parser.add_argument(
            "--rollback-from",
            help="Path to a previously exported snapshot JSON file.",
        )

    def handle(self, *args, **options):
        """Validate the option combination and dispatch to apply or rollback.

        Raises:
            CommandError: If ``--dry-run`` and ``--apply`` are combined, if
                ``--input`` and ``--rollback-from`` are combined, or if
                neither source option is given.
        """
        input_value = options.get("input")
        rollback_value = options.get("rollback_from")
        dry_run = bool(options.get("dry_run"))
        apply_changes = bool(options.get("apply"))

        if apply_changes and dry_run:
            raise CommandError("Use either --dry-run or --apply, not both.")

        # Without an explicit --apply the command never persists anything.
        if not apply_changes:
            dry_run = True

        if rollback_value and input_value:
            raise CommandError("Use either --input or --rollback-from, not both.")

        if rollback_value:
            snapshot_rows = _load_json_rows(Path(rollback_value))
            self._run_rollback(snapshot_rows, dry_run=dry_run)
            return

        if not input_value:
            raise CommandError("The --input option is required unless --rollback-from is used.")

        matrix_path = Path(input_value)
        rows = _load_json_rows(matrix_path)
        self._run_apply(rows, matrix_path=matrix_path, dry_run=dry_run)

    @staticmethod
    def _page_id_from_row(row: dict[str, Any]) -> int:
        """Return the row's ``page_id`` as an int.

        Raises:
            CommandError: If the key is missing or its value cannot be
                converted with ``int()``.
        """
        raw = row.get("page_id")
        try:
            return int(raw)
        except (TypeError, ValueError) as exc:
            raise CommandError(f"Row has a missing or invalid page_id: {raw!r}") from exc

    @staticmethod
    def _current_metadata(page: Any) -> tuple[str, str]:
        """Return ``(seo_title, search_description)`` of *page*, with absent or falsy values as ``""``."""
        return (
            getattr(page, "seo_title", "") or "",
            getattr(page, "search_description", "") or "",
        )

    @staticmethod
    def _store_metadata(
        page: Any, *, seo_title: str, search_description: str, publish: bool
    ) -> None:
        """Set both metadata fields on *page*, save a revision, and publish it when *publish* is true."""
        page.seo_title = seo_title
        page.search_description = search_description
        revision = page.save_revision()
        if publish:
            revision.publish()

    def _load_page(self, page_id: int) -> Page:
        """Fetch the page with the given id in its specific form.

        Raises:
            CommandError: If no page with that id exists.
        """
        page = Page.objects.filter(id=page_id).specific().first()
        if page is None:
            raise CommandError(f"Page id={page_id} not found")
        return page

    def _export_snapshot(
        self, rows: list[dict[str, Any]], *, matrix_path: Path
    ) -> tuple[Path, list[dict[str, Any]]]:
        """Write the current state of every page the matrix would touch.

        Only rows accepted by ``_is_safe_apply_row`` are considered, and each
        page id is captured once even if it appears in several rows.

        Returns:
            The path of the written snapshot file and the snapshot rows.

        Raises:
            CommandError: If an approved row has a missing/invalid
                ``page_id`` or refers to a page that does not exist. This
                aborts the run before anything is modified.
        """
        snapshot_rows: list[dict[str, Any]] = []
        seen: set[int] = set()

        for row in rows:
            if not _is_safe_apply_row(row):
                continue
            page_id = self._page_id_from_row(row)
            if page_id in seen:
                continue
            seen.add(page_id)
            snapshot_row = _snapshot_entry(self._load_page(page_id))
            snapshot_row["url"] = row.get("url", "")
            snapshot_rows.append(snapshot_row)

        snapshot_path = _snapshot_path_for(matrix_path)
        snapshot_path.write_text(
            json.dumps(snapshot_rows, ensure_ascii=False, indent=2),
            encoding="utf-8",
        )
        return snapshot_path, snapshot_rows

    def _run_apply(
        self, rows: list[dict[str, Any]], *, matrix_path: Path, dry_run: bool
    ) -> None:
        """Preview or persist the proposed metadata from the matrix rows.

        When *dry_run* is false, a snapshot is exported first and all writes
        happen inside one database transaction. Each row yields one
        ``SKIP``/``ERR``/``DRY``/``APPLY`` line, followed by a summary.
        """
        applied = 0
        skipped = 0
        errors = 0
        changed_ids: list[int] = []
        snapshot_path: Path | None = None

        if not dry_run:
            snapshot_path, _ = self._export_snapshot(rows, matrix_path=matrix_path)

        def process() -> None:
            nonlocal applied, skipped, errors
            for row in rows:
                try:
                    page_id = self._page_id_from_row(row)
                except CommandError as exc:
                    # A malformed row is reported like a missing page rather
                    # than crashing the whole run with a raw traceback.
                    errors += 1
                    self.stdout.write(f"ERR page=?: {exc}")
                    continue

                if not _is_safe_apply_row(row):
                    skipped += 1
                    self.stdout.write(
                        f"SKIP page={page_id}: matrix action={row.get('recommended_action')}"
                    )
                    continue

                try:
                    page = self._load_page(page_id)
                except CommandError as exc:
                    errors += 1
                    self.stdout.write(f"ERR page={page_id}: {exc}")
                    continue

                specific = page.specific
                target_seo = row.get("proposed_seo_title", "") or ""
                target_desc = row.get("proposed_search_description", "") or ""

                if self._current_metadata(specific) == (target_seo, target_desc):
                    skipped += 1
                    self.stdout.write(
                        f"SKIP page={page_id}: metadata already matches target"
                    )
                    continue

                changed_ids.append(page_id)
                if dry_run:
                    applied += 1
                    self.stdout.write(
                        f"DRY page={page_id}: would update seo_title/search_description"
                    )
                    continue

                self._store_metadata(
                    specific,
                    seo_title=target_seo,
                    search_description=target_desc,
                    publish=bool(row.get("should_be_published_immediately")),
                )
                applied += 1
                self.stdout.write(
                    f"APPLY page={page_id}: updated seo_title/search_description"
                )

        if dry_run:
            process()
        else:
            # One transaction so an unexpected failure leaves no partial update.
            with transaction.atomic():
                process()

        self._print_summary(
            total_rows=len(rows),
            applied=applied,
            skipped=skipped,
            errors=errors,
            changed_ids=changed_ids,
            snapshot_path=snapshot_path,
            mode="dry-run" if dry_run else "apply",
        )

    def _run_rollback(self, snapshot_rows: list[dict[str, Any]], *, dry_run: bool) -> None:
        """Preview or restore the metadata recorded in snapshot rows.

        When *dry_run* is false, all writes happen inside one database
        transaction, and a restored revision is published only if the
        snapshot row's ``live`` value is truthy. Each row yields one
        ``SKIP``/``ERR``/``DRY``/``ROLL`` line, followed by a summary.
        """
        applied = 0
        skipped = 0
        errors = 0
        changed_ids: list[int] = []

        def process() -> None:
            nonlocal applied, skipped, errors
            for row in snapshot_rows:
                try:
                    page_id = self._page_id_from_row(row)
                except CommandError as exc:
                    # A malformed row is reported like a missing page rather
                    # than crashing the whole run with a raw traceback.
                    errors += 1
                    self.stdout.write(f"ERR page=?: {exc}")
                    continue

                try:
                    page = self._load_page(page_id)
                except CommandError as exc:
                    errors += 1
                    self.stdout.write(f"ERR page={page_id}: {exc}")
                    continue

                specific = page.specific
                target_seo = row.get("seo_title", "") or ""
                target_desc = row.get("search_description", "") or ""

                if self._current_metadata(specific) == (target_seo, target_desc):
                    skipped += 1
                    self.stdout.write(
                        f"SKIP page={page_id}: current metadata already matches snapshot"
                    )
                    continue

                changed_ids.append(page_id)
                if dry_run:
                    applied += 1
                    self.stdout.write(
                        f"DRY page={page_id}: would restore seo_title/search_description"
                    )
                    continue

                self._store_metadata(
                    specific,
                    seo_title=target_seo,
                    search_description=target_desc,
                    publish=bool(row.get("live")),
                )
                applied += 1
                self.stdout.write(
                    f"ROLL page={page_id}: restored seo_title/search_description"
                )

        if dry_run:
            process()
        else:
            # One transaction so an unexpected failure leaves no partial restore.
            with transaction.atomic():
                process()

        self._print_summary(
            total_rows=len(snapshot_rows),
            applied=applied,
            skipped=skipped,
            errors=errors,
            changed_ids=changed_ids,
            snapshot_path=None,
            mode="rollback-dry-run" if dry_run else "rollback-apply",
        )

    def _print_summary(
        self,
        *,
        total_rows: int,
        applied: int,
        skipped: int,
        errors: int,
        changed_ids: list[int],
        snapshot_path: Path | None,
        mode: str,
    ) -> None:
        """Write the end-of-run summary to stdout.

        An empty *changed_ids* list and a missing *snapshot_path* are both
        shown as ``-``.
        """
        self.stdout.write("")
        self.stdout.write("Summary")
        self.stdout.write(f"mode: {mode}")
        self.stdout.write(f"total rows: {total_rows}")
        self.stdout.write(f"applied: {applied}")
        self.stdout.write(f"skipped: {skipped}")
        self.stdout.write(f"errors: {errors}")
        self.stdout.write(
            "page IDs changed: "
            + (", ".join(str(page_id) for page_id in changed_ids) if changed_ids else "-")
        )
        self.stdout.write(f"snapshot path: {snapshot_path if snapshot_path else '-'}")
|