Files
beets-setup/scripts/unknown/scrape.py
2026-05-12 12:27:32 -04:00

732 lines
21 KiB
Python
Executable File

#!/usr/bin/env python3
import argparse
import csv
import io
import json
import re
import shutil
import subprocess
import sys
from collections import Counter, defaultdict
from dataclasses import dataclass
from pathlib import Path
from statistics import mean
from typing import Optional
try:
import magic # python-magic
except ImportError:
magic = None
try:
from tqdm import tqdm
except ImportError:
print("ERROR: tqdm is required. Install it with: pip install tqdm", file=sys.stderr)
sys.exit(1)
# Album directory names of the form "<album> - [YYYY]"; whitespace around the
# dash and after the closing bracket is optional.
ALBUM_DIR_RE = re.compile(r"^(?P<album>.+?)\s*-\s*\[(?P<year>\d{4})\]\s*$", re.IGNORECASE)
# Track file stems of the form "<digits>.<optional whitespace><title>"; the
# title part may be empty.
TRACK_FILE_RE = re.compile(r"^(?P<num>\d+)\.\s*(?P<title>.*)$")
# Maps a detected image MIME type to the short label that is reported as the
# album's cover-image format. MIME types missing from this table are not
# treated as images.
IMAGE_MIME_MAP = {
    "image/jpeg": "JPEG",
    "image/jpg": "JPG",
    "image/png": "PNG",
    "image/webp": "WEBP",
    "image/gif": "GIF",
    "image/bmp": "BMP",
    "image/tiff": "TIFF",
    "image/heic": "HEIC",
    "image/heif": "HEIF",
    "image/avif": "AVIF",
}
# Normalized (upper-case) codec labels that are reported as lossless.
# PCM_* codecs are handled separately by prefix in infer_lossless().
LOSSLESS_CODECS = {
    "FLAC",
    "ALAC",
    "APE",
    "TTA",
    "DSD",
    "WMA_LOSSLESS",
    "WAVPACK", # usually lossless; ffprobe does not expose mode cleanly
}
@dataclass(frozen=True)
class AlbumKey:
    """Immutable, hashable identity of one album directory.

    Frozen so that instances can serve as dictionary keys when cover-image
    hits and tracks are grouped per album.
    """

    artist: str  # name of the artist directory
    album: str  # album title parsed from the album directory name
    year: str  # four-digit year parsed from the directory name, "" if absent
    album_dir: Path  # path of the album directory itself
@dataclass
class TrackInfo:
    """Everything recorded about a single audio file found during a scan."""

    artist: str  # artist directory name
    album: str  # album title parsed from the album directory name
    year: str  # album year, "" when the directory name carries none
    album_dir: Path  # directory of the album this track belongs to
    relative_path: str  # file path relative to the library root
    file_name: str  # base name of the file, including its extension
    track_number: str  # leading digits of the file name as written, "" if none
    track_number_sort: int  # numeric track number; 10**9 when none was found
    track_title: str  # title parsed from the file name (falls back to the stem)
    file_format: str  # container label produced by normalize_format()
    codec: str  # codec label produced by normalize_codec()
    cover_image: str  # album cover format label or "FALSE"; filled after the scan
    bit_depth: Optional[int]  # bits per sample, None when it cannot be determined
    sample_rate_hz: Optional[int]
    bit_rate_kbps: Optional[float]  # ffprobe bit rate divided by 1000
    channels: Optional[int]
    channel_layout: str  # ffprobe channel layout, "" when missing
    sample_format: str  # upper-cased ffprobe sample format, "" when missing
    duration_seconds: Optional[float]
    lossless: str  # "TRUE", "FALSE" or "UNKNOWN"
def is_hidden(path: Path) -> bool:
    """Report whether any component of *path* starts with a dot."""
    for component in path.parts:
        if component.startswith("."):
            return True
    return False
def parse_album_dir_name(dirname: str) -> tuple[str, str]:
    """Split an album directory name into ``(album, year)``.

    Names that do not match ``ALBUM_DIR_RE`` are returned unchanged with an
    empty year.
    """
    match = ALBUM_DIR_RE.match(dirname)
    if match is None:
        return dirname, ""
    album, year = (match.group(name).strip() for name in ("album", "year"))
    return album, year
