Files
beets-setup/scripts/unknown/scrape-prev.py
2026-05-12 12:27:32 -04:00

362 lines
9.3 KiB
Python
Executable File

#!/usr/bin/env python3
import argparse
import csv
import json
import re
import shutil
import subprocess
import sys
from collections import Counter
from pathlib import Path
# python-magic is an optional dependency. When it cannot be imported the
# module-level name is bound to None so later code can test for it.
try:
    import magic  # python-magic
except ImportError:
    magic = None
# Matches album directory names of the form "<album> - [<4-digit year>]",
# tolerating optional whitespace around the dash and at the end.
ALBUM_DIR_RE = re.compile(r"^(?P<album>.+?)\s*-\s*\[(?P<year>\d{4})\]\s*$", re.IGNORECASE)

# Image MIME type -> short label used to describe an album's cover image.
# A MIME type that is absent from this table is not treated as an image.
IMAGE_MIME_MAP = {
    "image/jpeg": "JPEG",
    "image/jpg": "JPG",
    "image/png": "PNG",
    "image/webp": "WEBP",
    "image/gif": "GIF",
    "image/bmp": "BMP",
    "image/tiff": "TIFF",
    "image/heic": "HEIC",
    "image/heif": "HEIF",
    "image/avif": "AVIF",
}
def is_hidden(path: Path) -> bool:
    """Return True if any component of *path* names a dot-file or dot-directory.

    Every component is inspected, not only the final one, so a file that lives
    inside a hidden directory is reported as hidden too. The navigation
    components ``.`` and ``..`` are not hidden entries and are ignored, so a
    relative path such as ``../music/a.flac`` is not misclassified.
    """
    return any(
        part.startswith(".") and part not in {".", ".."}
        for part in path.parts
    )
def build_magic():
    """Create a MIME-type detector backed by python-magic, if possible.

    Returns:
        A ``magic.Magic(mime=True)`` instance, or ``None`` when python-magic
        was not importable or the detector could not be constructed.
    """
    if magic is None:
        return None
    try:
        return magic.Magic(mime=True)
    except Exception:
        # Best effort: any failure while setting up the detector simply
        # disables it rather than aborting the run.
        return None
def detect_mime(path: Path, magic_mime) -> str | None:
if magic_mime is not None:
try:
return magic_mime.from_file(str(path))
except Exception:
pass
file_exe = shutil.which("file")
if file_exe is None:
return None
try:
proc = subprocess.run(
[file_exe, "--mime-type", "-b", str(path)],
capture_output=True,
text=True,
check=True,
)
return proc.stdout.strip() or None
except Exception:
return None
def detect_image_format(path: Path, magic_mime) -> str | None:
    """Return the image label for *path*, or None if it is not a known image.

    The file's MIME type is obtained with ``detect_mime`` and translated via
    ``IMAGE_MIME_MAP``; an undetectable or unlisted MIME type yields ``None``.
    """
    mime = detect_mime(path, magic_mime)
    return IMAGE_MIME_MAP.get(mime)
def parse_album_dir_name(dirname: str) -> tuple[str, str]:
    """Split an album directory name into ``(album, year)``.

    Names matching ``ALBUM_DIR_RE`` ("<album> - [<year>]") yield the stripped
    album title and the four-digit year. Any other name is returned unchanged
    as the album with an empty year string.
    """
    m = ALBUM_DIR_RE.match(dirname)
    if not m:
        return dirname, ""
    return m.group("album").strip(), m.group("year").strip()
