362 lines
9.3 KiB
Python
Executable File
362 lines
9.3 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
|
|
import argparse
|
|
import csv
|
|
import json
|
|
import re
|
|
import shutil
|
|
import subprocess
|
|
import sys
|
|
from collections import Counter
|
|
from pathlib import Path
|
|
|
|
try:
|
|
import magic # python-magic
|
|
except ImportError:
|
|
magic = None
|
|
|
|
|
|
# Matches a directory name of the form "<album> - [<year>]": a non-greedy
# album part, a hyphen with optional surrounding whitespace, and a four-digit
# year in square brackets, with optional trailing whitespace.
ALBUM_DIR_RE = re.compile(r"^(?P<album>.+?)\s*-\s*\[(?P<year>\d{4})\]\s*$", re.IGNORECASE)
|
|
|
|
# (MIME type, label) pairs for the image types this script recognises.
# The label is what ends up in the "Cover Image" CSV column.
_IMAGE_MIME_LABELS = (
    ("image/jpeg", "JPEG"),
    ("image/jpg", "JPG"),
    ("image/png", "PNG"),
    ("image/webp", "WEBP"),
    ("image/gif", "GIF"),
    ("image/bmp", "BMP"),
    ("image/tiff", "TIFF"),
    ("image/heic", "HEIC"),
    ("image/heif", "HEIF"),
    ("image/avif", "AVIF"),
)

# Lookup table from detected MIME type to image-format label.
IMAGE_MIME_MAP = dict(_IMAGE_MIME_LABELS)
|
|
|
|
|
|
def is_hidden(path: Path) -> bool:
    """Return True if any component of *path* is a dot-prefixed name.

    The special components "." and ".." are directory-navigation markers
    rather than hidden entries, so they never make a path count as hidden.
    """
    return any(
        part.startswith(".") and part not in (".", "..")
        for part in path.parts
    )
|
|
|
|
|
|
def build_magic():
    """Create a MIME-mode ``magic.Magic`` detector, or return None.

    None is returned when the module-level ``magic`` name is None (the
    optional import failed) and also when constructing the detector raises.
    """
    detector = None
    if magic is not None:
        try:
            detector = magic.Magic(mime=True)
        except Exception:
            # Best effort: any construction failure means "no detector".
            detector = None
    return detector
|
|
|
|
|
|
def detect_mime(path: Path, magic_mime) -> str | None:
|
|
if magic_mime is not None:
|
|
try:
|
|
return magic_mime.from_file(str(path))
|
|
except Exception:
|
|
pass
|
|
|
|
file_exe = shutil.which("file")
|
|
if file_exe is None:
|
|
return None
|
|
|
|
try:
|
|
proc = subprocess.run(
|
|
[file_exe, "--mime-type", "-b", str(path)],
|
|
capture_output=True,
|
|
text=True,
|
|
check=True,
|
|
)
|
|
return proc.stdout.strip() or None
|
|
except Exception:
|
|
return None
|
|
|
|
|
|
def detect_image_format(path: Path, magic_mime) -> str | None:
    """Return the image-format label for *path*, or None.

    The MIME type obtained from ``detect_mime`` is looked up in
    ``IMAGE_MIME_MAP``; an undetected type or one missing from the map
    yields None.
    """
    return IMAGE_MIME_MAP.get(detect_mime(path, magic_mime))
|
|
|
|
|
|
def parse_album_dir_name(dirname: str) -> tuple[str, str]:
    """Split an album directory name into ``(album, year)``.

    A name matching ``ALBUM_DIR_RE`` yields its stripped "album" and "year"
    groups; any other name is returned unchanged with an empty year string.
    """
    matched = ALBUM_DIR_RE.match(dirname)
    if matched is None:
        return dirname, ""
    album, year = matched.group("album", "year")
    return album.strip(), year.strip()