def parse_track_name(file_name: str) -> tuple[str, int, str]:
    """Derive ``(track_number, sort_number, title)`` from a track file name.

    The extension is dropped first. When the stem does not start with
    "<digits>." the track number is "", the sort number is 10**9 (so the
    track sorts last) and the title is the whole stem.
    """
    stem = Path(file_name).stem
    match = TRACK_FILE_RE.match(stem)
    if match is None:
        return "", 10**9, stem
    number = match["num"]
    title = match["title"].strip()
    if not title:
        # A bare "07." has no title of its own; fall back to the stem.
        title = stem
    return number, int(number), title
def safe_int(value) -> Optional[int]:
    """Convert *value* to ``int``, returning ``None`` when it is not usable.

    ``None``, the empty string and the literal "N/A" are treated as missing.
    Values that ``int()`` rejects also yield ``None``.
    """
    if value in (None, "", "N/A"):
        return None
    try:
        return int(value)
    except (TypeError, ValueError, OverflowError):
        # TypeError: unsupported type; ValueError: malformed text or NaN;
        # OverflowError: infinite float. Anything else is a genuine bug and
        # should not be hidden.
        return None
def safe_float(value) -> Optional[float]:
    """Convert *value* to ``float``, returning ``None`` when it is not usable.

    ``None``, the empty string and the literal "N/A" are treated as missing.
    Values that ``float()`` rejects also yield ``None``.
    """
    if value in (None, "", "N/A"):
        return None
    try:
        return float(value)
    except (TypeError, ValueError, OverflowError):
        # TypeError: unsupported type; ValueError: malformed text;
        # OverflowError: integer too large for a float.
        return None
def fmt_num(value: Optional[float], digits: int = 2) -> str:
    """Render *value* with a fixed number of decimals; ``None`` becomes "".

    With ``digits == 0`` the value is rounded and printed as a plain integer.
    """
    if value is None:
        return ""
    if digits != 0:
        return f"{value:.{digits}f}"
    return str(int(round(value)))
def fmt_range_int(values: list[int]) -> str:
    """Describe the spread of *values* as "lo-hi", or a single number.

    An empty list yields "".
    """
    if not values:
        return ""
    smallest, largest = min(values), max(values)
    if smallest == largest:
        return str(smallest)
    return f"{smallest}-{largest}"
def fmt_range_float(values: list[float], digits: int = 2) -> str:
    """Describe the spread of *values* as "lo-hi" using ``fmt_num`` formatting.

    Extremes closer than 1e-9 collapse to a single number; an empty list
    yields "".
    """
    if not values:
        return ""
    smallest, largest = min(values), max(values)
    low_text = fmt_num(smallest, digits)
    if abs(smallest - largest) < 1e-9:
        return low_text
    return f"{low_text}-{fmt_num(largest, digits)}"
def summarize_text(values: list[str]) -> str:
    """Collapse *values* into one label.

    Empty strings and ``None`` are ignored. One distinct value is returned
    as is; several become "Mixed(a, b, ...)" in sorted order; none yields "".
    """
    distinct = set(values) - {"", None}
    if not distinct:
        return ""
    ordered = sorted(distinct)
    if len(ordered) == 1:
        return ordered[0]
    return "Mixed(" + ", ".join(ordered) + ")"
def serialize_csv_row(row: list[str]) -> str:
    """Format *row* as one CSV line without the trailing line terminator."""
    with io.StringIO() as sink:
        csv.writer(sink).writerow(row)
        text = sink.getvalue()
    return text.rstrip("\r\n")
def build_magic():
    """Create a MIME-detecting ``magic.Magic`` instance, or ``None``.

    ``None`` is returned when python-magic is not installed or cannot be
    initialised.
    """
    if magic is None:
        return None
    try:
        detector = magic.Magic(mime=True)
    except Exception:
        # Best effort: callers treat None as "no detector available".
        detector = None
    return detector
