#!/usr/bin/env python3
from __future__ import annotations

import argparse
import base64
import csv
import difflib
import hashlib
import json
import os
import re
import secrets
import subprocess
import sys
import time
import unicodedata
import urllib.parse
import webbrowser
from dataclasses import dataclass
from http.server import BaseHTTPRequestHandler, HTTPServer
from pathlib import Path
from typing import Any, Iterable

import requests
from tqdm import tqdm
|
||
|
||
|
||
# Spotify Web API endpoints used by the OAuth flow and the saved-tracks fetch.
SPOTIFY_AUTH_URL = "https://accounts.spotify.com/authorize"
SPOTIFY_TOKEN_URL = "https://accounts.spotify.com/api/token"
SPOTIFY_SAVED_TRACKS_URL = "https://api.spotify.com/v1/me/tracks"

# OAuth redirect target; must match the redirect URI registered for the app.
SPOTIFY_REDIRECT_URI = "http://127.0.0.1:8888/callback"
# Read-only access to the user's saved ("liked") tracks.
SPOTIFY_SCOPE = "user-library-read"

# Default directory for the generated CSV reports.
DEFAULT_OUTPUT_DIR = "spotify_beets_reports"

# -----------------------------
# Models
# -----------------------------

@dataclass(frozen=True)
class SpotifyTrack:
    """One liked track as returned by the Spotify saved-tracks API, flattened
    into plain fields for matching and CSV reporting."""

    # Spotify catalog id and canonical URL (falls back to a spotify: URI).
    spotify_track_id: str
    spotify_track_url: str
    # Normalized ISRC; "" when Spotify did not supply one.
    isrc: str
    title: str
    # All artist names joined with "; "; primary_artist is the first of them.
    artists: str
    primary_artist: str
    album: str
    # All album-artist names joined with "; ".
    album_artist: str
    album_id: str
    album_url: str
    album_type: str
    # Raw release date string from Spotify, and the 4-digit year parsed from it.
    release_date: str
    release_year: str
    disc_number: int | None
    track_number: int | None
    duration_ms: int | None
    total_tracks: int | None
    # Timestamp of when the user liked the track (Spotify "added_at").
    added_at: str
|
||
|
||
|
||
@dataclass(frozen=True)
class BeetsTrack:
    """One track from the Beets library export, with fields coerced to plain
    Python types by ``beets_track_from_row``."""

    title: str
    artist: str
    album: str
    albumartist: str
    # Normalized ISRC; "" when absent from the export.
    isrc: str
    # MusicBrainz identifiers (may be empty strings).
    mb_trackid: str
    mb_albumid: str
    track: int | None
    disc: int | None
    # Track length in seconds (Beets "length"), used for duration matching.
    length_seconds: float | None
    year: str
    # Filesystem path of the audio file.
    path: str
    # Audio format/codec name (e.g. from the Beets "format" field).
    format: str
|
||
|
||
|
||
@dataclass
class MatchResult:
    """Outcome of matching one Spotify track against the Beets library."""

    spotify: SpotifyTrack
    # One of "present", "missing", or "possible_match".
    status: str
    # Machine-readable tag explaining which matching tier decided the status.
    reason: str
    # The matched Beets track, when any tier produced one.
    beets: BeetsTrack | None = None
    # Confidence in [0, 1]; None when no match was found.
    score: float | None = None

# -----------------------------
# Normalization
# -----------------------------

# Regexes for edition/version cruft that should not affect matching.
VERSION_NOISE_PATTERNS = [
    r"\bremaster(?:ed)?(?:\s+\d{4})?\b",
    r"\b\d{4}\s+remaster(?:ed)?\b",
    r"\bdeluxe\b",
    r"\bexpanded\b",
    r"\banniversary\b",
    r"\bspecial edition\b",
    r"\bcollector'?s edition\b",
    r"\bbonus track\b",
    r"\bradio edit\b",
    r"\bsingle version\b",
    r"\balbum version\b",
    r"\bexplicit\b",
    r"\bclean\b",
    r"\bmono\b",
    r"\bstereo\b",
    r"\boriginal mix\b",
    r"\bfeat\.?\b",
    r"\bfeaturing\b",
]