|
|
|
|
|
|
def ffprobe_json(path: Path) -> dict | None:
|
|
ffprobe = shutil.which("ffprobe")
|
|
if ffprobe is None:
|
|
raise RuntimeError("ffprobe not found in PATH")
|
|
|
|
cmd = [
|
|
ffprobe,
|
|
"-v", "error",
|
|
"-print_format", "json",
|
|
"-show_format",
|
|
"-show_streams",
|
|
str(path),
|
|
]
|
|
|
|
try:
|
|
proc = subprocess.run(cmd, capture_output=True, text=True, check=True)
|
|
return json.loads(proc.stdout)
|
|
except Exception:
|
|
return None
|
|
|
|
|
|
# ffprobe container tokens and the label each maps to, in match-priority
# order: the first entry sharing a token with the probed format wins.
_FORMAT_TOKEN_LABELS = (
    (frozenset({"flac"}), "FLAC"),
    (frozenset({"mp3"}), "MP3"),
    (frozenset({"wav", "wave"}), "WAV"),
    (frozenset({"aiff"}), "AIFF"),
    (frozenset({"ogg"}), "OGG"),
    (frozenset({"dsf"}), "DSF"),
    (frozenset({"wavpack", "wv"}), "WAVPACK"),
    (frozenset({"ape"}), "APE"),
    (frozenset({"tta"}), "TTA"),
    (frozenset({"asf"}), "ASF"),
    (frozenset({"caf"}), "CAF"),
    (frozenset({"au"}), "AU"),
    (frozenset({"amr"}), "AMR"),
    (frozenset({"matroska"}), "MATROSKA"),
    (frozenset({"webm"}), "WEBM"),
)

# Tokens identifying the MP4 container family.
_MP4_FAMILY_TOKENS = frozenset({"mov", "mp4", "m4a", "3gp", "3g2", "mj2"})


def normalize_format(probe: dict, audio_stream: dict) -> str:
    """Derive a container-format label from an ffprobe report.

    Args:
        probe: Parsed ffprobe JSON; its "format" entry (if any) supplies
            "format_name", "format_long_name" and "tags".
        audio_stream: The stream dict used for the last-resort fallback.

    Returns:
        A known label (e.g. "FLAC", "MP4", "M4A") when a format token is
        recognised; otherwise, in order of preference: all tokens
        upper-cased, sorted and joined with "/", the long format name
        upper-cased with spaces replaced by underscores, the upper-cased
        codec name, or "UNKNOWN".
    """
    # "format" may be missing or explicitly null; treat both as empty.
    fmt = probe.get("format") or {}
    format_name = (fmt.get("format_name") or "").lower()
    format_long_name = (fmt.get("format_long_name") or "").strip()
    tags = fmt.get("tags") or {}
    major_brand = (tags.get("major_brand") or "").strip().upper()

    # format_name is a comma-separated list of tokens.
    names = {token.strip() for token in format_name.split(",") if token.strip()}

    for tokens, label in _FORMAT_TOKEN_LABELS:
        if tokens & names:
            return label

    # MP4-family container. Distinguish M4A when possible from the
    # container's major_brand tag.
    if _MP4_FAMILY_TOKENS & names:
        if major_brand.startswith(("M4A", "M4B")):
            return "M4A"
        return "MP4"

    # Fallback: report every unrecognised token, sorted for stable output.
    if names:
        return "/".join(sorted(token.upper() for token in names))

    if format_long_name:
        return format_long_name.upper().replace(" ", "_")

    # Final fallback: codec-derived guess.
    codec_name = (audio_stream.get("codec_name") or "").lower()
    if codec_name:
        return codec_name.upper()

    return "UNKNOWN"
|
|
|
|
|
|
# Lower-case ffprobe codec names and the label reported for each.
_CODEC_LABELS = {
    "flac": "FLAC",
    "mp3": "MP3",
    "aac": "AAC",
    "alac": "ALAC",
    "opus": "OPUS",
    "vorbis": "VORBIS",
    "ac3": "AC3",
    "eac3": "EAC3",
    "dts": "DTS",
    "ape": "APE",
    "wavpack": "WAVPACK",
    "tta": "TTA",
    "wmav1": "WMA1",
    "wmav2": "WMA2",
    "wmapro": "WMA_PRO",
    "wmalossless": "WMA_LOSSLESS",
    "atrac3": "ATRAC3",
    "atrac3p": "ATRAC3P",
    "tta1": "TTA",
    "dsd_lsbf": "DSD",
    "dsd_msbf": "DSD",
    "dsd_lsbf_planar": "DSD",
    "dsd_msbf_planar": "DSD",
}


