# beets-setup/scripts/unknown/old_spotify.py (retrieved 2026-05-12)
# NOTE(review): web-viewer chrome (file listing, byte size, Unicode warning)
# removed from the top of the file; it was not valid Python source.
#!/usr/bin/env python3
from __future__ import annotations
import argparse
import base64
import csv
import difflib
import hashlib
import json
import os
import re
import subprocess
import sys
import time
import unicodedata
import urllib.parse
import webbrowser
from dataclasses import dataclass
from http.server import BaseHTTPRequestHandler, HTTPServer
from pathlib import Path
from typing import Any, Iterable
import requests
from tqdm import tqdm
# Spotify Web API endpoints used by the OAuth flow and library fetch.
SPOTIFY_AUTH_URL = "https://accounts.spotify.com/authorize"
SPOTIFY_TOKEN_URL = "https://accounts.spotify.com/api/token"
SPOTIFY_SAVED_TRACKS_URL = "https://api.spotify.com/v1/me/tracks"
# Loopback redirect URI; must match the one registered in the Spotify app.
SPOTIFY_REDIRECT_URI = "http://127.0.0.1:8888/callback"
# Read-only access to the user's saved ("liked") tracks is all we need.
SPOTIFY_SCOPE = "user-library-read"
DEFAULT_OUTPUT_DIR = "spotify_beets_reports"
# -----------------------------
# Models
# -----------------------------
@dataclass(frozen=True)
class SpotifyTrack:
    """One liked track as normalized from the Spotify saved-tracks API."""

    spotify_track_id: str
    spotify_track_url: str
    # Normalized ISRC (uppercase A-Z/0-9 only); "" when Spotify has none.
    isrc: str
    title: str
    # All artist names joined with "; ".
    artists: str
    primary_artist: str
    album: str
    album_artist: str
    album_id: str
    album_url: str
    album_type: str
    release_date: str
    # First four digits of release_date, or "".
    release_year: str
    disc_number: int | None
    track_number: int | None
    duration_ms: int | None
    total_tracks: int | None
    # ISO timestamp of when the user liked the track.
    added_at: str
@dataclass(frozen=True)
class BeetsTrack:
    """One track row as normalized from a Beets JSON export."""

    title: str
    artist: str
    album: str
    albumartist: str
    # Normalized ISRC (uppercase A-Z/0-9 only); "" when missing.
    isrc: str
    mb_trackid: str
    mb_albumid: str
    track: int | None
    disc: int | None
    length_seconds: float | None
    year: str
    path: str
    format: str
@dataclass
class MatchResult:
    """Outcome of matching one Spotify track against the Beets library."""

    spotify: SpotifyTrack
    # One of "present", "possible_match", "missing".
    status: str
    # Machine-readable tag explaining which rule decided the status.
    reason: str
    beets: BeetsTrack | None = None
    # Confidence score; None when no candidate was found at all.
    score: float | None = None