def ffprobe_json(path: Path) -> dict | None:
ffprobe = shutil.which("ffprobe")
if ffprobe is None:
raise RuntimeError("ffprobe not found in PATH")
cmd = [
ffprobe,
"-v", "error",
"-print_format", "json",
"-show_format",
"-show_streams",
str(path),
]
try:
proc = subprocess.run(cmd, capture_output=True, text=True, check=True)
return json.loads(proc.stdout)
except Exception:
return None
# Ordered (format_name tokens, label) pairs. The first entry that shares a
# token with ffprobe's comma-separated format_name decides the label, so the
# order of this table is significant.
_FORMAT_LABELS = (
    (frozenset({"flac"}), "FLAC"),
    (frozenset({"mp3"}), "MP3"),
    (frozenset({"wav", "wave"}), "WAV"),
    (frozenset({"aiff"}), "AIFF"),
    (frozenset({"ogg"}), "OGG"),
    (frozenset({"dsf"}), "DSF"),
    (frozenset({"wavpack", "wv"}), "WAVPACK"),
    (frozenset({"ape"}), "APE"),
    (frozenset({"tta"}), "TTA"),
    (frozenset({"asf"}), "ASF"),
    (frozenset({"caf"}), "CAF"),
    (frozenset({"au"}), "AU"),
    (frozenset({"amr"}), "AMR"),
    (frozenset({"matroska"}), "MATROSKA"),
    (frozenset({"webm"}), "WEBM"),
)

# format_name tokens that identify the MP4 container family.
_MP4_FAMILY = frozenset({"mov", "mp4", "m4a", "3gp", "3g2", "mj2"})


def normalize_format(probe: dict, audio_stream: dict) -> str:
    """Derive a short container/format label from an ffprobe report.

    Args:
        probe: Parsed ffprobe JSON; its ``"format"`` entry is consulted. A
            missing or ``None`` entry is treated as empty.
        audio_stream: The audio stream dict; only its ``codec_name`` is used,
            as a last resort.

    Returns:
        An upper-case label, resolved in this order: a known container from
        ``format_name``; ``"M4A"``/``"MP4"`` for the MP4 family; all
        ``format_name`` tokens upper-cased, sorted and joined with ``/``;
        ``format_long_name`` upper-cased with spaces turned into
        underscores; the stream's codec name; finally ``"UNKNOWN"``.
    """
    fmt = probe.get("format") or {}
    format_name = (fmt.get("format_name") or "").lower()
    names = {token.strip() for token in format_name.split(",") if token.strip()}

    for tokens, label in _FORMAT_LABELS:
        if tokens & names:
            return label

    # MP4-family container. Distinguish M4A when possible from container metadata.
    if _MP4_FAMILY & names:
        tags = fmt.get("tags") or {}
        major_brand = (tags.get("major_brand") or "").strip().upper()
        if major_brand.startswith(("M4A", "M4B")):
            return "M4A"
        return "MP4"

    # Fallback: report every ffprobe format token, sorted for stable output.
    if names:
        return "/".join(sorted(name.upper() for name in names))

    format_long_name = (fmt.get("format_long_name") or "").strip()
    if format_long_name:
        return format_long_name.upper().replace(" ", "_")

    # Final fallback: codec-derived guess
    codec_name = (audio_stream.get("codec_name") or "").lower()
    if codec_name:
        return codec_name.upper()
    return "UNKNOWN"
# ffprobe codec_name -> label for codecs whose label is not simply the
# upper-cased codec name (or where the mapping is spelled out explicitly).
_CODEC_LABELS = {
    "flac": "FLAC",
    "mp3": "MP3",
    "aac": "AAC",
    "alac": "ALAC",
    "opus": "OPUS",
    "vorbis": "VORBIS",
    "ac3": "AC3",
    "eac3": "EAC3",
    "dts": "DTS",
    "ape": "APE",
    "wavpack": "WAVPACK",
    "tta": "TTA",
    "wmav1": "WMA1",
    "wmav2": "WMA2",
    "wmapro": "WMA_PRO",
    "wmalossless": "WMA_LOSSLESS",
    "atrac3": "ATRAC3",
    "atrac3p": "ATRAC3P",
    "tta1": "TTA",
    "dsd_lsbf": "DSD",
    "dsd_msbf": "DSD",
    "dsd_lsbf_planar": "DSD",
    "dsd_msbf_planar": "DSD",
}