def detect_mime(path: Path, magic_mime) -> Optional[str]:
    """Return the MIME type of *path*, or ``None`` when it cannot be found.

    Tries *magic_mime* (an object with ``from_file``) first. If that is
    ``None`` or raises, falls back to the external ``file`` command.
    """
    if magic_mime is not None:
        try:
            return magic_mime.from_file(str(path))
        except Exception:
            # libmagic failed for this file; try the file(1) fallback below.
            pass

    file_exe = shutil.which("file")
    if file_exe is None:
        return None

    try:
        completed = subprocess.run(
            [file_exe, "--mime-type", "-b", str(path)],
            capture_output=True,
            text=True,
            check=True,
        )
        mime = completed.stdout.strip()
    except Exception:
        return None
    return mime if mime else None
def detect_image_format(path: Path, magic_mime) -> Optional[str]:
    """Return the image format label for *path*, or ``None`` if not an image.

    The label comes from ``IMAGE_MIME_MAP`` keyed by the detected MIME type.
    """
    mime_type = detect_mime(path, magic_mime)
    label = IMAGE_MIME_MAP.get(mime_type)
    return label
def ffprobe_json(path: Path) -> Optional[dict]:
    """Run ``ffprobe`` on *path* and return its parsed JSON report.

    Returns ``None`` when ffprobe exits with an error, cannot be started,
    or prints something that is not a JSON object.

    Raises:
        RuntimeError: if no ``ffprobe`` executable is found in PATH.
    """
    ffprobe = shutil.which("ffprobe")
    if ffprobe is None:
        raise RuntimeError("ffprobe not found in PATH")
    cmd = [
        ffprobe,
        "-v", "error",
        "-print_format", "json",
        "-show_format",
        "-show_streams",
        str(path),
    ]
    try:
        # Decode explicitly as UTF-8 and replace undecodable bytes. With the
        # locale default, one stray byte in a tag would raise
        # UnicodeDecodeError and the whole file would be silently skipped.
        proc = subprocess.run(
            cmd,
            capture_output=True,
            encoding="utf-8",
            errors="replace",
            check=True,
        )
        parsed = json.loads(proc.stdout)
    except (subprocess.SubprocessError, OSError, ValueError):
        # Non-zero exit, failure to launch, or malformed JSON
        # (json.JSONDecodeError is a ValueError).
        return None
    # Callers use dict methods on the result; reject any other JSON value.
    return parsed if isinstance(parsed, dict) else None
def extract_primary_audio_stream(probe: dict) -> Optional[dict]:
    """Return the first stream whose ``codec_type`` is "audio", else ``None``."""
    candidates = probe.get("streams") or []
    return next(
        (entry for entry in candidates if entry.get("codec_type") == "audio"),
        None,
    )
def has_video_stream(probe: dict) -> bool:
    """Report whether *probe* contains a real video stream.

    Embedded cover art is exposed by ffprobe as a video stream whose
    ``disposition.attached_pic`` flag is 1. Such streams are ignored so that
    an audio-only file with artwork is not classified as a video container.
    """
    for stream in probe.get("streams") or []:
        if stream.get("codec_type") != "video":
            continue
        disposition = stream.get("disposition") or {}
        # Compare as text so both 1 and "1" count as set.
        if str(disposition.get("attached_pic", 0)) == "1":
            continue
        return True
    return False
def normalize_format(probe: dict, audio_stream: dict) -> str:
    """Map ffprobe's container description to a short upper-case label.

    The comma-separated ``format_name`` is matched against known containers
    in a fixed priority order. Matroska and the MOV/MP4 family are labelled
    differently depending on whether a video stream is present. Unknown
    containers fall back to the audio codec name, then to the raw format
    names, then to ``format_long_name``, and finally to "UNKNOWN".
    """
    container = probe.get("format") or {}
    raw_names = (container.get("format_name") or "").lower()
    long_name = (container.get("format_long_name") or "").strip()
    names = {piece.strip() for piece in raw_names.split(",") if piece.strip()}
    brand = ((container.get("tags") or {}).get("major_brand") or "").upper()

    # Order matters: the first entry with any alias present wins.
    simple_containers = (
        (("flac",), "FLAC"),
        (("mp3",), "MP3"),
        (("wav", "wave"), "WAV"),
        (("aiff",), "AIFF"),
        (("ogg",), "OGG"),
        (("dsf",), "DSF"),
        (("wavpack", "wv"), "WAVPACK"),
        (("ape",), "APE"),
        (("tta",), "TTA"),
        (("asf",), "ASF"),
        (("caf",), "CAF"),
        (("au",), "AU"),
        (("amr",), "AMR"),
    )
    for aliases, label in simple_containers:
        if names.intersection(aliases):
            return label

    if "matroska" in names:
        return "MATROSKA" if has_video_stream(probe) else "MKA"
    if "webm" in names:
        return "WEBM"
    if names & {"mov", "mp4", "m4a", "3gp", "3g2", "mj2"}:
        if brand.startswith(("M4A", "M4B")):
            return "M4A"
        return "MP4" if has_video_stream(probe) else "M4A"

    codec = (audio_stream.get("codec_name") or "").lower()
    if codec == "aac":
        return "AAC"
    if codec:
        return codec.upper()
    if names:
        return "/".join(sorted(name.upper() for name in names))
    if long_name:
        return long_name.upper().replace(" ", "_")
    return "UNKNOWN"