def normalize_text(value: Any) -> str:
    """Normalize a title/artist/album string for loose comparison.

    Strips accents, casefolds, maps typographic punctuation to ASCII,
    removes common edition/version tags, collapses everything to
    space-separated lowercase alphanumerics, and drops a leading article.
    """
    if value is None:
        return ""

    # Decompose accents (NFKD) and drop the combining marks, then casefold.
    text = unicodedata.normalize("NFKD", str(value))
    text = "".join(c for c in text if not unicodedata.combining(c)).casefold()

    # Map typographic punctuation and "&" to plain ASCII equivalents.
    for old, new in (
        ("&", " and "),
        ("’", "'"),
        ("“", '"'),
        ("”", '"'),
        ("–", "-"),
        ("—", "-"),
    ):
        text = text.replace(old, new)

    # Strip common edition/version tags; kept conservative on purpose.
    for noise in VERSION_NOISE_PATTERNS:
        text = re.sub(noise, " ", text, flags=re.IGNORECASE)

    # Punctuation-ish separators become spaces, dashes collapse away.
    text = re.sub(r"[\[\]\(\)\{\},:;.!?\"'`~]", " ", text)
    text = re.sub(r"\s*-\s*", " ", text)

    # Collapse every remaining non-alphanumeric run, then tidy whitespace.
    text = re.sub(r"[^a-z0-9]+", " ", text)
    text = re.sub(r"\s+", " ", text).strip()

    # Drop a leading article so "The Beatles" compares equal to "Beatles".
    return re.sub(r"^(the|a|an)\s+", "", text)
|
||
|
||
|
||
def normalize_isrc(value: Any) -> str:
    """Uppercase *value* and keep only A-Z/0-9 characters ("" for None)."""
    if value is None:
        return ""
    upper = str(value).upper()
    return "".join(re.findall(r"[A-Z0-9]", upper))
|
||
|
||
|
||
def to_int(value: Any) -> int | None:
    """Coerce *value* to int; None for None/"" or unparseable input."""
    if value in (None, ""):
        return None
    try:
        result = int(value)
    except (TypeError, ValueError):
        return None
    return result
|
||
|
||
|
||
def to_float(value: Any) -> float | None:
    """Coerce *value* to float; None for None/"" or unparseable input."""
    if value in (None, ""):
        return None
    try:
        result = float(value)
    except (TypeError, ValueError):
        return None
    return result
|
||
|
||
|
||
def release_year(release_date: str) -> str:
    """Return the leading 4-digit year of a date string, or "" if absent."""
    match = re.match(r"^(\d{4})", release_date) if release_date else None
    return match.group(1) if match else ""

# -----------------------------
# Spotify OAuth
# -----------------------------

# NOTE: An earlier local HTTP callback server (SpotifyCallbackHandler +
# HTTPServer on 127.0.0.1:8888) was superseded by the manual
# paste-the-redirect-URL flow in get_spotify_token(); the dead commented-out
# implementation has been removed.

def random_state() -> str:
    """Return an unpredictable 64-char lowercase-hex OAuth ``state`` value.

    Uses the ``secrets`` module (the stdlib's CSPRNG token API) directly
    instead of hashing ``os.urandom`` output; the result format — 64 hex
    characters — is unchanged.
    """
    return secrets.token_hex(32)
