mirage/mirror_manager.py
#!/usr/bin/env python3
"""Manage wget-based site mirrors tracked in a shared mirrors.json file."""
import json
import subprocess
import datetime as dt
from pathlib import Path
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed

BASE = Path("/srv/www")
DATA_FILE = BASE / "data" / "mirrors.json"
MIRROR_ROOT = BASE / "mirrors"
LOG_ROOT = BASE / "logs"

MIRROR_ROOT.mkdir(parents=True, exist_ok=True)
LOG_ROOT.mkdir(parents=True, exist_ok=True)
DATA_FILE.parent.mkdir(parents=True, exist_ok=True)

_LOCK = threading.Lock()


def _now_iso() -> str:
    return dt.datetime.utcnow().replace(microsecond=0).isoformat() + "Z"


def load_mirrors() -> list[dict]:
    with _LOCK:
        if not DATA_FILE.exists():
            return []
        with DATA_FILE.open("r", encoding="utf-8") as f:
            return json.load(f)


def save_mirrors(mirrors: list[dict]) -> None:
    with _LOCK:
        # Write to a temp file first, then atomically replace the real file,
        # so readers never see a half-written mirrors.json.
        tmp = DATA_FILE.with_suffix(".tmp")
        with tmp.open("w", encoding="utf-8") as f:
            json.dump(mirrors, f, indent=2)
        tmp.replace(DATA_FILE)


def get_mirror(mirrors: list[dict], slug: str) -> dict | None:
    for m in mirrors:
        if m["slug"] == slug:
            return m
    return None


def _normalise_categories(raw: str) -> list[str]:
    # "tutorials, wgpu, rust" -> ["tutorials", "wgpu", "rust"]
    parts = [p.strip() for p in raw.split(",")]
    return [p for p in parts if p]


def add_mirror(slug: str,
               categories: str,
               url: str,
               ignore_robots: bool = False) -> dict:
    mirrors = load_mirrors()
    if get_mirror(mirrors, slug) is not None:
        raise ValueError(f"Mirror with slug '{slug}' already exists")
    cats = _normalise_categories(categories)
    if not cats:
        raise ValueError("At least one category is required")
    m = {
        "slug": slug,
        "categories": cats,
        "url": url,
        "ignore_robots": bool(ignore_robots),
        "created_at": _now_iso(),
        "last_updated": None,
        "status": "queued",  # idle | updating | queued | warning | error
        "last_error": None,
    }
    mirrors.append(m)
    save_mirrors(mirrors)
    return m


def _set_status(slug: str, *,
                status: str,
                last_error: str | None = None,
                last_updated: str | None = None):
    # load_mirrors() and save_mirrors() each take the lock separately, so two
    # threads writing status at the same time can lose one update; the file
    # itself stays consistent because save_mirrors() replaces it atomically.
    mirrors = load_mirrors()
    m = get_mirror(mirrors, slug)
    if m is None:
        return
    m["status"] = status
    if last_error is not None:
        m["last_error"] = last_error
    if last_updated is not None:
        m["last_updated"] = last_updated
    save_mirrors(mirrors)


def update_mirror(slug: str) -> None:
    """Run a wget mirror for a single slug (blocking in this thread)."""
    mirrors = load_mirrors()
    m = get_mirror(mirrors, slug)
    if m is None:
        raise ValueError(f"No such mirror: {slug}")
    _set_status(slug, status="updating", last_error=None)
    target_dir = MIRROR_ROOT / slug
    target_dir.mkdir(parents=True, exist_ok=True)
    log_file = LOG_ROOT / f"{slug}.log"
    robots_setting = "off" if m.get("ignore_robots") else "on"
    cmd = [
        "wget",
        "--mirror",              # recurse, keep timestamps
        "--convert-links",       # rewrite links for local viewing
        "--adjust-extension",    # save pages with .html etc.
        "--page-requisites",     # fetch CSS/JS/images needed to render pages
        "--no-parent",           # never ascend above the start URL
        "--wait=0.5",
        "--random-wait",
        "--limit-rate=50m",
        "--tries=3",
        "--retry-connrefused",
        f"--execute=robots={robots_setting}",
        "-P",
        str(target_dir),
        m["url"],
    ]
    try:
        with log_file.open("a", encoding="utf-8") as lf:
            lf.write(f"\n=== {_now_iso()} : Starting mirror of {m['url']} ===\n")
            lf.flush()
            proc = subprocess.run(
                cmd,
                stdout=lf,
                stderr=subprocess.STDOUT,
            )
            lf.write(f"=== {_now_iso()} : wget exited with code {proc.returncode} ===\n")
            lf.flush()
            # Classify result
            if proc.returncode == 0:
                _set_status(slug, status="idle",
                            last_updated=_now_iso(), last_error=None)
            else:
                # If we see FINISHED in the log and the directory has content,
                # treat this as a partial/ok-with-warnings case.
                text = log_file.read_text(encoding="utf-8", errors="ignore")
                has_finished = "FINISHED --" in text
                has_files = any(target_dir.rglob("*"))
                if has_finished and has_files:
                    _set_status(
                        slug,
                        status="warning",
                        last_updated=_now_iso(),
                        last_error=f"wget exited with {proc.returncode} (partial; see log)",
                    )
                else:
                    _set_status(
                        slug,
                        status="error",
                        last_error=f"wget exited with {proc.returncode}",
                    )
    except Exception as e:
        _set_status(
            slug,
            status="error",
            last_error=f"{type(e).__name__}: {e}",
        )


def update_all_mirrors(max_workers: int = 3) -> None:
    mirrors = load_mirrors()
    slugs = [m["slug"] for m in mirrors]
    if not slugs:
        return
    # Run several in parallel
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {pool.submit(update_mirror, slug): slug for slug in slugs}
        for fut in as_completed(futures):
            slug = futures[fut]
            try:
                fut.result()
            except Exception as e:
                _set_status(slug, status="error",
                            last_error=f"{type(e).__name__}: {e}")
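

# Minimal command-line sketch for manual testing. The subcommand names and
# the "uncategorised" default below are illustrative assumptions; the web
# application is assumed to call add_mirror() / update_mirror() /
# update_all_mirrors() directly rather than going through this block.
if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(description="Manage wget site mirrors")
    sub = parser.add_subparsers(dest="command", required=True)

    p_add = sub.add_parser("add", help="register a new mirror")
    p_add.add_argument("slug")
    p_add.add_argument("url")
    p_add.add_argument("--categories", default="uncategorised",
                       help="comma-separated category list")
    p_add.add_argument("--ignore-robots", action="store_true")

    p_update = sub.add_parser("update", help="update one mirror by slug")
    p_update.add_argument("slug")

    sub.add_parser("update-all", help="update every registered mirror")

    args = parser.parse_args()
    if args.command == "add":
        mirror = add_mirror(args.slug, args.categories, args.url,
                            ignore_robots=args.ignore_robots)
        print(json.dumps(mirror, indent=2))
    elif args.command == "update":
        update_mirror(args.slug)
    else:
        update_all_mirrors()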