def normalize_codec(audio_stream: dict, format_label: str) -> str:
    """Map ffprobe's ``codec_name`` to a short upper-case codec label.

    Known codecs use a fixed label. Any other non-empty name, including the
    ``pcm_*`` and ``adpcm_*`` families, is simply upper-cased. When the
    stream has no codec name, *format_label* is returned.
    """
    raw = (audio_stream.get("codec_name") or "").lower()
    if not raw:
        return format_label
    known_labels = {
        "flac": "FLAC",
        "mp3": "MP3",
        "aac": "AAC",
        "alac": "ALAC",
        "opus": "OPUS",
        "vorbis": "VORBIS",
        "ac3": "AC3",
        "eac3": "EAC3",
        "dts": "DTS",
        "ape": "APE",
        "wavpack": "WAVPACK",
        "tta": "TTA",
        "wmav1": "WMA1",
        "wmav2": "WMA2",
        "wmapro": "WMA_PRO",
        "wmalossless": "WMA_LOSSLESS",
        "atrac3": "ATRAC3",
        "atrac3p": "ATRAC3P",
        "dsd_lsbf": "DSD",
        "dsd_msbf": "DSD",
        "dsd_lsbf_planar": "DSD",
        "dsd_msbf_planar": "DSD",
    }
    return known_labels.get(raw, raw.upper())
def normalize_sample_format(audio_stream: dict) -> str:
    """Return the stream's ``sample_fmt`` trimmed and upper-cased, or ""."""
    raw = audio_stream.get("sample_fmt") or ""
    return raw.strip().upper()
def infer_bit_depth_from_sample_fmt(sample_fmt: str) -> Optional[int]:
    """Guess the bit depth from an ffprobe sample-format name.

    A signed/unsigned marker followed by digits (for example "s16" or "u8")
    gives that number. Names starting with "flt" map to 32 and "dbl" to 64.
    Anything else yields ``None``.
    """
    lowered = sample_fmt.lower()
    found = re.search(r"([su])(\d+)", lowered)
    if found is not None:
        try:
            return int(found.group(2))
        except Exception:
            return None
    for prefix, depth in (("flt", 32), ("dbl", 64)):
        if lowered.startswith(prefix):
            return depth
    return None
def extract_bit_depth(audio_stream: dict) -> Optional[int]:
    """Determine the stream's bit depth.

    Prefers ``bits_per_raw_sample``, then ``bits_per_sample``; each is used
    only when it parses to a positive integer. Otherwise the depth is
    inferred from ``sample_fmt``.
    """
    for field in ("bits_per_raw_sample", "bits_per_sample"):
        declared = safe_int(audio_stream.get(field))
        if declared and declared > 0:
            return declared
    return infer_bit_depth_from_sample_fmt(audio_stream.get("sample_fmt") or "")
def infer_lossless(codec: str) -> str:
    """Classify a codec label as "TRUE" (lossless), "FALSE" or "UNKNOWN".

    PCM codecs, members of ``LOSSLESS_CODECS`` and any label containing
    "LOSSLESS" are lossless. A fixed list of lossy codecs gives "FALSE".
    Everything else is "UNKNOWN".
    """
    label = codec.upper()
    if label.startswith("PCM_") or label in LOSSLESS_CODECS or "LOSSLESS" in label:
        return "TRUE"
    lossy_codecs = {
        "MP3",
        "AAC",
        "OPUS",
        "VORBIS",
        "AC3",
        "EAC3",
        "DTS",
        "WMA1",
        "WMA2",
        "WMA_PRO",
        "ATRAC3",
        "ATRAC3P",
    }
    return "FALSE" if label in lossy_codecs else "UNKNOWN"