|
||
|
||
|
||
def get_spotify_token(client_id: str, client_secret: str) -> str:
    """Run Spotify's authorization-code OAuth flow and return an access token.

    The user opens the printed authorization URL in a browser, approves the
    requested scope, and pastes the full redirect URL back into the terminal.
    The authorization code is then exchanged for an access token.

    Args:
        client_id: Spotify application client id.
        client_secret: Spotify application client secret.

    Returns:
        The OAuth access token string.

    Raises:
        RuntimeError: on OAuth state mismatch, an error reported by Spotify,
            a missing authorization code, or a failed token exchange.
    """
    expected_state = random_state()

    auth_params = {
        "response_type": "code",
        "client_id": client_id,
        "scope": SPOTIFY_SCOPE,
        "redirect_uri": SPOTIFY_REDIRECT_URI,
        "state": expected_state,
    }
    auth_url = f"{SPOTIFY_AUTH_URL}?{urllib.parse.urlencode(auth_params)}"
    print(f"Paste the following URL into a browser:\n{auth_url}\n")

    redirected_url = input("Paste the full redirected URL here: ").strip()
    query = urllib.parse.parse_qs(urllib.parse.urlparse(redirected_url).query)

    state = query.get("state", [""])[0]
    auth_code = query.get("code", [None])[0]
    auth_error = query.get("error", [None])[0]

    # CSRF guard: the state we generated must round-trip unchanged.
    if state != expected_state:
        # BUG FIX: the old failure path referenced the removed
        # SpotifyCallbackHandler class and raised NameError instead of
        # reporting the actual problem.
        raise RuntimeError("Spotify auth failed: OAuth state mismatch. Refusing token exchange.")
    if auth_error:
        raise RuntimeError(f"Spotify auth failed: {auth_error}")
    if not auth_code:
        raise RuntimeError("Spotify auth failed: No code returned by Spotify.")

    # Token exchange uses HTTP Basic auth with "client_id:client_secret".
    basic = base64.b64encode(f"{client_id}:{client_secret}".encode("utf-8")).decode("ascii")

    resp = requests.post(
        SPOTIFY_TOKEN_URL,
        headers={
            "Authorization": f"Basic {basic}",
            "Content-Type": "application/x-www-form-urlencoded",
        },
        data={
            "grant_type": "authorization_code",
            "code": auth_code,
            "redirect_uri": SPOTIFY_REDIRECT_URI,
        },
        timeout=30,
    )

    if resp.status_code >= 400:
        raise RuntimeError(f"Spotify token exchange failed: {resp.status_code} {resp.text}")

    return resp.json()["access_token"]
|
||
|
||
|
||
def spotify_get(url: str, token: str, params: dict[str, Any] | None = None) -> dict[str, Any]:
    """GET a Spotify API URL with bearer auth, retrying on 429 rate limits.

    Args:
        url: Absolute API URL (an endpoint or a "next" pagination link).
        token: OAuth access token.
        params: Optional query parameters.

    Returns:
        The decoded JSON response body.

    Raises:
        RuntimeError: for any non-retryable HTTP error (status >= 400).
    """
    while True:
        resp = requests.get(
            url,
            headers={"Authorization": f"Bearer {token}"},
            params=params,
            timeout=60,
        )

        if resp.status_code == 429:
            # ROBUSTNESS: Retry-After is usually integer seconds, but HTTP
            # also permits an HTTP-date value; fall back to a short delay
            # instead of crashing with ValueError.
            try:
                retry_after = int(resp.headers.get("Retry-After", "3"))
            except ValueError:
                retry_after = 3
            print(f"Spotify rate-limited us. Sleeping {retry_after}s...")
            time.sleep(retry_after)
            continue

        if resp.status_code >= 400:
            raise RuntimeError(f"Spotify request failed: {resp.status_code} {resp.text}")

        return resp.json()
|
||
|
||
|
||
def fetch_spotify_liked_tracks(token: str) -> list[SpotifyTrack]:
    """Fetch every saved ("liked") track for the authorized user.

    Pages through the saved-tracks endpoint 50 items at a time following the
    "next" links, converting each API item into a SpotifyTrack. Items with no
    track id (unavailable/local tracks) are skipped, so the final count may be
    lower than Spotify's reported total.
    """
    tracks: list[SpotifyTrack] = []

    url = SPOTIFY_SAVED_TRACKS_URL
    params = {"limit": 50, "offset": 0}

    # First page is fetched eagerly so we know the total for the progress bar.
    first = spotify_get(url, token, params=params)
    total = int(first.get("total", 0))

    def consume_page(page: dict[str, Any]) -> None:
        # Appends every usable item on `page` to the enclosing `tracks` list.
        for item in page.get("items", []):
            added_at = item.get("added_at", "")
            track = item.get("track") or {}

            # Skip unavailable/local/null weirdness.
            if not track or not track.get("id"):
                continue

            album = track.get("album") or {}
            artists = track.get("artists") or []
            album_artists = album.get("artists") or []
            external_ids = track.get("external_ids") or {}

            artist_names = [a.get("name", "") for a in artists if a.get("name")]
            album_artist_names = [a.get("name", "") for a in album_artists if a.get("name")]

            # Prefer the public web URL; fall back to a spotify: URI.
            spotify_track_url = (
                track.get("external_urls", {}).get("spotify", "")
                or f"spotify:track:{track.get('id', '')}"
            )
            album_url = (
                album.get("external_urls", {}).get("spotify", "")
                or f"spotify:album:{album.get('id', '')}"
            )

            release_date = album.get("release_date", "") or ""

            tracks.append(
                SpotifyTrack(
                    spotify_track_id=track.get("id", "") or "",
                    spotify_track_url=spotify_track_url,
                    isrc=normalize_isrc(external_ids.get("isrc", "")),
                    title=track.get("name", "") or "",
                    artists="; ".join(artist_names),
                    primary_artist=artist_names[0] if artist_names else "",
                    album=album.get("name", "") or "",
                    album_artist="; ".join(album_artist_names),
                    album_id=album.get("id", "") or "",
                    album_url=album_url,
                    album_type=album.get("album_type", "") or "",
                    release_date=release_date,
                    release_year=release_year(release_date),
                    disc_number=to_int(track.get("disc_number")),
                    track_number=to_int(track.get("track_number")),
                    duration_ms=to_int(track.get("duration_ms")),
                    total_tracks=to_int(album.get("total_tracks")),
                    added_at=added_at,
                )
            )

    consume_page(first)

    # Follow "next" links until exhausted, updating the bar by the number of
    # tracks actually kept from each page (skipped items don't count).
    with tqdm(total=total, initial=len(tracks), unit="track", desc="Spotify liked songs") as pbar:
        next_url = first.get("next")
        while next_url:
            page = spotify_get(next_url, token)
            before = len(tracks)
            consume_page(page)
            pbar.update(len(tracks) - before)
            next_url = page.get("next")

    return tracks