# -----------------------------
# Normalization
# -----------------------------
# Regexes for edition/version cruft that should not affect matching
# (applied case-insensitively against casefolded text in normalize_text).
VERSION_NOISE_PATTERNS = [
    r"\bremaster(?:ed)?(?:\s+\d{4})?\b",
    r"\b\d{4}\s+remaster(?:ed)?\b",
    r"\bdeluxe\b",
    r"\bexpanded\b",
    r"\banniversary\b",
    r"\bspecial edition\b",
    r"\bcollector'?s edition\b",
    r"\bbonus track\b",
    r"\bradio edit\b",
    r"\bsingle version\b",
    r"\balbum version\b",
    r"\bexplicit\b",
    r"\bclean\b",
    r"\bmono\b",
    r"\bstereo\b",
    r"\boriginal mix\b",
    r"\bfeat\.?\b",
    r"\bfeaturing\b",
]
def normalize_text(value: Any) -> str:
    """Normalize a title/artist/album string for comparison.

    Strips accents, casefolds, maps typographic punctuation to ASCII,
    removes common edition/version tags and punctuation, collapses
    whitespace, and drops a leading English article.
    """
    if value is None:
        return ""
    s = str(value)
    s = unicodedata.normalize("NFKD", s)
    s = "".join(ch for ch in s if not unicodedata.combining(ch))
    s = s.casefold()
    s = s.replace("&", " and ")
    # BUG FIX: the source had the typographic characters stripped out,
    # leaving s.replace("", "'") which inserts a quote between every
    # character. Restored the intended literals.
    s = s.replace("\u2019", "'")  # right single quotation mark
    s = s.replace("\u201c", '"').replace("\u201d", '"')  # curly double quotes
    s = s.replace("\u2013", "-").replace("\u2014", "-")  # en dash / em dash
    # Remove parenthetical/bracketed version cruft where possible.
    # Keep this conservative: it removes only common edition/version tags.
    for pat in VERSION_NOISE_PATTERNS:
        s = re.sub(pat, " ", s, flags=re.IGNORECASE)
    # Remove punctuation-ish separators.
    s = re.sub(r"[\[\]\(\)\{\},:;.!?\"'`~]", " ", s)
    s = re.sub(r"\s*-\s*", " ", s)
    # Collapse all non-word-ish runs.
    s = re.sub(r"[^a-z0-9]+", " ", s)
    s = re.sub(r"\s+", " ", s).strip()
    # Remove leading articles. Helps with "The Beatles" vs "Beatles".
    s = re.sub(r"^(the|a|an)\s+", "", s)
    return s
def normalize_isrc(value: Any) -> str:
    """Canonicalize an ISRC: uppercase, keeping only A-Z and 0-9."""
    if value is None:
        return ""
    upper = str(value).upper()
    return "".join(ch for ch in upper if "A" <= ch <= "Z" or "0" <= ch <= "9")
def to_int(value: Any) -> int | None:
    """Coerce *value* to int; None when missing, blank, or unparsable."""
    if value is None or value == "":
        return None
    try:
        coerced = int(value)
    except (TypeError, ValueError):
        return None
    return coerced
def to_float(value: Any) -> float | None:
    """Coerce *value* to float; None when missing, blank, or unparsable."""
    if value is None or value == "":
        return None
    try:
        coerced = float(value)
    except (TypeError, ValueError):
        return None
    return coerced
def release_year(release_date: str) -> str:
    """Return the leading 4-digit year of a release date, or ""."""
    if not release_date:
        return ""
    found = re.match(r"^(\d{4})", release_date)
    if found:
        return found.group(1)
    return ""
# -----------------------------
# Spotify OAuth
# -----------------------------
# class SpotifyCallbackHandler(BaseHTTPRequestHandler):
# auth_code: str | None = None
# auth_error: str | None = None
# expected_state: str | None = None
# def do_GET(self) -> None:
# parsed = urllib.parse.urlparse(self.path)
# params = urllib.parse.parse_qs(parsed.query)
# state = params.get("state", [""])[0]
# code = params.get("code", [None])[0]
# error = params.get("error", [None])[0]
# if self.expected_state and state != self.expected_state:
# self.auth_error = "OAuth state mismatch. Refusing token exchange."
# elif error:
# self.auth_error = error
# elif code:
# self.auth_code = code
# else:
# self.auth_error = "No code returned by Spotify."
# self.send_response(200)
# self.send_header("Content-Type", "text/html")
# self.end_headers()
# if self.auth_error:
# body = f"<h1>Spotify authentication failed</h1><p>{self.auth_error}</p>"
# else:
# body = "<h1>Spotify authentication complete</h1><p>You can close this tab.</p>"
# self.wfile.write(body.encode("utf-8"))
# def log_message(self, fmt: str, *args: Any) -> None:
# # Silence default HTTP logs.
# return
def random_state() -> str:
    """Return an unguessable hex string for the OAuth ``state`` parameter."""
    entropy = os.urandom(32)
    return hashlib.sha256(entropy).hexdigest()
