Files
beets/beetsplug/deezer.py
Konstantin 2e0475b008 make get_search_response return the number of results
this allows subclasses to return an iterator for results and use
internal logic to determine the number of results without prematurely
exhausting it in that case
2026-05-17 18:15:07 +02:00

298 lines
10 KiB
Python

# This file is part of beets.
# Copyright 2019, Rahul Ahuja.
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
"""Adds Deezer release and track search support to the autotagger"""
from __future__ import annotations
import collections
import time
from typing import TYPE_CHECKING, ClassVar
import requests
from beets import config, ui
from beets.autotag.hooks import AlbumInfo, TrackInfo
from beets.dbcore import types
from beets.exceptions import UserError
from beets.metadata_plugins import IDResponse, SearchApiMetadataSourcePlugin
# Artist ID used to recognise "Various Artists" releases: an album whose
# ``artist.id`` equals this value is flagged as a compilation in
# ``DeezerPlugin.album_for_id``.
VARIOUS_ARTISTS_ID = 5080
if TYPE_CHECKING:
from collections.abc import Sequence
from beets.library import Item, Library
from beets.metadata_plugins import QueryType, SearchParams
from ._typing import JSONDict
class DeezerPlugin(SearchApiMetadataSourcePlugin[IDResponse]):
    """Metadata source plugin backed by the Deezer API.

    Looks up albums and tracks by Deezer ID or URL, builds and runs search
    queries, and provides the ``deezerupdate`` command that refreshes the
    Deezer rank stored on library items.
    """

    # Item fields written by this plugin, mapped to their database types.
    item_types: ClassVar[dict[str, types.Type]] = {
        "deezer_track_rank": types.INTEGER,
        "deezer_track_id": types.INTEGER,
        "deezer_updated": types.DATE,
    }

    # Base URLs for the Deezer API
    # Documentation: https://developers.deezer.com/api/
    search_url = "https://api.deezer.com/search/"
    album_url = "https://api.deezer.com/album/"
    track_url = "https://api.deezer.com/track/"

    def __init__(self) -> None:
        super().__init__()

    def commands(self):
        """Add beet UI commands to interact with Deezer.

        :returns: A list holding the ``deezerupdate`` subcommand, which runs
            :meth:`deezerupdate` on the items matching the query arguments.
        """
        deezer_update_cmd = ui.Subcommand(
            "deezerupdate", help=f"Update {self.data_source} rank"
        )

        def func(lib: Library, opts, args):
            items = lib.items(args)
            self.deezerupdate(list(items), ui.should_write())

        deezer_update_cmd.func = func

        return [deezer_update_cmd]

    def _parse_release_date(
        self, release_date: str
    ) -> tuple[int, int | None, int | None]:
        """Split a ``release_date`` string into year, month and day.

        Components missing from the end of the date are returned as ``None``.

        :param release_date: Dash-separated date as returned by the API.
        :raises UserError: If the date has more than three components.
        """
        date_parts = [int(part) for part in release_date.split("-")]
        if len(date_parts) > 3:
            raise UserError(
                f"Invalid `release_date` returned by {self.data_source} API: "
                f"{release_date!r}"
            )
        # Pad with ``None`` so a year-only or year-month date still unpacks.
        year, month, day = date_parts + [None] * (3 - len(date_parts))
        return year, month, day

    def _fetch_album_tracks(self, album_id) -> list[JSONDict] | None:
        """Return all track objects of an album, following pagination.

        Every page is requested through :meth:`fetch_data`, so HTTP failures
        and API error payloads on later pages are handled the same way as on
        the first one.

        :param album_id: Deezer album ID.
        :returns: The concatenated ``data`` entries of all pages, or ``None``
            if any page cannot be fetched or lacks a ``data`` key.
        """
        tracks: list[JSONDict] = []
        page = self.fetch_data(f"{self.album_url}{album_id}/tracks")
        while page is not None:
            try:
                tracks.extend(page["data"])
            except KeyError:
                self._log.debug("Error fetching album tracks for {}", album_id)
                return None
            next_url = page.get("next")
            if not next_url:
                return tracks
            page = self.fetch_data(next_url)
        # A page failed to load; a partial listing would yield wrong indices.
        return None

    def album_for_id(self, album_id: str) -> AlbumInfo | None:
        """Fetch an album by its Deezer ID or URL.

        :param album_id: Deezer album ID or URL.
        :returns: The album including its tracks, or ``None`` when no ID can
            be extracted, the album or its track listing cannot be fetched,
            or the listing is empty.
        :raises UserError: If the API returns a ``release_date`` with more
            than three dash-separated components.
        """
        if not (deezer_id := self._extract_id(album_id)):
            return None

        album_url = f"{self.album_url}{deezer_id}"
        if not (album_data := self.fetch_data(album_url)):
            return None

        contributors = album_data.get("contributors")
        if contributors is not None:
            artist, artist_id = self.get_artist(contributors)
        else:
            artist, artist_id = None, None

        year, month, day = self._parse_release_date(album_data["release_date"])

        tracks_data = self._fetch_album_tracks(deezer_id)
        if not tracks_data:
            return None

        tracks = []
        medium_totals: dict[int | None, int] = collections.defaultdict(int)
        for i, track_data in enumerate(tracks_data, start=1):
            track = self._get_track(track_data)
            # Position on the entire release, not on the individual disc.
            track.index = i
            medium_totals[track.medium] += 1
            tracks.append(track)
        for track in tracks:
            track.medium_total = medium_totals[track.medium]

        is_va = str(album_data["artist"]["id"]) == str(VARIOUS_ARTISTS_ID)
        if is_va:
            artist = config["va_name"].as_str()

        return AlbumInfo(
            album=album_data["title"],
            album_id=deezer_id,
            deezer_album_id=deezer_id,
            artist=artist,
            artist_credit=(
                artist if is_va else self.get_artist([album_data["artist"]])[0]
            ),
            # Keep a missing ID as None instead of the string "None".
            artist_id=None if artist_id is None else str(artist_id),
            tracks=tracks,
            albumtype=album_data["record_type"],
            va=is_va,
            year=year,
            month=month,
            day=day,
            label=album_data["label"],
            # No track may carry a disc number; avoid max() on an empty
            # iterable in that case.
            mediums=max(filter(None, medium_totals), default=None),
            data_source=self.data_source,
            data_url=album_data["link"],
            cover_art_url=album_data.get("cover_xl"),
        )

    def track_for_id(self, track_id: str) -> None | TrackInfo:
        """Fetch a track by its Deezer ID or URL.

        :param track_id: Deezer ID or URL for the track.
        :returns: The track, with ``index`` and ``medium_total`` derived from
            its album's track listing, or ``None`` when no ID can be
            extracted or the track or the listing cannot be fetched.
        """
        if not (deezer_id := self._extract_id(track_id)):
            self._log.debug("Invalid Deezer track_id: {}", track_id)
            return None

        if not (track_data := self.fetch_data(f"{self.track_url}{deezer_id}")):
            self._log.debug("Track not found: {}", track_id)
            return None

        track = self._get_track(track_data)

        # Get album's tracks to set `track.index` (position on the entire
        # release) and `track.medium_total` (total number of tracks on
        # the track's disc).
        album_tracks = self._fetch_album_tracks(track_data["album"]["id"])
        if album_tracks is None:
            return None

        medium_total = 0
        for i, album_track in enumerate(album_tracks, start=1):
            if album_track.get("disk_number") == track.medium:
                medium_total += 1
                if album_track["id"] == track.track_id:
                    track.index = i
        track.medium_total = medium_total
        return track

    def _get_track(self, track_data: JSONDict) -> TrackInfo:
        """Convert a Deezer track object dict to a TrackInfo object.

        :param track_data: Deezer Track object dict
        :returns: The track; ``index`` and ``medium_index`` are both taken
            from ``track_position`` and ``deezer_updated`` is set to the
            current time.
        """
        # Fall back to the primary artist only when no contributor list is
        # present, so "artist" is not required alongside "contributors".
        if "contributors" in track_data:
            contributors = track_data["contributors"]
        else:
            contributors = [track_data["artist"]]
        artist, artist_id = self.get_artist(contributors)

        return TrackInfo(
            title=track_data["title"],
            track_id=track_data["id"],
            deezer_track_id=track_data["id"],
            isrc=track_data.get("isrc"),
            artist=artist,
            # Keep a missing ID as None instead of the string "None".
            artist_id=None if artist_id is None else str(artist_id),
            length=track_data["duration"],
            index=track_data.get("track_position"),
            medium=track_data.get("disk_number"),
            deezer_track_rank=track_data.get("rank"),
            medium_index=track_data.get("track_position"),
            data_source=self.data_source,
            data_url=track_data["link"],
            deezer_updated=time.time(),
        )

    def get_search_query_with_filters(
        self,
        query_type: QueryType,
        items: Sequence[Item],
        artist: str,
        name: str,
        va_likely: bool,
    ) -> tuple[str, dict[str, str]]:
        """Build the search query string for an album or track lookup.

        Album searches wrap ``name`` in an ``album:"..."`` clause; other
        searches use ``name`` as is. An ``artist:"..."`` clause is appended
        for track searches and for albums that are not likely compilations.

        :returns: The query string and an empty filter dict.
        """
        query = f'album:"{name}"' if query_type == "album" else name
        if query_type == "track" or not va_likely:
            query += f' artist:"{artist}"'

        return query, {}

    def get_search_response(
        self, params: SearchParams
    ) -> tuple[int, list[IDResponse]]:
        """Search Deezer and return the raw result payload entries.

        :param params: Query type, query string, filters and result limit.
        :returns: The number of returned entries and the entries themselves.
        :raises requests.exceptions.HTTPError: If the response status
            indicates an error.
        """
        response = requests.get(
            f"{self.search_url}{params.query_type}",
            params={
                **params.filters,
                "q": params.query,
                "limit": str(params.limit),
            },
            timeout=10,
        )
        response.raise_for_status()
        results = response.json()["data"]
        return len(results), results

    def deezerupdate(self, items: Sequence[Item], write: bool):
        """Obtain rank information from Deezer.

        Items without a ``deezer_track_id``, and items whose track cannot be
        fetched or has no rank, are skipped and left unchanged.

        :param items: Library items to refresh.
        :param write: Whether to also write the updated item to its file.
        """
        total = len(items)
        for index, item in enumerate(items, start=1):
            self._log.info(
                "Processing {}/{} tracks - {} ", index, total, item
            )
            try:
                deezer_track_id = item.deezer_track_id
            except AttributeError:
                self._log.debug("No deezer_track_id present for: {}", item)
                continue

            track_data = self.fetch_data(f"{self.track_url}{deezer_track_id}")
            if track_data is None:
                self._log.debug("Invalid Deezer track_id: {}", deezer_track_id)
                continue

            rank = track_data.get("rank")
            self._log.debug(
                "Deezer track: {} has {} rank", deezer_track_id, rank
            )
            if rank is None:
                # Nothing to record; leave the stored values untouched.
                continue

            item.deezer_track_rank = int(rank)
            # Set the timestamp before storing so both fields are persisted.
            item.deezer_updated = time.time()
            item.store()
            if write:
                item.try_write()

    def fetch_data(self, url: str) -> JSONDict | None:
        """Request a Deezer API URL and return its decoded JSON payload.

        :param url: Full URL to request.
        :returns: The decoded payload, or ``None`` when the request fails,
            the body is not valid JSON, or the payload has an ``error`` key.
        """
        try:
            response = requests.get(url, timeout=10)
            response.raise_for_status()
            data = response.json()
        except (requests.exceptions.RequestException, ValueError) as e:
            # ValueError covers bodies that cannot be decoded as JSON.
            self._log.error("Error fetching data from {}\n Error: {}", url, e)
            return None
        if "error" in data:
            self._log.debug("Deezer API error: {}", data["error"]["message"])
            return None
        return data