# -----------------------------
# Beets export
# -----------------------------

def run_beets_export(beet_cmd: str) -> list[dict[str, Any]]:
    """Shell out to ``beet export -f json`` and parse its stdout.

    Raises RuntimeError with the command, exit code, and stderr on failure.
    """
    cmd = [beet_cmd, "export", "-f", "json"]
    print(f"Running Beets export: {' '.join(cmd)}")

    proc = subprocess.run(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
        check=False,
    )

    if proc.returncode != 0:
        message = (
            "Beets export failed.\n"
            f"Command: {' '.join(cmd)}\n"
            f"Exit code: {proc.returncode}\n"
            f"stderr:\n{proc.stderr}"
        )
        raise RuntimeError(message)

    return parse_beets_json(proc.stdout)
|
||
|
||
|
||
def parse_beets_json(raw: str) -> list[dict[str, Any]]:
    """Parse a Beets JSON export into a list of row dicts.

    Accepts a JSON array, a wrapper object keyed by "items"/"tracks"/"data",
    a single object (wrapped in a one-element list), or JSON Lines. An empty
    or whitespace-only string yields an empty list.
    """
    text = raw.strip()
    if not text:
        return []

    parsed_ok = True
    try:
        document = json.loads(text)
    except json.JSONDecodeError:
        parsed_ok = False

    if parsed_ok:
        if isinstance(document, list):
            return document
        if isinstance(document, dict):
            # Some exporters wrap the records inside a container key.
            for wrapper_key in ("items", "tracks", "data"):
                if isinstance(document.get(wrapper_key), list):
                    return document[wrapper_key]
            return [document]
        # Any other top-level value falls through to line-by-line parsing,
        # matching the original behavior.

    # JSON Lines fallback: one record per non-empty line.
    return [json.loads(line) for line in text.splitlines() if line.strip()]
|
||
|
||
|
||
def load_beets_export_file(path: Path) -> list[dict[str, Any]]:
    """Read a previously saved Beets JSON export from disk and parse it."""
    print(f"Reading Beets export from {path}")
    raw = path.read_text(encoding="utf-8")
    return parse_beets_json(raw)
|
||
|
||
|
||
def beets_track_from_row(row: dict[str, Any]) -> BeetsTrack:
    """Build a BeetsTrack from one export row.

    Tolerant of alternate field spellings (album_artist, mb_track_id, …) and
    of paths exported as bytes.
    """
    # Year: prefer the tagged year, then original_year, then a raw date field.
    year_value = (
        str(row.get("year", "") or "")
        or str(row.get("original_year", "") or "")
        or str(row.get("date", "") or "")
    )

    raw_path = row.get("path", "")
    if isinstance(raw_path, bytes):
        # Paths can come through as bytes; decode defensively.
        raw_path = raw_path.decode("utf-8", errors="replace")

    return BeetsTrack(
        title=str(row.get("title", "") or ""),
        artist=str(row.get("artist", "") or ""),
        album=str(row.get("album", "") or ""),
        albumartist=str(row.get("albumartist", row.get("album_artist", "")) or ""),
        isrc=normalize_isrc(row.get("isrc", "")),
        mb_trackid=str(row.get("mb_trackid", row.get("mb_track_id", "")) or ""),
        mb_albumid=str(row.get("mb_albumid", row.get("mb_album_id", "")) or ""),
        track=to_int(row.get("track", row.get("track_number"))),
        disc=to_int(row.get("disc", row.get("disc_number"))),
        length_seconds=to_float(row.get("length", row.get("duration"))),
        year=year_value,
        path=str(raw_path or ""),
        format=str(row.get("format", "") or ""),
    )