def get_spotify_token(client_id: str, client_secret: str) -> str:
    """Run the OAuth authorization-code flow and return an access token.

    The user opens the printed URL in a browser, authorizes, and pastes the
    full redirected URL back; the code is then exchanged for a bearer token.

    Raises:
        RuntimeError: on state mismatch, Spotify-reported auth error, a
            missing code, or a failed token exchange.
    """
    state = random_state()
    auth_code: str | None = None
    auth_error: str | None = None
    params = {
        "response_type": "code",
        "client_id": client_id,
        "scope": SPOTIFY_SCOPE,
        "redirect_uri": SPOTIFY_REDIRECT_URI,
        "state": state,
    }
    auth_url = f"{SPOTIFY_AUTH_URL}?{urllib.parse.urlencode(params)}"
    print(f"Paste the following URL into a browser:\n{auth_url}\n")
    redirected_url = input("Paste the full redirected URL here: ").strip()
    parsed = urllib.parse.urlparse(redirected_url)
    query = urllib.parse.parse_qs(parsed.query)
    returned_state = query.get("state", [""])[0]
    code = query.get("code", [None])[0]
    error = query.get("error", [None])[0]
    if state and returned_state != state:
        # Reject a mismatched state to block CSRF-style code injection.
        auth_error = "OAuth state mismatch. Refusing token exchange."
    elif error:
        auth_error = error
    elif code:
        auth_code = code
    else:
        auth_error = "No code returned by Spotify."
    if auth_error:
        # BUG FIX: previously formatted SpotifyCallbackHandler.auth_error,
        # but that class is commented out -> NameError on every failure.
        raise RuntimeError(f"Spotify auth failed: {auth_error}")
    if not auth_code:
        raise RuntimeError("Timed out waiting for Spotify OAuth callback.")
    # Client credentials go in a Basic auth header per the Spotify token API.
    basic = base64.b64encode(f"{client_id}:{client_secret}".encode("utf-8")).decode("ascii")
    resp = requests.post(
        SPOTIFY_TOKEN_URL,
        headers={
            "Authorization": f"Basic {basic}",
            "Content-Type": "application/x-www-form-urlencoded",
        },
        data={
            "grant_type": "authorization_code",
            "code": auth_code,
            "redirect_uri": SPOTIFY_REDIRECT_URI,
        },
        timeout=30,
    )
    if resp.status_code >= 400:
        raise RuntimeError(f"Spotify token exchange failed: {resp.status_code} {resp.text}")
    data = resp.json()
    return data["access_token"]
def spotify_get(url: str, token: str, params: dict[str, Any] | None = None) -> dict[str, Any]:
    """GET a Spotify API URL with bearer auth, retrying on 429 rate limits."""
    headers = {"Authorization": f"Bearer {token}"}
    while True:
        resp = requests.get(url, headers=headers, params=params, timeout=60)
        if resp.status_code == 429:
            # Honor the server-suggested backoff, then retry the same request.
            retry_after = int(resp.headers.get("Retry-After", "3"))
            print(f"Spotify rate-limited us. Sleeping {retry_after}s...")
            time.sleep(retry_after)
            continue
        if resp.status_code >= 400:
            raise RuntimeError(f"Spotify request failed: {resp.status_code} {resp.text}")
        return resp.json()