def normalize_codec(probe: dict, audio_stream: dict, format_label: str) -> str:
    """Derive a short codec label from an ffprobe audio stream.

    Args:
        probe: Parsed ffprobe JSON. Unused; kept for interface compatibility.
        audio_stream: The audio stream dict whose ``codec_name`` is read.
        format_label: Label returned when the stream has no codec name.

    Returns:
        The mapped label for known codecs, otherwise the codec name
        upper-cased (this covers the ``pcm_*`` and ``adpcm_*`` families), or
        *format_label* when ``codec_name`` is missing or empty.
    """
    codec_name = (audio_stream.get("codec_name") or "").lower()
    if not codec_name:
        return format_label
    return _CODEC_LABELS.get(codec_name, codec_name.upper())
def probe_audio_file(path: Path) -> tuple[str, str] | None:
    """Probe *path* and return ``(format_label, codec_label)`` for audio files.

    Returns:
        The labels computed by ``normalize_format`` and ``normalize_codec``,
        or ``None`` when ffprobe yields no report or the report contains no
        stream whose ``codec_type`` is ``"audio"``.
    """
    probe = ffprobe_json(path)
    if not probe:
        return None

    streams = probe.get("streams") or []
    # Use the first audio stream as the primary stream.
    audio_stream = next(
        (s for s in streams if s.get("codec_type") == "audio"),
        None,
    )
    if audio_stream is None:
        return None

    format_label = normalize_format(probe, audio_stream)
    codec_label = normalize_codec(probe, audio_stream, format_label)
    return format_label, codec_label
def choose_cover_image_format(album_dir: Path, magic_mime) -> str | bool:
    """Pick the image format that best represents the album's cover art.

    All non-hidden files below *album_dir* are inspected. Images whose file
    stem contains one of "cover", "folder", "front", "album" or "art" are
    preferred; if there are none, every detected image is considered.

    Args:
        album_dir: Album directory to search recursively.
        magic_mime: Detector passed through to ``detect_image_format``.

    Returns:
        The most frequent image label among the chosen candidates, or
        ``False`` when the album contains no detectable image.
    """
    # NOTE(review): these are substring matches on the stem, so "art" also
    # matches names such as "artist" or "part2".
    preferred_name_parts = ("cover", "folder", "front", "album", "art")
    preferred_hits = []
    any_hits = []

    # Sorted traversal makes the winner of a tie independent of the order in
    # which the filesystem happens to list entries.
    for path in sorted(album_dir.rglob("*")):
        # Judge hidden-ness relative to the album directory only; a dot
        # directory somewhere above the library must not hide every file.
        if not path.is_file() or is_hidden(path.relative_to(album_dir)):
            continue
        img_fmt = detect_image_format(path, magic_mime)
        if not img_fmt:
            continue
        any_hits.append(img_fmt)
        stem_lower = path.stem.lower()
        if any(part in stem_lower for part in preferred_name_parts):
            preferred_hits.append(img_fmt)

    hits = preferred_hits if preferred_hits else any_hits
    if not hits:
        return False
    counts = Counter(hits)
    return counts.most_common(1)[0][0]
def collect_album_audio_files(album_dir: Path) -> list[tuple[Path, str, str]]:
    """Return ``(path, format_label, codec_label)`` for each audio file in an album.

    Every non-hidden regular file below *album_dir* is probed with
    ``probe_audio_file``; files for which it returns ``None`` are skipped.
    """
    results = []
    for path in album_dir.rglob("*"):
        # Judge hidden-ness relative to the album directory only; a dot
        # directory somewhere above the library must not hide every file.
        if not path.is_file() or is_hidden(path.relative_to(album_dir)):
            continue
        probed = probe_audio_file(path)
        if probed is None:
            continue
        file_format, codec = probed
        results.append((path, file_format, codec))
    return results