# -----------------------------
# Matching
# -----------------------------

class BeetsMatcher:
    """Tiered matcher from Spotify tracks to a Beets library.

    Indexes the library once, then resolves each Spotify track via, strongest
    first: exact ISRC; (album artist, album, disc, track) metadata; normalized
    (title, primary artist) with an optional duration check; and finally a
    conservative fuzzy scan that only ever yields "possible_match".
    """

    def __init__(self, beets_tracks: Iterable[BeetsTrack]) -> None:
        self.tracks = list(beets_tracks)

        # Index: normalized ISRC -> tracks carrying it.
        self.by_isrc: dict[str, list[BeetsTrack]] = {}
        # Index: (album artist, album, disc, track) -> tracks; text normalized.
        self.by_album_track: dict[tuple[str, str, int | None, int | None], list[BeetsTrack]] = {}
        # Index: (title, artist) -> tracks; both normalized.
        self.by_title_artist: dict[tuple[str, str], list[BeetsTrack]] = {}

        for t in self.tracks:
            if t.isrc:
                self.by_isrc.setdefault(t.isrc, []).append(t)

            # Fall back to the track artist when the album-artist tag is empty.
            album_artist = normalize_text(t.albumartist or t.artist)
            album = normalize_text(t.album)
            self.by_album_track.setdefault((album_artist, album, t.disc, t.track), []).append(t)

            self.by_title_artist.setdefault(
                (normalize_text(t.title), normalize_text(t.artist)),
                [],
            ).append(t)

    def match(self, s: SpotifyTrack) -> MatchResult:
        """Classify one Spotify track as present / possible_match / missing."""
        # 1. ISRC exact match.
        if s.isrc and s.isrc in self.by_isrc:
            return MatchResult(
                spotify=s,
                status="present",
                reason="exact_isrc",
                beets=self.by_isrc[s.isrc][0],
                score=1.0,
            )

        # 2. Album artist + album + disc + track exact-ish metadata.
        # Try the album artist first, then the primary track artist.
        album_artist_candidates = [
            normalize_text(s.album_artist),
            normalize_text(s.primary_artist),
        ]
        album_key = normalize_text(s.album)

        for album_artist_key in album_artist_candidates:
            key = (album_artist_key, album_key, s.disc_number, s.track_number)
            candidates = self.by_album_track.get(key, [])
            if candidates:
                return MatchResult(
                    spotify=s,
                    status="present",
                    reason="album_artist_album_disc_track",
                    beets=candidates[0],
                    score=1.0,
                )

        # 3. Title + primary artist, then check duration if possible.
        title_artist_key = (normalize_text(s.title), normalize_text(s.primary_artist))
        candidates = self.by_title_artist.get(title_artist_key, [])

        if candidates:
            spotify_seconds = (s.duration_ms / 1000.0) if s.duration_ms else None

            if spotify_seconds is not None:
                # Prefer a candidate whose length agrees within 4 seconds.
                duration_candidates = [
                    c for c in candidates
                    if c.length_seconds is not None
                    and abs(c.length_seconds - spotify_seconds) <= 4.0
                ]
                if duration_candidates:
                    return MatchResult(
                        spotify=s,
                        status="present",
                        reason="title_artist_duration_within_4s",
                        beets=duration_candidates[0],
                        score=1.0,
                    )

            # If title and artist match exactly after normalization, it is probably present,
            # even if duration is missing from Beets export.
            return MatchResult(
                spotify=s,
                status="present",
                reason="title_artist_exact_normalized",
                beets=candidates[0],
                score=0.97,
            )

        # 4. Fuzzy possible match. Do not treat this as present.
        possible = self.best_fuzzy_candidate(s)
        if possible is not None:
            beets_track, score = possible
            return MatchResult(
                spotify=s,
                status="possible_match",
                reason="fuzzy_metadata_match_review_needed",
                beets=beets_track,
                score=score,
            )

        return MatchResult(
            spotify=s,
            status="missing",
            reason="no_match_found",
            beets=None,
            score=None,
        )

    def best_fuzzy_candidate(self, s: SpotifyTrack) -> tuple[BeetsTrack, float] | None:
        """Return the best fuzzy (track, score) pair, or None below the 0.91 cutoff."""
        s_title = normalize_text(s.title)
        s_artist = normalize_text(s.primary_artist)
        s_album = normalize_text(s.album)
        s_duration = (s.duration_ms / 1000.0) if s.duration_ms else None

        best: tuple[BeetsTrack, float] | None = None

        # Keep this simple and conservative. We only fuzzy scan candidates where either
        # artist or album has some overlap.
        for b in self.tracks:
            b_title = normalize_text(b.title)
            b_artist = normalize_text(b.artist)
            b_album = normalize_text(b.album)

            if not b_title:
                continue

            artist_ratio = difflib.SequenceMatcher(None, s_artist, b_artist).ratio()
            album_ratio = difflib.SequenceMatcher(None, s_album, b_album).ratio()

            # Cheap pre-filter: require some artist OR album similarity before
            # paying for the title comparison.
            if artist_ratio < 0.80 and album_ratio < 0.80:
                continue

            title_ratio = difflib.SequenceMatcher(None, s_title, b_title).ratio()

            # Penalize large duration disagreements (different edits/versions).
            duration_penalty = 0.0
            if s_duration is not None and b.length_seconds is not None:
                diff = abs(s_duration - b.length_seconds)
                if diff > 12:
                    duration_penalty = 0.15
                elif diff > 6:
                    duration_penalty = 0.05

            # Title dominates the blend; artist and album act as support.
            score = (title_ratio * 0.55) + (artist_ratio * 0.30) + (album_ratio * 0.15) - duration_penalty

            if best is None or score > best[1]:
                best = (b, score)

        # Conservative threshold. Below this, it creates too much noise.
        if best and best[1] >= 0.91:
            return best

        return None