def fetch_spotify_liked_tracks(token: str) -> list[SpotifyTrack]:
    """Page through the user's Liked Songs and return SpotifyTrack records.

    Fetches the first page to learn the total count, then follows the API's
    ``next`` links with a progress bar until exhausted.
    """
    tracks: list[SpotifyTrack] = []
    url = SPOTIFY_SAVED_TRACKS_URL
    params = {"limit": 50, "offset": 0}
    first = spotify_get(url, token, params=params)
    total = int(first.get("total", 0))

    def consume_page(page: dict[str, Any]) -> None:
        # Convert one API page of saved-track items into SpotifyTrack records,
        # appending to the enclosing `tracks` list.
        for item in page.get("items", []):
            added_at = item.get("added_at", "")
            track = item.get("track") or {}
            # Skip unavailable/local/null weirdness.
            if not track or not track.get("id"):
                continue
            album = track.get("album") or {}
            artists = track.get("artists") or []
            album_artists = album.get("artists") or []
            external_ids = track.get("external_ids") or {}
            artist_names = [a.get("name", "") for a in artists if a.get("name")]
            album_artist_names = [a.get("name", "") for a in album_artists if a.get("name")]
            # Fall back to a spotify: URI when no web URL is present.
            spotify_track_url = (
                track.get("external_urls", {}).get("spotify", "")
                or f"spotify:track:{track.get('id', '')}"
            )
            album_url = (
                album.get("external_urls", {}).get("spotify", "")
                or f"spotify:album:{album.get('id', '')}"
            )
            release_date = album.get("release_date", "") or ""
            tracks.append(
                SpotifyTrack(
                    spotify_track_id=track.get("id", "") or "",
                    spotify_track_url=spotify_track_url,
                    isrc=normalize_isrc(external_ids.get("isrc", "")),
                    title=track.get("name", "") or "",
                    artists="; ".join(artist_names),
                    primary_artist=artist_names[0] if artist_names else "",
                    album=album.get("name", "") or "",
                    album_artist="; ".join(album_artist_names),
                    album_id=album.get("id", "") or "",
                    album_url=album_url,
                    album_type=album.get("album_type", "") or "",
                    release_date=release_date,
                    release_year=release_year(release_date),
                    disc_number=to_int(track.get("disc_number")),
                    track_number=to_int(track.get("track_number")),
                    duration_ms=to_int(track.get("duration_ms")),
                    total_tracks=to_int(album.get("total_tracks")),
                    added_at=added_at,
                )
            )

    consume_page(first)
    with tqdm(total=total, initial=len(tracks), unit="track", desc="Spotify liked songs") as pbar:
        next_url = first.get("next")
        while next_url:
            page = spotify_get(next_url, token)
            before = len(tracks)
            consume_page(page)
            # Advance by the number of usable tracks actually consumed,
            # which can be fewer than the page size when items are skipped.
            pbar.update(len(tracks) - before)
            next_url = page.get("next")
    return tracks
# -----------------------------
# Beets export
# -----------------------------
def run_beets_export(beet_cmd: str) -> list[dict[str, Any]]:
    """Run ``beet export -f json`` and parse its stdout into track rows.

    Raises RuntimeError (with command, exit code, and stderr) on failure.
    """
    cmd = [beet_cmd, "export", "-f", "json"]
    print(f"Running Beets export: {' '.join(cmd)}")
    proc = subprocess.run(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
        check=False,
    )
    if proc.returncode == 0:
        return parse_beets_json(proc.stdout)
    message = (
        "Beets export failed.\n"
        f"Command: {' '.join(cmd)}\n"
        f"Exit code: {proc.returncode}\n"
        f"stderr:\n{proc.stderr}"
    )
    raise RuntimeError(message)
def parse_beets_json(raw: str) -> list[dict[str, Any]]:
    """Parse a Beets export: a JSON array, a wrapped object, or JSON Lines."""
    text = raw.strip()
    if not text:
        return []
    decoded: Any = None
    is_json = True
    try:
        decoded = json.loads(text)
    except json.JSONDecodeError:
        is_json = False
    if is_json:
        if isinstance(decoded, list):
            return decoded
        if isinstance(decoded, dict):
            # Some exporters wrap records under a well-known key.
            for key in ("items", "tracks", "data"):
                candidate = decoded.get(key)
                if isinstance(candidate, list):
                    return candidate
            return [decoded]
    # Fall back to JSON Lines: one record per non-empty line.
    return [json.loads(stripped) for line in text.splitlines() if (stripped := line.strip())]
def load_beets_export_file(path: Path) -> list[dict[str, Any]]:
    """Read a saved Beets JSON export from disk and parse it."""
    print(f"Reading Beets export from {path}")
    raw = path.read_text(encoding="utf-8")
    return parse_beets_json(raw)
def beets_track_from_row(row: dict[str, Any]) -> BeetsTrack:
    """Build a BeetsTrack from one export row, tolerating alternate field names."""
    # Paths may come through as bytes depending on the exporter.
    raw_path = row.get("path", "")
    if isinstance(raw_path, bytes):
        raw_path = raw_path.decode("utf-8", errors="replace")
    # Prefer year, then original_year, then a date string.
    year_value = (
        str(row.get("year", "") or "")
        or str(row.get("original_year", "") or "")
        or str(row.get("date", "") or "")
    )
    return BeetsTrack(
        title=str(row.get("title", "") or ""),
        artist=str(row.get("artist", "") or ""),
        album=str(row.get("album", "") or ""),
        albumartist=str(row.get("albumartist", row.get("album_artist", "")) or ""),
        isrc=normalize_isrc(row.get("isrc", "")),
        mb_trackid=str(row.get("mb_trackid", row.get("mb_track_id", "")) or ""),
        mb_albumid=str(row.get("mb_albumid", row.get("mb_album_id", "")) or ""),
        track=to_int(row.get("track", row.get("track_number"))),
        disc=to_int(row.get("disc", row.get("disc_number"))),
        length_seconds=to_float(row.get("length", row.get("duration"))),
        year=year_value,
        path=str(raw_path or ""),
        format=str(row.get("format", "") or ""),
    )