def normalize_codec(probe: dict, audio_stream: dict, format_label: str) -> str:
    """Return a codec label for *audio_stream*.

    Args:
        probe: Parsed ffprobe JSON (not consulted by this function).
        audio_stream: Stream dict whose "codec_name" is inspected.
        format_label: Returned as-is when the stream has no codec name.

    Returns:
        The mapped label for a known codec name, otherwise the codec name
        upper-cased (this includes the ``pcm_*`` and ``adpcm_*`` families).
    """
    codec_key = (audio_stream.get("codec_name") or "").lower()
    if codec_key == "":
        return format_label
    return _CODEC_LABELS.get(codec_key, codec_key.upper())
|
|
|
|
|
|
def probe_audio_file(path: Path) -> tuple[str, str] | None:
    """Probe *path* and return ``(format_label, codec_label)``.

    Returns None when ``ffprobe_json`` yields nothing or when the report
    contains no stream whose "codec_type" is "audio". Labels are computed
    from the first audio stream in the report.
    """
    report = ffprobe_json(path)
    if not report:
        return None

    audio_only = list(
        filter(
            lambda stream: stream.get("codec_type") == "audio",
            report.get("streams") or [],
        )
    )
    if not audio_only:
        return None

    # The first audio stream is treated as the primary one.
    primary = audio_only[0]
    container = normalize_format(report, primary)
    return container, normalize_codec(report, primary, container)
|
|
|
|
|
|
def choose_cover_image_format(album_dir: Path, magic_mime) -> str | bool:
    """Pick the most common cover-image format found under *album_dir*.

    Every non-hidden file below the directory is checked with
    ``detect_image_format``. Images whose stem contains one of the
    preferred name parts ("cover", "folder", "front", "album", "art") take
    precedence; if there are none, all detected images are considered.

    Args:
        album_dir: Album directory to scan recursively.
        magic_mime: Detector passed through to ``detect_image_format``.

    Returns:
        The most frequent format label among the considered images, or
        False when no image was detected.
    """
    preferred_name_parts = ("cover", "folder", "front", "album", "art")
    preferred_hits = []
    any_hits = []

    # Sorted traversal makes the result reproducible: Counter.most_common
    # breaks ties by first-seen order, which otherwise depends on the
    # filesystem's listing order.
    for path in sorted(album_dir.rglob("*")):
        # Judge hidden-ness only on the part below album_dir, so a
        # dot-prefixed ancestor of the library does not hide every file.
        if not path.is_file() or is_hidden(path.relative_to(album_dir)):
            continue

        img_fmt = detect_image_format(path, magic_mime)
        if not img_fmt:
            continue

        any_hits.append(img_fmt)
        stem_lower = path.stem.lower()
        if any(part in stem_lower for part in preferred_name_parts):
            preferred_hits.append(img_fmt)

    hits = preferred_hits or any_hits
    if not hits:
        return False

    return Counter(hits).most_common(1)[0][0]
|
|
|
|
|
|
def collect_album_audio_files(album_dir: Path) -> list[tuple[Path, str, str]]:
    """Find the audio files under *album_dir*.

    Every non-hidden file below the directory is passed to
    ``probe_audio_file``; files for which it returns None are skipped.

    Args:
        album_dir: Album directory to scan recursively.

    Returns:
        A list of ``(path, file_format, codec)`` tuples in sorted path
        order.
    """
    results = []

    # Sorted so the returned order does not depend on filesystem listing
    # order.
    for path in sorted(album_dir.rglob("*")):
        # Judge hidden-ness only on the part below album_dir, so a
        # dot-prefixed ancestor of the library does not hide every file.
        if not path.is_file() or is_hidden(path.relative_to(album_dir)):
            continue

        probed = probe_audio_file(path)
        if probed is None:
            continue

        file_format, codec = probed
        results.append((path, file_format, codec))

    return results