# -----------------------------
# Reports
# -----------------------------

def write_csv(path: Path, rows: list[dict[str, Any]], fieldnames: list[str]) -> None:
    """Write *rows* as CSV to *path*, creating parent directories as needed.

    Row keys not listed in *fieldnames* are silently dropped.
    """
    path.parent.mkdir(parents=True, exist_ok=True)

    with path.open("w", encoding="utf-8", newline="") as handle:
        writer = csv.DictWriter(handle, fieldnames=fieldnames, extrasaction="ignore")
        writer.writeheader()
        for row in rows:
            writer.writerow(row)

    print(f"Wrote {path} ({len(rows)} rows)")
|
||
|
||
|
||
def result_base_row(r: MatchResult) -> dict[str, Any]:
    """Flatten one MatchResult into a CSV row keyed by the TRACK_FIELDS names.

    Beets columns are empty strings when no library track was matched.
    """
    spotify = r.spotify
    beets = r.beets
    score_text = "" if r.score is None else f"{r.score:.3f}"

    row: dict[str, Any] = {
        "Track Name": spotify.title,
        "Album": spotify.album,
        "Artist": spotify.artists,
        "Primary Artist": spotify.primary_artist,
        "Album Artist": spotify.album_artist,
        "Year/Date": spotify.release_date or spotify.release_year,
        "Release Year": spotify.release_year,
        "Disc Number": spotify.disc_number,
        "Track Number": spotify.track_number,
        "Spotify ISRC": spotify.isrc,
        "Spotify Track URL": spotify.spotify_track_url,
        "Spotify Album URL": spotify.album_url,
        "Spotify Album ID": spotify.album_id,
        "Spotify Album Type": spotify.album_type,
        "Spotify Album Total Tracks": spotify.total_tracks,
        "Spotify Liked At": spotify.added_at,
        "Match Status": r.status,
        "Match Reason": r.reason,
        "Match Score": score_text,
    }
    row.update(
        {
            "Beets Track": beets.title if beets else "",
            "Beets Album": beets.album if beets else "",
            "Beets Artist": beets.artist if beets else "",
            "Beets Album Artist": beets.albumartist if beets else "",
            "Beets Year": beets.year if beets else "",
            "Beets Path": beets.path if beets else "",
            "Beets Format": beets.format if beets else "",
        }
    )
    return row