# -----------------------------
# Matching
# -----------------------------
class BeetsMatcher:
    """Index a Beets library and match Spotify tracks against it.

    Matching is tiered, strongest signal first: exact ISRC, then album
    artist + album + disc + track, then normalized title + artist (with an
    optional duration check), and finally a conservative fuzzy scan whose
    hits are reported as "possible_match" rather than "present".
    """

    def __init__(self, beets_tracks: Iterable[BeetsTrack]) -> None:
        self.tracks = list(beets_tracks)
        # Lookup indexes built once so the exact tiers are O(1) per match.
        self.by_isrc: dict[str, list[BeetsTrack]] = {}
        self.by_album_track: dict[tuple[str, str, int | None, int | None], list[BeetsTrack]] = {}
        self.by_title_artist: dict[tuple[str, str], list[BeetsTrack]] = {}
        for t in self.tracks:
            if t.isrc:
                self.by_isrc.setdefault(t.isrc, []).append(t)
            # Fall back to the track artist when albumartist is empty.
            album_artist = normalize_text(t.albumartist or t.artist)
            album = normalize_text(t.album)
            self.by_album_track.setdefault((album_artist, album, t.disc, t.track), []).append(t)
            self.by_title_artist.setdefault(
                (normalize_text(t.title), normalize_text(t.artist)),
                [],
            ).append(t)

    def match(self, s: SpotifyTrack) -> MatchResult:
        """Return the best MatchResult for one Spotify track."""
        # 1. ISRC exact match.
        if s.isrc and s.isrc in self.by_isrc:
            return MatchResult(
                spotify=s,
                status="present",
                reason="exact_isrc",
                beets=self.by_isrc[s.isrc][0],
                score=1.0,
            )
        # 2. Album artist + album + disc + track exact-ish metadata.
        album_artist_candidates = [
            normalize_text(s.album_artist),
            normalize_text(s.primary_artist),
        ]
        album_key = normalize_text(s.album)
        for album_artist_key in album_artist_candidates:
            key = (album_artist_key, album_key, s.disc_number, s.track_number)
            candidates = self.by_album_track.get(key, [])
            if candidates:
                return MatchResult(
                    spotify=s,
                    status="present",
                    reason="album_artist_album_disc_track",
                    beets=candidates[0],
                    score=1.0,
                )
        # 3. Title + primary artist, then check duration if possible.
        title_artist_key = (normalize_text(s.title), normalize_text(s.primary_artist))
        candidates = self.by_title_artist.get(title_artist_key, [])
        if candidates:
            spotify_seconds = (s.duration_ms / 1000.0) if s.duration_ms else None
            if spotify_seconds is not None:
                duration_candidates = [
                    c for c in candidates
                    if c.length_seconds is not None
                    and abs(c.length_seconds - spotify_seconds) <= 4.0
                ]
                if duration_candidates:
                    return MatchResult(
                        spotify=s,
                        status="present",
                        reason="title_artist_duration_within_4s",
                        beets=duration_candidates[0],
                        score=1.0,
                    )
            # If title and artist match exactly after normalization, it is probably present,
            # even if duration is missing from Beets export.
            return MatchResult(
                spotify=s,
                status="present",
                reason="title_artist_exact_normalized",
                beets=candidates[0],
                score=0.97,
            )
        # 4. Fuzzy possible match. Do not treat this as present.
        possible = self.best_fuzzy_candidate(s)
        if possible is not None:
            beets_track, score = possible
            return MatchResult(
                spotify=s,
                status="possible_match",
                reason="fuzzy_metadata_match_review_needed",
                beets=beets_track,
                score=score,
            )
        return MatchResult(
            spotify=s,
            status="missing",
            reason="no_match_found",
            beets=None,
            score=None,
        )

    def best_fuzzy_candidate(self, s: SpotifyTrack) -> tuple[BeetsTrack, float] | None:
        """Linear-scan fuzzy match; returns (track, score) above 0.91, else None.

        Score = 0.55*title + 0.30*artist + 0.15*album similarity ratios,
        minus a small penalty when durations disagree.
        """
        s_title = normalize_text(s.title)
        s_artist = normalize_text(s.primary_artist)
        s_album = normalize_text(s.album)
        s_duration = (s.duration_ms / 1000.0) if s.duration_ms else None
        best: tuple[BeetsTrack, float] | None = None
        # Keep this simple and conservative. We only fuzzy scan candidates where either
        # artist or album has some overlap.
        for b in self.tracks:
            b_title = normalize_text(b.title)
            b_artist = normalize_text(b.artist)
            b_album = normalize_text(b.album)
            if not b_title:
                continue
            artist_ratio = difflib.SequenceMatcher(None, s_artist, b_artist).ratio()
            album_ratio = difflib.SequenceMatcher(None, s_album, b_album).ratio()
            if artist_ratio < 0.80 and album_ratio < 0.80:
                continue
            title_ratio = difflib.SequenceMatcher(None, s_title, b_title).ratio()
            duration_penalty = 0.0
            if s_duration is not None and b.length_seconds is not None:
                diff = abs(s_duration - b.length_seconds)
                if diff > 12:
                    duration_penalty = 0.15
                elif diff > 6:
                    duration_penalty = 0.05
            score = (title_ratio * 0.55) + (artist_ratio * 0.30) + (album_ratio * 0.15) - duration_penalty
            if best is None or score > best[1]:
                best = (b, score)
        # Conservative threshold. Below this, it creates too much noise.
        if best and best[1] >= 0.91:
            return best
        return None
