Files
beets/beetsplug/tidal/__init__.py
Šarūnas Nejus 30ab81eb5b Extract UserError from beets.ui to a new beets.exceptions module.
Update all references in core, plugins, and tests to import UserError
from the new location. This centralizes exception handling and improves
code organization.
2026-05-16 11:18:51 +01:00

502 lines
16 KiB
Python

from __future__ import annotations
import itertools
import os
import re
from functools import cached_property
from typing import TYPE_CHECKING, overload
import confuse
from beets import ui
from beets.autotag.hooks import AlbumInfo, TrackInfo
from beets.exceptions import UserError
from beets.logging import getLogger
from beets.metadata_plugins import MetadataSourcePlugin
from .api import TidalAPI
if TYPE_CHECKING:
import optparse
from collections.abc import Iterable, Sequence
from beets.library.models import Item, Library
from .api_types import (
AlbumAttributes,
ResourceIdentifier,
TidalAlbum,
TidalArtist,
TidalTrack,
TrackAttributes,
)
log = getLogger("beets.tidal")
class TidalPlugin(MetadataSourcePlugin):
def __init__(self) -> None:
super().__init__()
self.config.add(
{
"client_id": "mcjmpl1bPATJXcBT",
"tokenfile": "tidal_token.json",
}
)
self.config["client_id"].redact = True
# We need to be authenticated if plugin is used to fetch metadata
# otherwise the import cannot run.
self.register_listener("import_begin", self.require_authentication)
@cached_property
def api(self) -> TidalAPI:
return TidalAPI(
client_id=self.config["client_id"].as_str(),
token_path=self._tokenfile(),
)
def _tokenfile(self) -> str:
"""Return the configured path to the token file in the app directory."""
return self.config["tokenfile"].get(confuse.Filename(in_app_dir=True))
def require_authentication(self):
if not os.path.isfile(self._tokenfile()):
raise UserError(
"Please login to TIDAL"
" using `beet tidal --auth` or disable tidal plugin"
)
def commands(self) -> list[ui.Subcommand]:
tidal_cmd = ui.Subcommand(
"tidal", help="Tidal metadata plugin commands"
)
tidal_cmd.parser.add_option(
"-a",
"--auth",
action="store_true",
help="Authenticate and login to Tidal",
default=False,
)
def func(lib: Library, opts: optparse.Values, args: list[str]):
if opts.auth:
self.api.ui_authenticate_flow()
else:
tidal_cmd.print_help()
tidal_cmd.func = func
return [tidal_cmd]
def album_for_id(self, album_id: str) -> AlbumInfo | None:
if not (tidal_id := self._extract_id(album_id)):
return None
if album := list(self.search_albums_by_ids(tidal_ids=[tidal_id])):
return album[0]
log.warning("Could not find album:{0}", tidal_id)
return None
def albums_for_ids(self, ids: Iterable[str]) -> Iterable[AlbumInfo | None]:
yield from self.search_albums_by_ids(ids=ids)
def track_for_id(self, track_id: str) -> TrackInfo | None:
if not (tidal_id := self._extract_id(track_id)):
return None
if track := list(self.search_tracks_by_ids(tidal_ids=[tidal_id])):
return track[0]
log.warning("Could not find track:{0}", tidal_id)
return None
def tracks_for_ids(self, ids: Iterable[str]) -> Iterable[TrackInfo | None]:
yield from self.search_tracks_by_ids(ids=ids)
def candidates(
self, items: Sequence[Item], artist: str, album: str, va_likely: bool
) -> Iterable[AlbumInfo]:
candidates: list[AlbumInfo] = []
# Tidal allows to lookup via isrc and barcode (nice!)
# We just return early here as a lookup via isrc should
# return a 100% match
barcodes: list[str] = list(
filter(None, set(i.get("barcode") for i in items))
)
if barcodes and (
candidates := list(
filter(None, self.search_albums_by_ids(barcode_ids=barcodes)),
)
):
return candidates
for query in self._album_queries(items):
candidates += self.search_albums_by_query(query)
log.debug("Found {0} candidates", len(candidates))
return candidates
def item_candidates(
self, item: Item, artist: str, title: str
) -> Iterable[TrackInfo]:
candidates: list[TrackInfo] = []
# Tidal allows to lookup via isrc and barcode (nice!)
# We just return early here as a lookup via isrc should
# return a 100% match
if isrc := item.get("isrc"):
if candidates := list(
filter(None, self.search_tracks_by_ids(isrcs=[isrc]))
):
return candidates
for query in self._item_queries(item):
candidates += self.search_tracks_by_query(query)
log.debug("Found {0} candidates", len(candidates))
return candidates
@staticmethod
def _item_queries(item: Item) -> Iterable[str]:
"""Search queries for items."""
yield item.title
if item.artist:
yield f"{item.artist} {item.title}"
@staticmethod
def _album_queries(items: Sequence[Item]) -> Iterable[str]:
"""Search queries for albums."""
album_names = set(i.album for i in items)
artist_names = set(i.artist for i in items)
for album, artist in itertools.product(album_names, artist_names):
yield f"{artist} {album}"
def search_tracks_by_query(self, query: str) -> Iterable[TrackInfo]:
"""Search for tracks given a string query."""
search_doc = self.api.search_results(
query,
include=["tracks.artists"],
)
track_by_id: dict[str, TidalTrack] = {
item["id"]: item
for item in search_doc.get("included", [])
if item["type"] == "tracks"
}
artist_by_id: dict[str, TidalArtist] = {
item["id"]: item
for item in search_doc.get("included", [])
if item["type"] == "artists"
}
for track_rel in search_doc["data"]["relationships"]["tracks"]["data"]:
if track := track_by_id.get(track_rel["id"]):
yield self._get_track_info(track, artist_by_id=artist_by_id)
else:
log.warning(
"Track with id {0} not found in lookup",
track_rel["id"],
)
def search_albums_by_query(self, query: str) -> Iterable[AlbumInfo]:
"""Search for album given a string query."""
search_doc = self.api.search_results(
query,
include=["albums"],
# include="albums.items.artists" <- not supported
# This is a bit inconvenient, but we fetch the items and artists
# for all albums separately.
)
album_ids = [
album_rel["id"]
for album_rel in search_doc["data"]["relationships"]["albums"][
"data"
]
]
yield from filter(None, self.search_albums_by_ids(tidal_ids=album_ids))
@overload
def search_tracks_by_ids(
self, *, ids: Iterable[str]
) -> Iterable[TrackInfo | None]: ...
@overload
def search_tracks_by_ids(
self, *, tidal_ids: Iterable[str]
) -> Iterable[TrackInfo | None]: ...
@overload
def search_tracks_by_ids(
self, *, isrcs: Iterable[str]
) -> Iterable[TrackInfo | None]: ...
def search_tracks_by_ids(
self,
ids: Iterable[str] | None = None,
tidal_ids: Iterable[str] | None = None,
isrcs: Iterable[str] | None = None,
) -> Iterable[TrackInfo | None]:
_ids: list[str | None] = list(tidal_ids or [])
isrcs = list(isrcs or [])
if ids:
_ids = list(map(self._extract_id, ids))
tracks_doc = self.api.get_tracks(
ids=list(filter(None, _ids)),
isrcs=isrcs,
include=["artists"],
)
track_by_id: dict[str, TidalTrack] = {
item["id"]: item
for item in tracks_doc.get("data", [])
if item["type"] == "tracks"
}
artist_by_id: dict[str, TidalArtist] = {
item["id"]: item
for item in tracks_doc.get("included", [])
if item["type"] == "artists"
}
for _id in _ids:
if _id is not None and (track := track_by_id.get(_id)):
yield self._get_track_info(track, artist_by_id=artist_by_id)
else:
yield None
if isrcs:
isrc_to_track: dict[str, TidalTrack] = {
t["attributes"]["isrc"]: t for t in track_by_id.values()
}
for isrc in isrcs:
if track := isrc_to_track.get(isrc):
yield self._get_track_info(track, artist_by_id=artist_by_id)
else:
yield None
@overload
def search_albums_by_ids(
self, *, ids: Iterable[str]
) -> Iterable[AlbumInfo | None]: ...
@overload
def search_albums_by_ids(
self, *, tidal_ids: Iterable[str]
) -> Iterable[AlbumInfo | None]: ...
@overload
def search_albums_by_ids(
self, *, barcode_ids: Iterable[str]
) -> Iterable[AlbumInfo | None]: ...
def search_albums_by_ids(
self,
ids: Iterable[str] | None = None,
tidal_ids: Iterable[str] | None = None,
barcode_ids: Iterable[str] | None = None,
) -> Iterable[AlbumInfo | None]:
_ids: list[str | None] = list(tidal_ids or [])
barcode_ids = list(barcode_ids or [])
if ids:
_ids = list(map(self._extract_id, ids))
albums_doc = self.api.get_albums(
ids=list(filter(None, _ids)),
barcode_ids=barcode_ids,
include=["items.artists", "artists"],
)
album_by_id: dict[str, TidalAlbum] = {
item["id"]: item
for item in albums_doc.get("data", [])
if item["type"] == "albums"
}
track_by_id: dict[str, TidalTrack] = {
item["id"]: item
for item in albums_doc.get("included", [])
if item["type"] == "tracks"
}
artist_by_id: dict[str, TidalArtist] = {
item["id"]: item
for item in albums_doc.get("included", [])
if item["type"] == "artists"
}
for _id in _ids:
if _id is not None and (album := album_by_id.get(_id)):
yield self._get_album_info(
album,
track_by_id=track_by_id,
artist_by_id=artist_by_id,
)
else:
yield None
if barcode_ids:
barcode_to_album: dict[str, TidalAlbum] = {
a["attributes"]["barcodeId"]: a for a in album_by_id.values()
}
for barcode in barcode_ids:
if album := barcode_to_album.get(barcode):
yield self._get_album_info(
album,
track_by_id=track_by_id,
artist_by_id=artist_by_id,
)
else:
yield None
def _get_album_info(
self,
album: TidalAlbum,
track_by_id: dict[str, TidalTrack],
artist_by_id: dict[str, TidalArtist],
) -> AlbumInfo:
track_infos: list[TrackInfo] = []
for i, track_rel in enumerate(
album["relationships"]["items"]["data"], start=1
):
if track := track_by_id.get(track_rel["id"]):
track_info = self._get_track_info(track, artist_by_id)
track_info.index = i
track_infos.append(track_info)
artist_names, artist_ids = self._parse_artists(
album["relationships"]["artists"]["data"],
artist_by_id,
)
date_parts = self._parse_release_date(album["attributes"])
return AlbumInfo(
# Identifier
data_source=self.data_source,
album_id=album["id"],
artists_ids=artist_ids,
data_url=self._parse_data_url(album["attributes"]),
barcode=album["attributes"]["barcodeId"],
# Meta
album=self._parse_title(album["attributes"]),
tracks=track_infos,
artist=", ".join(artist_names),
artists=artist_names,
duration=self._duration_to_seconds(album["attributes"]["duration"]),
albumtype=album["attributes"]["albumType"],
label=self._parse_label(album["attributes"]),
year=date_parts[0] if date_parts else None,
month=date_parts[1] if date_parts else None,
day=date_parts[2] if date_parts else None,
)
def _get_track_info(
self,
track: TidalTrack,
artist_by_id: dict[str, TidalArtist],
) -> TrackInfo:
artist_names, artist_ids = self._parse_artists(
track["relationships"]["artists"]["data"],
artist_by_id,
)
return TrackInfo(
# Identifier
data_source=self.data_source,
track_id=track["id"],
artists_ids=artist_ids,
data_url=self._parse_data_url(track["attributes"]),
# Meta
title=self._parse_title(track["attributes"]),
isrc=track["attributes"]["isrc"],
artist=", ".join(artist_names),
artists=artist_names,
duration=self._duration_to_seconds(track["attributes"]["duration"]),
label=self._parse_label(track["attributes"]),
)
@staticmethod
def _parse_artists(
artist_relationships: list[ResourceIdentifier],
artist_by_id: dict[str, TidalArtist],
) -> tuple[list[str], list[str]]:
"""Extract artists from a relationship.
Artists are sorted in the track/album response relationship but not in the
track/album responses included items.
"""
artist_names = []
artist_ids = []
for artist_rel in artist_relationships:
if artist := artist_by_id.get(artist_rel["id"]):
artist_ids.append(artist["id"])
artist_names.append(artist["attributes"]["name"])
else:
log.warning(
"Artist with id {0} not found in lookup",
artist_rel["id"],
)
return artist_names, artist_ids
@staticmethod
def _parse_title(attributes: AlbumAttributes | TrackAttributes):
"""
Tidal UIs append the version string at the end of the title. We do the same here
by formatting it as ``"{title} ({version})"`` to stay consistent.
"""
if version := attributes.get("version"):
return f"{attributes['title']} ({version})"
else:
return attributes["title"]
@staticmethod
def _parse_data_url(
attributes: AlbumAttributes | TrackAttributes,
) -> str | None:
if external_links := attributes.get("externalLinks"):
return external_links[0].get("href")
return None
@staticmethod
def _duration_to_seconds(duration: str) -> int | None:
"""Convert ISO 8601 duration to seconds. E.g. 'PT15M2S' -> 902."""
match = ISO_8601_RE.match(duration)
if not match:
log.warning("Invalid ISO 8601 duration: {0}", duration)
return None
parts = {k: int(v) if v else 0 for k, v in match.groupdict().items()}
return parts["seconds"] + parts["minutes"] * 60 + parts["hours"] * 3600
@staticmethod
def _parse_label(
attributes: AlbumAttributes | TrackAttributes,
) -> str | None:
if copyright := attributes.get("copyright"):
return copyright["text"]
return None
@staticmethod
def _parse_release_date(
attributes: AlbumAttributes,
) -> tuple[int, int, int] | None:
"""Returns year, month, day from iso YYYY-MM-DD"""
if (
(release_date := attributes.get("releaseDate"))
and (parts := release_date.split("-"))
and len(parts) == 3
):
return int(parts[0]), int(parts[1]), int(parts[2])
return None
ISO_8601_RE = re.compile(
r"^P"
r"T"
r"(?:(?P<hours>\d+)H)?"
r"(?:(?P<minutes>\d+)M)?"
r"(?:(?P<seconds>\d+)S)?$"
)