139 lines
3.7 KiB
Python
Executable File
139 lines
3.7 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
Manage the various mirrors for the mirror website.
"""
|
|
|
|
import json
|
|
import subprocess
|
|
import datetime as dt
|
|
from pathlib import Path
|
|
|
|
# Filesystem layout: everything lives below BASE.
BASE = Path("/srv/www")
DATA_FILE = BASE.joinpath("data", "mirrors.json")
MIRROR_ROOT = BASE.joinpath("mirrors")
LOG_ROOT = BASE.joinpath("logs")

# Create the working directories at import time so the functions below can
# assume they exist.
for _needed_dir in (MIRROR_ROOT, LOG_ROOT, DATA_FILE.parent):
    _needed_dir.mkdir(parents=True, exist_ok=True)
del _needed_dir
|
|
|
|
|
|
def _now_iso() -> str:
    """Return the current UTC time as an ISO-8601 string ending in ``Z``.

    The value has second precision, e.g. ``2024-05-01T12:34:56Z``.
    """
    # datetime.utcnow() is deprecated since Python 3.12. Take an aware UTC
    # timestamp instead and drop tzinfo again so isoformat() does not emit a
    # "+00:00" offset in front of the literal "Z" suffix.
    now = dt.datetime.now(dt.timezone.utc)
    return now.replace(microsecond=0, tzinfo=None).isoformat() + "Z"
|
|
|
|
|
|
def load_mirrors() -> list[dict]:
    """Read the stored mirror records from ``DATA_FILE``.

    Returns:
        The parsed JSON content of the data file, or an empty list when the
        file does not exist.

    Raises:
        json.JSONDecodeError: if the file exists but does not contain valid
            JSON.
    """
    # Open first and deal with a missing file afterwards: a separate
    # exists() check can race with the file being removed or replaced
    # between the check and the open.
    try:
        with DATA_FILE.open("r", encoding="utf-8") as f:
            return json.load(f)
    except (FileNotFoundError, NotADirectoryError):
        # Same outcome the former exists() test gave for a missing path.
        return []
|
|
|
|
|
|
def save_mirrors(mirrors: list[dict]) -> None:
    """Write *mirrors* to ``DATA_FILE`` as JSON indented by two spaces.

    The data is written to a sibling ``.tmp`` file first and then moved over
    ``DATA_FILE`` with ``Path.replace``, so ``DATA_FILE`` itself is never
    opened for writing.

    Args:
        mirrors: the complete list of mirror records to persist.

    Raises:
        TypeError: if *mirrors* contains a value that JSON cannot represent.
        OSError: if the temporary file cannot be written or moved.
    """
    # Serialise before touching the filesystem, so that an unserialisable
    # record cannot leave a truncated temp file behind.
    payload = json.dumps(mirrors, indent=2)
    tmp = DATA_FILE.with_suffix(".tmp")
    try:
        tmp.write_text(payload, encoding="utf-8")
        tmp.replace(DATA_FILE)
    except Exception:
        # Do not leave a partial temp file lying next to the data file.
        tmp.unlink(missing_ok=True)
        raise
|
|
|
|
|
|
def get_mirror(mirrors: list[dict], slug: str) -> dict | None:
|
|
for m in mirrors:
|
|
if m["slug"] == slug:
|
|
return m
|
|
return None
|
|
|
|
|
|
def add_mirror(slug: str,
               category: str,
               url: str,
               ignore_robots: bool = False) -> dict:
    """Register a new mirror and persist it.

    The new record starts with status ``"queued"``, a ``created_at``
    timestamp from ``_now_iso()``, and no ``last_updated`` / ``last_error``
    value. It is appended to the stored list and saved immediately.

    Returns:
        The newly created record.

    Raises:
        ValueError: if a mirror with the same *slug* is already stored.
    """
    known = load_mirrors()
    clash = get_mirror(known, slug)
    if clash is not None:
        raise ValueError(f"Mirror with slug '{slug}' already exists!")

    # Keyword order here is the key order written to the JSON file.
    entry = dict(
        slug=slug,
        category=category,
        url=url,
        ignore_robots=bool(ignore_robots),
        created_at=_now_iso(),
        last_updated=None,
        status="queued",
        last_error=None,
    )
    known.append(entry)
    save_mirrors(known)
    return entry
|
|
|
|
|
|
def _wget_command(url: str,
                  target_dir: Path,
                  ignore_robots: bool) -> list[str]:
    """Build the wget argument list that mirrors *url* into *target_dir*."""
    robots_setting = "off" if ignore_robots else "on"
    # Polite wget:
    # --mirror implies -r -N -l inf --no-remove-listing
    return [
        "wget",
        "--mirror",
        "--convert-links",
        "--adjust-extension",
        "--page-requisites",
        "--no-parent",
        "--wait=0.70",
        "--random-wait",
        # "--limit-rate=50m",
        # "-e" executes a .wgetrc-style command. Without the option flag,
        # wget would take "execute=robots=..." as one more URL to download
        # and the robots setting would never be applied.
        "-e",
        f"robots={robots_setting}",
        "-P",
        str(target_dir),
        url,
    ]


def update_mirror(slug: str) -> None:
    """Run wget mirror for a single slug (blocking).

    The record's status is set to ``"updating"`` and saved before wget
    starts. Afterwards it is set to ``"idle"`` with a fresh ``last_updated``
    on success, or to ``"error"`` with ``last_error`` filled in on failure,
    and saved again. wget's output is appended to ``LOG_ROOT/<slug>.log``.

    Failures while running wget (non-zero exit status, wget not being
    executable, log file errors, ...) are recorded in the record and the log
    instead of being raised.

    Raises:
        ValueError: if no mirror with *slug* is stored.
    """
    mirrors = load_mirrors()
    m = get_mirror(mirrors, slug)
    if m is None:
        raise ValueError(f"No such mirror: {slug}")

    m["status"] = "updating"
    m["last_error"] = None
    save_mirrors(mirrors)

    target_dir = MIRROR_ROOT / slug
    target_dir.mkdir(parents=True, exist_ok=True)
    log_file = LOG_ROOT / f"{slug}.log"
    cmd = _wget_command(m["url"], target_dir, bool(m.get("ignore_robots")))

    try:
        with log_file.open("a", encoding="utf-8") as lf:
            lf.write(f"\n=== {_now_iso()} : "
                     f"Starting mirror of {m['url']} ===\n")
            # Flush our header before wget writes to the same descriptor,
            # otherwise the buffered header could land after wget's output.
            lf.flush()
            subprocess.run(
                cmd,
                stdout=lf,
                stderr=subprocess.STDOUT,
                check=True,
            )
            lf.write(
                f"=== {_now_iso()} : Completed mirror of {m['url']} ===\n"
            )
            lf.flush()
        m["last_updated"] = _now_iso()
        m["status"] = "idle"
        m["last_error"] = None
    except subprocess.CalledProcessError as e:
        m["status"] = "error"
        m["last_error"] = f"wget exited with {e.returncode}"
        with log_file.open("a", encoding="utf-8") as lf:
            lf.write(f"*** ERROR: wget failed with code {e.returncode}\n")
    except Exception as e:
        # Boundary handler: record anything else (e.g. wget missing) on the
        # mirror entry rather than aborting the caller.
        m["status"] = "error"
        m["last_error"] = f"{type(e).__name__}: {e}"
        with log_file.open("a", encoding="utf-8") as lf:
            lf.write(f"*** ERROR: {type(e).__name__}: {e}\n")
    finally:
        # NOTE(review): this saves the list loaded before wget started, so
        # changes another process made to the data file in the meantime
        # would be overwritten - confirm whether concurrent writers exist.
        save_mirrors(mirrors)
|
|
|
|
|
|
def update_all_mirrors() -> None:
    """Refresh every stored mirror, one after another.

    The records come from a single ``load_mirrors()`` call made up front;
    each record's slug is then handed to ``update_mirror()`` in stored
    order.
    """
    # Generator (not a list) so each slug is read only when its turn comes.
    slugs = (entry["slug"] for entry in load_mirrors())
    for slug in slugs:
        update_mirror(slug)
|