def probe_audio_file(path: Path) -> Optional[dict]:
    """Probe *path* with ffprobe and summarise its first audio stream.

    Returns ``None`` when ffprobe yields nothing usable or the file has no
    audio stream. Otherwise returns a dict whose keys match the
    audio-related fields of ``TrackInfo``.
    """
    probe = ffprobe_json(path)
    if not probe:
        return None
    stream = extract_primary_audio_stream(probe)
    if not stream:
        return None
    container = probe.get("format") or {}

    file_format = normalize_format(probe, stream)
    codec = normalize_codec(stream, file_format)

    # Prefer per-stream values; fall back to container-level ones.
    bit_rate = safe_float(stream.get("bit_rate"))
    if bit_rate is None:
        bit_rate = safe_float(container.get("bit_rate"))
    duration = safe_float(stream.get("duration"))
    if duration is None:
        duration = safe_float(container.get("duration"))

    return {
        "file_format": file_format,
        "codec": codec,
        "bit_depth": extract_bit_depth(stream),
        "sample_rate_hz": safe_int(stream.get("sample_rate")),
        "bit_rate_kbps": None if bit_rate is None else bit_rate / 1000.0,
        "channels": safe_int(stream.get("channels")),
        "channel_layout": (stream.get("channel_layout") or "").strip(),
        "sample_format": normalize_sample_format(stream),
        "duration_seconds": duration,
        "lossless": infer_lossless(codec),
    }
def choose_cover_image_format(hits: list[tuple[bool, str]]) -> str:
    """Pick the cover-image format label for one album.

    *hits* holds ``(is_preferred, format)`` pairs. If any hit is preferred
    only those are considered. The most frequent format wins, and "FALSE"
    is returned when there are no hits at all.
    """
    if not hits:
        return "FALSE"
    favoured = [label for flagged, label in hits if flagged]
    pool = favoured or [label for _, label in hits]
    ((winner, _count),) = Counter(pool).most_common(1)
    return winner
def collect_files(root: Path) -> list[tuple[AlbumKey, Path]]:
    """List every non-hidden file under ``root/<artist>/<album>/``.

    Artist and album directories are visited in case-insensitive name
    order, and files within an album in case-insensitive path order. Each
    file is paired with the ``AlbumKey`` of its album. Files directly
    inside *root* or inside an artist directory are not collected.
    """

    def hidden(path: Path) -> bool:
        # Judge hiddenness only below *root*. The paths here are absolute,
        # so a library stored under a dot-directory (e.g. ~/.local/share)
        # would otherwise have every single entry treated as hidden.
        return is_hidden(path.relative_to(root))

    items: list[tuple[AlbumKey, Path]] = []
    for artist_dir in sorted(root.iterdir(), key=lambda p: p.name.lower()):
        if not artist_dir.is_dir() or hidden(artist_dir):
            continue
        for album_dir in sorted(artist_dir.iterdir(), key=lambda p: p.name.lower()):
            if not album_dir.is_dir() or hidden(album_dir):
                continue
            album_name, year = parse_album_dir_name(album_dir.name)
            album_key = AlbumKey(
                artist=artist_dir.name,
                album=album_name,
                year=year,
                album_dir=album_dir,
            )
            for path in sorted(album_dir.rglob("*"), key=lambda p: str(p).lower()):
                if path.is_file() and not hidden(path):
                    items.append((album_key, path))
    return items