|
||
|
||
|
||
# Column order for the per-track CSV reports; names match the keys produced
# by result_base_row().
TRACK_FIELDS = [
    "Track Name",
    "Album",
    "Artist",
    "Primary Artist",
    "Album Artist",
    "Year/Date",
    "Release Year",
    "Disc Number",
    "Track Number",
    "Spotify ISRC",
    "Spotify Track URL",
    "Spotify Album URL",
    "Spotify Album ID",
    "Spotify Album Type",
    "Spotify Album Total Tracks",
    "Spotify Liked At",
    "Match Status",
    "Match Reason",
    "Match Score",
    "Beets Track",
    "Beets Album",
    "Beets Artist",
    "Beets Album Artist",
    "Beets Year",
    "Beets Path",
    "Beets Format",
]
|
||
|
||
|
||
def album_report_rows(results: list[MatchResult]) -> list[dict[str, Any]]:
    """Aggregate per-track results into album rows that need attention.

    Groups results by Spotify album id (falling back to album-artist::album),
    skips albums whose liked tracks are all present, and sorts fully-missing
    albums first, then by album artist and album name.
    """
    grouped: dict[str, list[MatchResult]] = {}
    for result in results:
        sp = result.spotify
        group_key = sp.album_id or f"{sp.album_artist}::{sp.album}"
        grouped.setdefault(group_key, []).append(result)

    rows: list[dict[str, Any]] = []

    for group in grouped.values():
        sample = group[0].spotify
        liked_count = len(group)

        statuses = [r.status for r in group]
        present_count = statuses.count("present")
        missing_count = statuses.count("missing")
        possible_count = statuses.count("possible_match")

        # Fully covered albums need no attention; skip them entirely.
        if present_count == liked_count:
            continue

        if missing_count == liked_count:
            coverage = "all_liked_tracks_missing"
        else:
            coverage = "partial_liked_tracks_present"

        needs_review = [
            r.spotify.title
            for r in group
            if r.status in {"missing", "possible_match"}
        ]

        rows.append(
            {
                "Album": sample.album,
                "Album Artist": sample.album_artist,
                "Year/Date": sample.release_date or sample.release_year,
                "Release Year": sample.release_year,
                "Spotify Album Type": sample.album_type,
                "Spotify Album Total Tracks": sample.total_tracks,
                "Liked Tracks Count": liked_count,
                "Present Liked Tracks Count": present_count,
                "Missing Liked Tracks Count": missing_count,
                "Possible Match Count": possible_count,
                "Album Coverage Status": coverage,
                "Missing Or Review Tracks": " | ".join(needs_review),
                "Spotify Album URL": sample.album_url,
                "Spotify Album ID": sample.album_id,
            }
        )

    # Fully-missing albums sort first (False < True), then case-insensitive
    # artist and album name.
    rows.sort(
        key=lambda row: (
            row["Album Coverage Status"] != "all_liked_tracks_missing",
            str(row["Album Artist"]).casefold(),
            str(row["Album"]).casefold(),
        )
    )

    return rows
|
||
|
||
|
||
# Column order for the album-coverage CSV report; names match the keys
# produced by album_report_rows().
ALBUM_FIELDS = [
    "Album",
    "Album Artist",
    "Year/Date",
    "Release Year",
    "Spotify Album Type",
    "Spotify Album Total Tracks",
    "Liked Tracks Count",
    "Present Liked Tracks Count",
    "Missing Liked Tracks Count",
    "Possible Match Count",
    "Album Coverage Status",
    "Missing Or Review Tracks",
    "Spotify Album URL",
    "Spotify Album ID",
]

# -----------------------------
# Main
# -----------------------------

