mirror of
https://github.com/beetbox/beets.git
synced 2026-05-16 13:41:01 -04:00
Update all references in core, plugins, and tests to import UserError from the new location. This centralizes exception handling and improves code organization.
295 lines
10 KiB
Python
295 lines
10 KiB
Python
# This file is part of beets.
|
|
# Copyright 2019, Rahul Ahuja.
|
|
#
|
|
# Permission is hereby granted, free of charge, to any person obtaining
|
|
# a copy of this software and associated documentation files (the
|
|
# "Software"), to deal in the Software without restriction, including
|
|
# without limitation the rights to use, copy, modify, merge, publish,
|
|
# distribute, sublicense, and/or sell copies of the Software, and to
|
|
# permit persons to whom the Software is furnished to do so, subject to
|
|
# the following conditions:
|
|
#
|
|
# The above copyright notice and this permission notice shall be
|
|
# included in all copies or substantial portions of the Software.
|
|
|
|
"""Adds Deezer release and track search support to the autotagger"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import collections
|
|
import time
|
|
from typing import TYPE_CHECKING, ClassVar
|
|
|
|
import requests
|
|
|
|
from beets import config, ui
|
|
from beets.autotag.hooks import AlbumInfo, TrackInfo
|
|
from beets.dbcore import types
|
|
from beets.exceptions import UserError
|
|
from beets.metadata_plugins import IDResponse, SearchApiMetadataSourcePlugin
|
|
|
|
VARIOUS_ARTISTS_ID = 5080
|
|
|
|
if TYPE_CHECKING:
|
|
from collections.abc import Sequence
|
|
|
|
from beets.library import Item, Library
|
|
from beets.metadata_plugins import QueryType, SearchParams
|
|
|
|
from ._typing import JSONDict
|
|
|
|
|
|
class DeezerPlugin(SearchApiMetadataSourcePlugin[IDResponse]):
|
|
item_types: ClassVar[dict[str, types.Type]] = {
|
|
"deezer_track_rank": types.INTEGER,
|
|
"deezer_track_id": types.INTEGER,
|
|
"deezer_updated": types.DATE,
|
|
}
|
|
# Base URLs for the Deezer API
|
|
# Documentation: https://developers.deezer.com/api/
|
|
search_url = "https://api.deezer.com/search/"
|
|
album_url = "https://api.deezer.com/album/"
|
|
track_url = "https://api.deezer.com/track/"
|
|
|
|
def __init__(self) -> None:
|
|
super().__init__()
|
|
|
|
def commands(self):
|
|
"""Add beet UI commands to interact with Deezer."""
|
|
deezer_update_cmd = ui.Subcommand(
|
|
"deezerupdate", help=f"Update {self.data_source} rank"
|
|
)
|
|
|
|
def func(lib: Library, opts, args):
|
|
items = lib.items(args)
|
|
self.deezerupdate(list(items), ui.should_write())
|
|
|
|
deezer_update_cmd.func = func
|
|
|
|
return [deezer_update_cmd]
|
|
|
|
def album_for_id(self, album_id: str) -> AlbumInfo | None:
|
|
"""Fetch an album by its Deezer ID or URL."""
|
|
if not (deezer_id := self._extract_id(album_id)):
|
|
return None
|
|
|
|
album_url = f"{self.album_url}{deezer_id}"
|
|
if not (album_data := self.fetch_data(album_url)):
|
|
return None
|
|
|
|
contributors = album_data.get("contributors")
|
|
if contributors is not None:
|
|
artist, artist_id = self.get_artist(contributors)
|
|
else:
|
|
artist, artist_id = None, None
|
|
|
|
release_date = album_data["release_date"]
|
|
date_parts = [int(part) for part in release_date.split("-")]
|
|
num_date_parts = len(date_parts)
|
|
|
|
if num_date_parts == 3:
|
|
year, month, day = date_parts
|
|
elif num_date_parts == 2:
|
|
year, month = date_parts
|
|
day = None
|
|
elif num_date_parts == 1:
|
|
year = date_parts[0]
|
|
month = None
|
|
day = None
|
|
else:
|
|
raise UserError(
|
|
f"Invalid `release_date` returned by {self.data_source} API: "
|
|
f"{release_date!r}"
|
|
)
|
|
tracks_obj = self.fetch_data(f"{self.album_url}{deezer_id}/tracks")
|
|
if tracks_obj is None:
|
|
return None
|
|
try:
|
|
tracks_data = tracks_obj["data"]
|
|
except KeyError:
|
|
self._log.debug("Error fetching album tracks for {}", deezer_id)
|
|
tracks_data = None
|
|
if not tracks_data:
|
|
return None
|
|
while "next" in tracks_obj:
|
|
tracks_obj = requests.get(
|
|
tracks_obj["next"],
|
|
timeout=10,
|
|
).json()
|
|
tracks_data.extend(tracks_obj["data"])
|
|
|
|
tracks = []
|
|
medium_totals: dict[int | None, int] = collections.defaultdict(int)
|
|
for i, track_data in enumerate(tracks_data, start=1):
|
|
track = self._get_track(track_data)
|
|
track.index = i
|
|
medium_totals[track.medium] += 1
|
|
tracks.append(track)
|
|
for track in tracks:
|
|
track.medium_total = medium_totals[track.medium]
|
|
|
|
is_va = str(album_data["artist"]["id"]) == str(VARIOUS_ARTISTS_ID)
|
|
if is_va:
|
|
va_name = config["va_name"].as_str()
|
|
artist = va_name
|
|
|
|
return AlbumInfo(
|
|
album=album_data["title"],
|
|
album_id=deezer_id,
|
|
deezer_album_id=deezer_id,
|
|
artist=artist,
|
|
artist_credit=(
|
|
artist if is_va else self.get_artist([album_data["artist"]])[0]
|
|
),
|
|
artist_id=str(artist_id),
|
|
tracks=tracks,
|
|
albumtype=album_data["record_type"],
|
|
va=is_va,
|
|
year=year,
|
|
month=month,
|
|
day=day,
|
|
label=album_data["label"],
|
|
mediums=max(filter(None, medium_totals.keys())),
|
|
data_source=self.data_source,
|
|
data_url=album_data["link"],
|
|
cover_art_url=album_data.get("cover_xl"),
|
|
)
|
|
|
|
def track_for_id(self, track_id: str) -> None | TrackInfo:
|
|
"""Fetch a track by its Deezer ID or URL and return a
|
|
TrackInfo object or None if the track is not found.
|
|
|
|
:param track_id: (Optional) Deezer ID or URL for the track. Either
|
|
``track_id`` or ``track_data`` must be provided.
|
|
|
|
"""
|
|
if not (deezer_id := self._extract_id(track_id)):
|
|
self._log.debug("Invalid Deezer track_id: {}", track_id)
|
|
return None
|
|
|
|
if not (track_data := self.fetch_data(f"{self.track_url}{deezer_id}")):
|
|
self._log.debug("Track not found: {}", track_id)
|
|
return None
|
|
|
|
track = self._get_track(track_data)
|
|
|
|
# Get album's tracks to set `track.index` (position on the entire
|
|
# release) and `track.medium_total` (total number of tracks on
|
|
# the track's disc).
|
|
if not (
|
|
album_tracks_obj := self.fetch_data(
|
|
f"{self.album_url}{track_data['album']['id']}/tracks"
|
|
)
|
|
):
|
|
return None
|
|
|
|
try:
|
|
album_tracks_data = album_tracks_obj["data"]
|
|
except KeyError:
|
|
self._log.debug(
|
|
"Error fetching album tracks for {}", track_data["album"]["id"]
|
|
)
|
|
return None
|
|
medium_total = 0
|
|
for i, track_data in enumerate(album_tracks_data, start=1):
|
|
if track_data["disk_number"] == track.medium:
|
|
medium_total += 1
|
|
if track_data["id"] == track.track_id:
|
|
track.index = i
|
|
track.medium_total = medium_total
|
|
return track
|
|
|
|
def _get_track(self, track_data: JSONDict) -> TrackInfo:
|
|
"""Convert a Deezer track object dict to a TrackInfo object.
|
|
|
|
:param track_data: Deezer Track object dict
|
|
"""
|
|
artist, artist_id = self.get_artist(
|
|
track_data.get("contributors", [track_data["artist"]])
|
|
)
|
|
return TrackInfo(
|
|
title=track_data["title"],
|
|
track_id=track_data["id"],
|
|
deezer_track_id=track_data["id"],
|
|
isrc=track_data.get("isrc"),
|
|
artist=artist,
|
|
artist_id=str(artist_id),
|
|
length=track_data["duration"],
|
|
index=track_data.get("track_position"),
|
|
medium=track_data.get("disk_number"),
|
|
deezer_track_rank=track_data.get("rank"),
|
|
medium_index=track_data.get("track_position"),
|
|
data_source=self.data_source,
|
|
data_url=track_data["link"],
|
|
deezer_updated=time.time(),
|
|
)
|
|
|
|
def get_search_query_with_filters(
|
|
self,
|
|
query_type: QueryType,
|
|
items: Sequence[Item],
|
|
artist: str,
|
|
name: str,
|
|
va_likely: bool,
|
|
) -> tuple[str, dict[str, str]]:
|
|
query = f'album:"{name}"' if query_type == "album" else name
|
|
if query_type == "track" or not va_likely:
|
|
query += f' artist:"{artist}"'
|
|
|
|
return query, {}
|
|
|
|
def get_search_response(self, params: SearchParams) -> list[IDResponse]:
|
|
"""Search Deezer and return the raw result payload entries."""
|
|
|
|
response = requests.get(
|
|
f"{self.search_url}{params.query_type}",
|
|
params={
|
|
**params.filters,
|
|
"q": params.query,
|
|
"limit": str(params.limit),
|
|
},
|
|
timeout=10,
|
|
)
|
|
response.raise_for_status()
|
|
return response.json()["data"]
|
|
|
|
def deezerupdate(self, items: Sequence[Item], write: bool):
|
|
"""Obtain rank information from Deezer."""
|
|
for index, item in enumerate(items, start=1):
|
|
self._log.info(
|
|
"Processing {}/{} tracks - {} ", index, len(items), item
|
|
)
|
|
try:
|
|
deezer_track_id = item.deezer_track_id
|
|
except AttributeError:
|
|
self._log.debug("No deezer_track_id present for: {}", item)
|
|
continue
|
|
try:
|
|
rank = self.fetch_data(
|
|
f"{self.track_url}{deezer_track_id}"
|
|
).get("rank")
|
|
self._log.debug(
|
|
"Deezer track: {} has {} rank", deezer_track_id, rank
|
|
)
|
|
except Exception as e:
|
|
self._log.debug("Invalid Deezer track_id: {}", e)
|
|
continue
|
|
item.deezer_track_rank = int(rank)
|
|
item.store()
|
|
item.deezer_updated = time.time()
|
|
if write:
|
|
item.try_write()
|
|
|
|
def fetch_data(self, url: str):
|
|
try:
|
|
response = requests.get(url, timeout=10)
|
|
response.raise_for_status()
|
|
data = response.json()
|
|
except requests.exceptions.RequestException as e:
|
|
self._log.error("Error fetching data from {}\n Error: {}", url, e)
|
|
return None
|
|
if "error" in data:
|
|
self._log.debug("Deezer API error: {}", data["error"]["message"])
|
|
return None
|
|
return data
|