# -----------------------------
# Reports
# -----------------------------
def write_csv(path: Path, rows: list[dict[str, Any]], fieldnames: list[str]) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
with path.open("w", encoding="utf-8", newline="") as f:
writer = csv.DictWriter(f, fieldnames=fieldnames, extrasaction="ignore")
writer.writeheader()
writer.writerows(rows)
print(f"Wrote {path} ({len(rows)} rows)")
def result_base_row(r: MatchResult) -> dict[str, Any]:
    """Flatten one MatchResult into a CSV row (Spotify + match + Beets columns)."""
    sp = r.spotify
    bt = r.beets
    score_text = "" if r.score is None else f"{r.score:.3f}"
    # Beets columns default to empty strings when there was no match.
    beets_cols: dict[str, Any] = {
        "Beets Track": "",
        "Beets Album": "",
        "Beets Artist": "",
        "Beets Album Artist": "",
        "Beets Year": "",
        "Beets Path": "",
        "Beets Format": "",
    }
    if bt is not None:
        beets_cols = {
            "Beets Track": bt.title,
            "Beets Album": bt.album,
            "Beets Artist": bt.artist,
            "Beets Album Artist": bt.albumartist,
            "Beets Year": bt.year,
            "Beets Path": bt.path,
            "Beets Format": bt.format,
        }
    return {
        "Track Name": sp.title,
        "Album": sp.album,
        "Artist": sp.artists,
        "Primary Artist": sp.primary_artist,
        "Album Artist": sp.album_artist,
        "Year/Date": sp.release_date or sp.release_year,
        "Release Year": sp.release_year,
        "Disc Number": sp.disc_number,
        "Track Number": sp.track_number,
        "Spotify ISRC": sp.isrc,
        "Spotify Track URL": sp.spotify_track_url,
        "Spotify Album URL": sp.album_url,
        "Spotify Album ID": sp.album_id,
        "Spotify Album Type": sp.album_type,
        "Spotify Album Total Tracks": sp.total_tracks,
        "Spotify Liked At": sp.added_at,
        "Match Status": r.status,
        "Match Reason": r.reason,
        "Match Score": score_text,
        **beets_cols,
    }