|
|
|
|
|
|
def summarize_values(values: list[str]) -> str:
    """Collapse *values* into a single summary string.

    Returns "" for no values, the value itself when all entries are equal,
    and ``Mixed(a, b, ...)`` listing the distinct values in sorted order
    otherwise.
    """
    distinct = sorted(set(values))
    if len(distinct) > 1:
        return f"Mixed({', '.join(distinct)})"
    return distinct[0] if distinct else ""
|
|
|
|
|
|
def _visible_subdirs(parent: Path) -> list[Path]:
    """Return the non-dot-prefixed subdirectories of *parent*, sorted by
    case-insensitive name."""
    return sorted(
        (
            entry
            for entry in parent.iterdir()
            # Only the entry's own name decides visibility; checking the
            # full path would hide everything whenever an ancestor of the
            # library root is dot-prefixed.
            if entry.is_dir() and not entry.name.startswith(".")
        ),
        key=lambda entry: entry.name.lower(),
    )


def iter_album_dirs(root: Path):
    """Yield ``(artist_name, album_dir)`` for a two-level library layout.

    The first directory level below *root* is treated as artists and the
    second as albums. Non-directories and dot-prefixed entries are skipped
    at both levels; each level is visited in case-insensitive name order.

    Args:
        root: Top-level library directory.

    Yields:
        Tuples of the artist directory's name and the album directory path.
    """
    for artist_dir in _visible_subdirs(root):
        for album_dir in _visible_subdirs(artist_dir):
            yield artist_dir.name, album_dir
|
|
|
|
|
|
def write_csv(root: Path, output_csv: Path):
    """Write one CSV row per album found under *root* to *output_csv*.

    The file is opened for writing as UTF-8 and starts with a header row.
    Albums for which ``collect_album_audio_files`` returns nothing are
    omitted. Each row (header included) is also printed to stdout.

    Args:
        root: Library root handed to ``iter_album_dirs``.
        output_csv: Destination file; overwritten if it exists.
    """
    detector = build_magic()
    header = [
        "Artist",
        "Album",
        "File Format",
        "Codec",
        "Cover Image",
        "Track Count",
        "Year",
    ]

    with output_csv.open("w", newline="", encoding="utf-8") as handle:
        out = csv.writer(handle)

        def emit(record):
            # Echo each record to stdout, then persist it.
            print(record)
            out.writerow(record)

        emit(header)

        for artist, album_dir in iter_album_dirs(root):
            album_name, year = parse_album_dir_name(album_dir.name)
            tracks = collect_album_audio_files(album_dir)
            if not tracks:
                continue

            cover = choose_cover_image_format(album_dir, detector)
            emit(
                [
                    artist,
                    album_name,
                    summarize_values([entry[1] for entry in tracks]),
                    summarize_values([entry[2] for entry in tracks]),
                    # A missing cover is recorded as the literal "FALSE".
                    cover or "FALSE",
                    len(tracks),
                    year,
                ]
            )
|
|
|
|
|
|
def main():
    """Command-line entry point.

    Parses the ``root`` and ``output_csv`` arguments, resolves both to
    absolute paths, and exits with status 1 (message on stderr) if the root
    is not a directory or ffprobe is not on PATH. Otherwise writes the CSV
    and prints its location.
    """
    parser = argparse.ArgumentParser(
        description="Walk a music library and generate an album-level CSV summary."
    )
    parser.add_argument("root", help="Root of the music library")
    parser.add_argument("output_csv", help="Output CSV path")
    args = parser.parse_args()

    root, output_csv = (
        Path(raw).expanduser().resolve() for raw in (args.root, args.output_csv)
    )

    # Collect the first failing precondition, if any, and bail out once.
    failure = None
    if not root.is_dir():
        failure = f"ERROR: Not a directory: {root}"
    elif shutil.which("ffprobe") is None:
        failure = "ERROR: ffprobe is required and was not found in PATH."

    if failure is not None:
        print(failure, file=sys.stderr)
        sys.exit(1)

    write_csv(root, output_csv)
    print(f"Wrote CSV to: {output_csv}")


# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|
|
|