def main() -> int:
    """CLI entry point: load both libraries, match, and write CSV reports.

    Returns:
        Process exit code: 0 on success, 2 when Spotify credentials are
        required but missing from the environment.
    """
    parser = argparse.ArgumentParser(
        description="Compare Spotify Liked Songs against a Beets library and produce missing-track/album CSV reports."
    )
    parser.add_argument(
        "--output-dir",
        default=DEFAULT_OUTPUT_DIR,
        help=f"Output directory for CSV reports. Default: {DEFAULT_OUTPUT_DIR}",
    )
    parser.add_argument(
        "--beet-cmd",
        default="beet",
        help="Path/name of beet command. Default: beet",
    )
    parser.add_argument(
        "--beets-json",
        type=Path,
        default=None,
        help="Use an existing Beets JSON export instead of running beet export.",
    )
    parser.add_argument(
        "--spotify-json",
        type=Path,
        default=None,
        help="Use an existing Spotify liked tracks JSON cache instead of calling Spotify.",
    )
    parser.add_argument(
        "--write-raw-cache",
        action="store_true",
        help="Also write raw-ish normalized cache JSON files into the output directory.",
    )
    args = parser.parse_args()

    output_dir = Path(args.output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)

    # Load the Beets library, either from a cached export or by shelling out.
    if args.beets_json:
        beets_rows = load_beets_export_file(args.beets_json)
    else:
        beets_rows = run_beets_export(args.beet_cmd)

    beets_tracks = [beets_track_from_row(row) for row in beets_rows]
    print(f"Loaded {len(beets_tracks)} Beets tracks")

    # Load Spotify liked tracks, either from a cache file or via OAuth + API.
    if args.spotify_json:
        print(f"Reading Spotify liked tracks from {args.spotify_json}")
        raw = json.loads(args.spotify_json.read_text(encoding="utf-8"))
        spotify_tracks = [SpotifyTrack(**item) for item in raw]
    else:
        client_id = os.environ.get("SPOTIFY_CLIENT_ID")
        client_secret = os.environ.get("SPOTIFY_CLIENT_SECRET")

        if not client_id or not client_secret:
            print(
                "Missing SPOTIFY_CLIENT_ID or SPOTIFY_CLIENT_SECRET environment variables.",
                file=sys.stderr,
            )
            return 2

        token = get_spotify_token(client_id, client_secret)
        spotify_tracks = fetch_spotify_liked_tracks(token)

    print(f"Loaded {len(spotify_tracks)} Spotify liked tracks")

    # Optionally persist both normalized datasets so later runs can use
    # --spotify-json / --beets-json instead of re-fetching.
    if args.write_raw_cache:
        spotify_cache = output_dir / "spotify_liked_tracks_cache.json"
        beets_cache = output_dir / "beets_tracks_cache.json"

        spotify_cache.write_text(
            json.dumps([s.__dict__ for s in spotify_tracks], ensure_ascii=False, indent=2),
            encoding="utf-8",
        )
        beets_cache.write_text(
            json.dumps([b.__dict__ for b in beets_tracks], ensure_ascii=False, indent=2),
            encoding="utf-8",
        )

        print(f"Wrote {spotify_cache}")
        print(f"Wrote {beets_cache}")

    # Match every Spotify track against the Beets library.
    matcher = BeetsMatcher(beets_tracks)
    results = [matcher.match(s) for s in tqdm(spotify_tracks, desc="Matching", unit="track")]

    missing = [result_base_row(r) for r in results if r.status == "missing"]
    possible = [result_base_row(r) for r in results if r.status == "possible_match"]
    present = [result_base_row(r) for r in results if r.status == "present"]
    albums = album_report_rows(results)

    write_csv(output_dir / "missing_tracks.csv", missing, TRACK_FIELDS)
    write_csv(output_dir / "possible_matches.csv", possible, TRACK_FIELDS)
    write_csv(output_dir / "present_tracks.csv", present, TRACK_FIELDS)
    write_csv(output_dir / "partial_or_missing_albums.csv", albums, ALBUM_FIELDS)

    print()
    print("Summary")
    print("-------")
    print(f"Spotify liked tracks: {len(spotify_tracks)}")
    print(f"Beets tracks: {len(beets_tracks)}")
    print(f"Present: {len(present)}")
    print(f"Missing: {len(missing)}")
    print(f"Possible matches: {len(possible)}")
    # FIX: was "review:{...}" with no space, breaking the summary alignment.
    print(f"Albums needing review: {len(albums)}")
    print()
    print(f"Primary buying list: {output_dir / 'missing_tracks.csv'}")
    print(f"Album review list: {output_dir / 'partial_or_missing_albums.csv'}")
    print(f"Manual review list: {output_dir / 'possible_matches.csv'}")

    return 0
|
||
|
||
|
||
if __name__ == "__main__":
    # sys.exit raises SystemExit with main()'s return code, same as before.
    sys.exit(main())
|