def scan_library(root: Path):
    """Scan the library under *root* and return a ``TrackInfo`` per audio file.

    Every collected file is probed as audio first. Files that are not audio
    are checked for being an image and, if so, recorded as a cover
    candidate for their album. After the scan each track receives its
    album's cover format, or "FALSE" when the album has no image.
    """
    magic_mime = build_magic()
    cover_hits: dict[AlbumKey, list[tuple[bool, str]]] = defaultdict(list)
    tracks: list[TrackInfo] = []
    pending = collect_files(root)
    # Substrings of a file stem that mark an image as a likely cover.
    cover_hints = ("cover", "folder", "front", "album", "art")

    with tqdm(total=len(pending), desc="Scanning", unit="file") as pbar:
        for album_key, path in pending:
            probed = probe_audio_file(path)
            if probed is None:
                img_fmt = detect_image_format(path, magic_mime)
                if img_fmt:
                    stem = path.stem.lower()
                    preferred = any(hint in stem for hint in cover_hints)
                    cover_hits[album_key].append((preferred, img_fmt))
            else:
                number, sort_number, title = parse_track_name(path.name)
                tracks.append(
                    TrackInfo(
                        artist=album_key.artist,
                        album=album_key.album,
                        year=album_key.year,
                        album_dir=album_key.album_dir,
                        relative_path=str(path.relative_to(root)),
                        file_name=path.name,
                        track_number=number,
                        track_number_sort=sort_number,
                        track_title=title,
                        cover_image="",  # filled in once all images are known
                        **probed,
                    )
                )
            pbar.update(1)
            pbar.set_postfix(audio_tracks=len(tracks))

    cover_map = {key: choose_cover_image_format(hits) for key, hits in cover_hits.items()}
    for track in tracks:
        key = AlbumKey(track.artist, track.album, track.year, track.album_dir)
        # Albums with tracks but no image hit resolve to "FALSE".
        track.cover_image = cover_map.get(key, "FALSE")
    return tracks
def track_sort_key(track: TrackInfo):
    """Sort key: artist, album, track number, file name, relative path.

    All text components are compared case-insensitively.
    """
    artist, album, name, rel_path = (
        text.lower()
        for text in (track.artist, track.album, track.file_name, track.relative_path)
    )
    return (artist, album, track.track_number_sort, name, rel_path)
def build_track_rows(tracks: list[TrackInfo]) -> list[list[str]]:
    """Build the track-level CSV rows, ordered by ``track_sort_key``.

    Missing numeric values are rendered as empty strings.
    """

    def text_or_blank(number) -> str:
        return "" if number is None else str(number)

    return [
        [
            item.artist,
            item.album,
            item.track_number,
            item.track_title,
            item.relative_path,
            item.file_format,
            item.codec,
            item.cover_image,
            text_or_blank(item.bit_depth),
            text_or_blank(item.sample_rate_hz),
            fmt_num(item.bit_rate_kbps, 2),
            text_or_blank(item.channels),
            item.channel_layout,
            item.sample_format,
            fmt_num(item.duration_seconds, 3),
            item.lossless,
            item.year,
        ]
        for item in sorted(tracks, key=track_sort_key)
    ]
def build_album_rows(tracks: list[TrackInfo]) -> list[list[str]]:
    """Build the album-level CSV rows by aggregating *tracks* per album.

    Albums are ordered by artist and album (case-insensitive), then year.
    Numeric columns report averages and ranges over the tracks that carry
    a value; text columns are collapsed with ``summarize_text``.
    """
    grouped: dict[AlbumKey, list[TrackInfo]] = defaultdict(list)
    for track in tracks:
        grouped[AlbumKey(track.artist, track.album, track.year, track.album_dir)].append(track)

    def known(members, attr):
        # Values of *attr* across *members*, skipping the missing ones.
        return [getattr(m, attr) for m in members if getattr(m, attr) is not None]

    def average(values) -> str:
        return fmt_num(mean(values), 2) if values else ""

    def album_order(key: AlbumKey):
        return (key.artist.lower(), key.album.lower(), key.year)

    rows = []
    for key in sorted(grouped, key=album_order):
        members = sorted(grouped[key], key=track_sort_key)
        bit_depths = known(members, "bit_depth")
        sample_rates = known(members, "sample_rate_hz")
        bitrates = known(members, "bit_rate_kbps")
        durations = known(members, "duration_seconds")
        rows.append([
            key.artist,
            key.album,
            summarize_text([m.file_format for m in members]),
            summarize_text([m.codec for m in members]),
            members[0].cover_image if members else "FALSE",
            str(len(members)),
            key.year,
            average(bit_depths),
            fmt_range_int(bit_depths),
            average(sample_rates),
            fmt_range_int(sample_rates),
            average(bitrates),
            fmt_range_float(bitrates, 2),
            summarize_text([str(count) for count in known(members, "channels")]),
            summarize_text([m.channel_layout for m in members]),
            summarize_text([m.sample_format for m in members]),
            fmt_num(sum(durations), 3) if durations else "",
            summarize_text([m.lossless for m in members]),
        ])
    return rows