# Column order for the per-track CSV reports (keys of result_base_row()).
TRACK_FIELDS = [
    "Track Name",
    "Album",
    "Artist",
    "Primary Artist",
    "Album Artist",
    "Year/Date",
    "Release Year",
    "Disc Number",
    "Track Number",
    "Spotify ISRC",
    "Spotify Track URL",
    "Spotify Album URL",
    "Spotify Album ID",
    "Spotify Album Type",
    "Spotify Album Total Tracks",
    "Spotify Liked At",
    "Match Status",
    "Match Reason",
    "Match Score",
    "Beets Track",
    "Beets Album",
    "Beets Artist",
    "Beets Album Artist",
    "Beets Year",
    "Beets Path",
    "Beets Format",
]
def album_report_rows(results: list[MatchResult]) -> list[dict[str, Any]]:
    """Summarize per-track results into per-album review rows.

    Albums where every liked track is already present are omitted; the
    rest are sorted with fully-missing albums first, then by artist/album.
    """
    grouped: dict[str, list[MatchResult]] = {}
    for result in results:
        track = result.spotify
        # Prefer the stable album_id; fall back to an artist::album key.
        group_key = track.album_id or f"{track.album_artist}::{track.album}"
        grouped.setdefault(group_key, []).append(result)
    rows: list[dict[str, Any]] = []
    for group in grouped.values():
        sample = group[0].spotify
        liked_count = len(group)
        present_count = 0
        missing_count = 0
        possible_count = 0
        for member in group:
            if member.status == "present":
                present_count += 1
            elif member.status == "missing":
                missing_count += 1
            elif member.status == "possible_match":
                possible_count += 1
        # Fully-covered albums need no review row.
        if present_count == liked_count:
            continue
        if missing_count == liked_count:
            status = "all_liked_tracks_missing"
        else:
            status = "partial_liked_tracks_present"
        review_titles = " | ".join(
            member.spotify.title
            for member in group
            if member.status in {"missing", "possible_match"}
        )
        rows.append(
            {
                "Album": sample.album,
                "Album Artist": sample.album_artist,
                "Year/Date": sample.release_date or sample.release_year,
                "Release Year": sample.release_year,
                "Spotify Album Type": sample.album_type,
                "Spotify Album Total Tracks": sample.total_tracks,
                "Liked Tracks Count": liked_count,
                "Present Liked Tracks Count": present_count,
                "Missing Liked Tracks Count": missing_count,
                "Possible Match Count": possible_count,
                "Album Coverage Status": status,
                "Missing Or Review Tracks": review_titles,
                "Spotify Album URL": sample.album_url,
                "Spotify Album ID": sample.album_id,
            }
        )
    rows.sort(
        key=lambda row: (
            row["Album Coverage Status"] != "all_liked_tracks_missing",
            str(row["Album Artist"]).casefold(),
            str(row["Album"]).casefold(),
        )
    )
    return rows