def summarize_values(values: list[str]) -> str:
    """Collapse a list of labels into a single summary string.

    Returns:
        ``""`` for an empty list, the label itself when all entries agree,
        otherwise ``"Mixed(a, b, ...)"`` listing the distinct labels in
        sorted order.
    """
    unique = sorted(set(values))
    if not unique:
        return ""
    if len(unique) == 1:
        return unique[0]
    return f"Mixed({', '.join(unique)})"
def _visible_subdirs(parent: Path):
    """Yield the non-hidden subdirectories of *parent*, sorted by lower-cased name."""
    for entry in sorted(parent.iterdir(), key=lambda p: p.name.lower()):
        # Only the entry's own name decides whether it is hidden: testing the
        # full path would also reject everything when the library itself
        # lives below a dot-directory.
        if entry.is_dir() and not entry.name.startswith("."):
            yield entry


def iter_album_dirs(root: Path):
    """Yield ``(artist_name, album_dir)`` for a ``root/<artist>/<album>`` layout.

    Artists and their albums are visited in case-insensitive name order.
    Entries that are not directories, or whose name starts with a dot, are
    skipped at both levels.
    """
    for artist_dir in _visible_subdirs(root):
        for album_dir in _visible_subdirs(artist_dir):
            yield artist_dir.name, album_dir
def write_csv(root: Path, output_csv: Path):
    """Scan the library under *root* and write one CSV row per album.

    Albums without any detected audio file are omitted. Each row holds the
    artist, album name, summarized file format and codec, cover image format
    (the string ``"FALSE"`` when none was found), track count and year. The
    header and every row are also printed to stdout as they are written.

    Args:
        root: Library root containing ``<artist>/<album>`` directories.
        output_csv: Destination file; it is overwritten if it exists.
    """
    magic_mime = build_magic()
    with output_csv.open("w", newline="", encoding="utf-8") as f:
        writer = csv.writer(f)
        row = [
            "Artist",
            "Album",
            "File Format",
            "Codec",
            "Cover Image",
            "Track Count",
            "Year",
        ]
        print(row)
        writer.writerow(row)

        for artist, album_dir in iter_album_dirs(root):
            album_name, year = parse_album_dir_name(album_dir.name)
            audio_files = collect_album_audio_files(album_dir)
            if not audio_files:
                continue

            formats = [file_format for _, file_format, _ in audio_files]
            codecs = [codec for _, _, codec in audio_files]
            album_format = summarize_values(formats)
            album_codec = summarize_values(codecs)
            cover_image = choose_cover_image_format(album_dir, magic_mime)
            track_count = len(audio_files)

            row = [
                artist,
                album_name,
                album_format,
                album_codec,
                # choose_cover_image_format returns False when no image was
                # found; the CSV records that as the literal text "FALSE".
                cover_image if cover_image else "FALSE",
                track_count,
                year,
            ]
            print(row)
            writer.writerow(row)
def main():
    """Command-line entry point: validate the arguments and write the CSV.

    Takes two positional arguments, the library root and the output CSV
    path. Exits with status 1 when the root is not a directory or when
    ``ffprobe`` is not available on PATH.
    """
    parser = argparse.ArgumentParser(
        description="Walk a music library and generate an album-level CSV summary."
    )
    parser.add_argument("root", help="Root of the music library")
    parser.add_argument("output_csv", help="Output CSV path")
    args = parser.parse_args()

    root = Path(args.root).expanduser().resolve()
    output_csv = Path(args.output_csv).expanduser().resolve()

    if not root.is_dir():
        print(f"ERROR: Not a directory: {root}", file=sys.stderr)
        sys.exit(1)

    # Check once up front so a missing ffprobe is reported cleanly instead of
    # surfacing as an exception in the middle of the scan.
    if shutil.which("ffprobe") is None:
        print("ERROR: ffprobe is required and was not found in PATH.", file=sys.stderr)
        sys.exit(1)

    write_csv(root, output_csv)
    print(f"Wrote CSV to: {output_csv}")


if __name__ == "__main__":
    main()