def write_csv_with_stdout(output_path: Path, header: list[str], rows: list[list[str]], desc: str, unit: str):
    """Write *header* and *rows* to a CSV file, echoing each line via tqdm.

    Parent directories are created as needed. Lines are echoed with
    ``tqdm.write`` and a progress bar labelled *desc* / *unit* tracks the
    rows.
    """
    output_path.parent.mkdir(parents=True, exist_ok=True)
    with output_path.open("w", newline="", encoding="utf-8") as handle:
        sink = csv.writer(handle)
        sink.writerow(header)
        for line in (f"# writing {output_path}", serialize_csv_row(header)):
            tqdm.write(line)
        with tqdm(total=len(rows), desc=desc, unit=unit) as progress:
            for record in rows:
                sink.writerow(record)
                tqdm.write(serialize_csv_row(record))
                progress.update(1)
def main():
    """Command-line entry point: validate arguments, scan, write the CSVs.

    Exits with status 1 on invalid input or a missing ``ffprobe``, and with
    status 0 (after a message on stderr) when no audio tracks are found.
    """

    def abort(message: str) -> None:
        print(message, file=sys.stderr)
        sys.exit(1)

    parser = argparse.ArgumentParser(
        description="Walk a music library and generate album-level and/or track-level CSV reports."
    )
    parser.add_argument("root", help="Root of the music library")
    parser.add_argument(
        "--mode",
        choices=["album", "track", "both"],
        default="album",
        help="Which CSV(s) to emit",
    )
    parser.add_argument("--album-output", help="Album CSV output path")
    parser.add_argument("--track-output", help="Track CSV output path")
    args = parser.parse_args()

    want_album = args.mode in ("album", "both")
    want_track = args.mode in ("track", "both")

    root = Path(args.root).expanduser().resolve()
    if not root.is_dir():
        abort(f"ERROR: Not a directory: {root}")
    if shutil.which("ffprobe") is None:
        abort("ERROR: ffprobe is required and was not found in PATH.")
    if want_album and not args.album_output:
        abort("ERROR: --album-output is required for --mode album/both")
    if want_track and not args.track_output:
        abort("ERROR: --track-output is required for --mode track/both")

    tracks = scan_library(root)
    if not tracks:
        print("No audio tracks found.", file=sys.stderr)
        sys.exit(0)

    album_header = [
        "Artist",
        "Album",
        "File Format",
        "Codec",
        "Cover Image",
        "Track Count",
        "Year",
        "Avg Bit Depth",
        "Bit Depth Range",
        "Avg Sample Rate Hz",
        "Sample Rate Range Hz",
        "Avg Bit Rate Kbps",
        "Bit Rate Range Kbps",
        "Channels",
        "Channel Layouts",
        "Sample Formats",
        "Total Duration Seconds",
        "Lossless",
    ]
    track_header = [
        "Artist",
        "Album",
        "Track Number",
        "Track Title",
        "Relative Path",
        "File Format",
        "Codec",
        "Cover Image",
        "Bit Depth",
        "Sample Rate Hz",
        "Bit Rate Kbps",
        "Channels",
        "Channel Layout",
        "Sample Format",
        "Duration Seconds",
        "Lossless",
        "Year",
    ]

    if want_album:
        album_rows = build_album_rows(tracks)
        album_path = Path(args.album_output).expanduser().resolve()
        write_csv_with_stdout(
            album_path,
            album_header,
            album_rows,
            desc="Writing album CSV",
            unit="album",
        )
    if want_track:
        track_rows = build_track_rows(tracks)
        track_path = Path(args.track_output).expanduser().resolve()
        write_csv_with_stdout(
            track_path,
            track_header,
            track_rows,
            desc="Writing track CSV",
            unit="track",
        )
# Run the command-line entry point only when executed as a script.
if __name__ == "__main__":
    main()