# Column order for the album-coverage CSV report (keys of album_report_rows()).
ALBUM_FIELDS = [
    "Album",
    "Album Artist",
    "Year/Date",
    "Release Year",
    "Spotify Album Type",
    "Spotify Album Total Tracks",
    "Liked Tracks Count",
    "Present Liked Tracks Count",
    "Missing Liked Tracks Count",
    "Possible Match Count",
    "Album Coverage Status",
    "Missing Or Review Tracks",
    "Spotify Album URL",
    "Spotify Album ID",
]
# -----------------------------
# Main
# -----------------------------
def main() -> int:
    """CLI entry point.

    Loads the Beets library (via ``beet export`` or a cached JSON file),
    loads Spotify liked tracks (via OAuth or a cached JSON file), matches
    them, and writes four CSV reports into the output directory.

    Returns:
        Process exit code: 0 on success, 2 when Spotify credentials are missing.
    """
    parser = argparse.ArgumentParser(
        description="Compare Spotify Liked Songs against a Beets library and produce missing-track/album CSV reports."
    )
    parser.add_argument(
        "--output-dir",
        default=DEFAULT_OUTPUT_DIR,
        help=f"Output directory for CSV reports. Default: {DEFAULT_OUTPUT_DIR}",
    )
    parser.add_argument(
        "--beet-cmd",
        default="beet",
        help="Path/name of beet command. Default: beet",
    )
    parser.add_argument(
        "--beets-json",
        type=Path,
        default=None,
        help="Use an existing Beets JSON export instead of running beet export.",
    )
    parser.add_argument(
        "--spotify-json",
        type=Path,
        default=None,
        help="Use an existing Spotify liked tracks JSON cache instead of calling Spotify.",
    )
    parser.add_argument(
        "--write-raw-cache",
        action="store_true",
        help="Also write raw-ish normalized cache JSON files into the output directory.",
    )
    args = parser.parse_args()
    output_dir = Path(args.output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)
    # Load Beets.
    if args.beets_json:
        beets_rows = load_beets_export_file(args.beets_json)
    else:
        beets_rows = run_beets_export(args.beet_cmd)
    beets_tracks = [beets_track_from_row(row) for row in beets_rows]
    print(f"Loaded {len(beets_tracks)} Beets tracks")
    # Load Spotify.
    if args.spotify_json:
        print(f"Reading Spotify liked tracks from {args.spotify_json}")
        raw = json.loads(args.spotify_json.read_text(encoding="utf-8"))
        spotify_tracks = [SpotifyTrack(**item) for item in raw]
    else:
        client_id = os.environ.get("SPOTIFY_CLIENT_ID")
        client_secret = os.environ.get("SPOTIFY_CLIENT_SECRET")
        if not client_id or not client_secret:
            print(
                "Missing SPOTIFY_CLIENT_ID or SPOTIFY_CLIENT_SECRET environment variables.",
                file=sys.stderr,
            )
            return 2
        token = get_spotify_token(client_id, client_secret)
        spotify_tracks = fetch_spotify_liked_tracks(token)
    print(f"Loaded {len(spotify_tracks)} Spotify liked tracks")
    if args.write_raw_cache:
        # Cache normalized records so later runs can use --spotify-json /
        # --beets-json without the network or re-running beet.
        spotify_cache = output_dir / "spotify_liked_tracks_cache.json"
        beets_cache = output_dir / "beets_tracks_cache.json"
        spotify_cache.write_text(
            json.dumps([s.__dict__ for s in spotify_tracks], ensure_ascii=False, indent=2),
            encoding="utf-8",
        )
        beets_cache.write_text(
            json.dumps([b.__dict__ for b in beets_tracks], ensure_ascii=False, indent=2),
            encoding="utf-8",
        )
        print(f"Wrote {spotify_cache}")
        print(f"Wrote {beets_cache}")
    # Match.
    matcher = BeetsMatcher(beets_tracks)
    results: list[MatchResult] = []
    for s in tqdm(spotify_tracks, desc="Matching", unit="track"):
        results.append(matcher.match(s))
    missing = [result_base_row(r) for r in results if r.status == "missing"]
    possible = [result_base_row(r) for r in results if r.status == "possible_match"]
    present = [result_base_row(r) for r in results if r.status == "present"]
    albums = album_report_rows(results)
    write_csv(output_dir / "missing_tracks.csv", missing, TRACK_FIELDS)
    write_csv(output_dir / "possible_matches.csv", possible, TRACK_FIELDS)
    write_csv(output_dir / "present_tracks.csv", present, TRACK_FIELDS)
    write_csv(output_dir / "partial_or_missing_albums.csv", albums, ALBUM_FIELDS)
    print()
    print("Summary")
    print("-------")
    print(f"Spotify liked tracks: {len(spotify_tracks)}")
    print(f"Beets tracks: {len(beets_tracks)}")
    print(f"Present: {len(present)}")
    print(f"Missing: {len(missing)}")
    print(f"Possible matches: {len(possible)}")
    # BUG FIX: original format string was missing the space after the colon.
    print(f"Albums needing review: {len(albums)}")
    print()
    print(f"Primary buying list: {output_dir / 'missing_tracks.csv'}")
    print(f"Album review list: {output_dir / 'partial_or_missing_albums.csv'}")
    print(f"Manual review list: {output_dir / 'possible_matches.csv'}")
    return 0
if __name__ == "__main__":
    # Propagate main()'s exit code to the shell.
    raise SystemExit(main())