mirror of
https://github.com/Dvorinka/SpotifyRecAlg.git
synced 2026-06-03 20:13:03 +00:00
384 lines
11 KiB
Python
384 lines
11 KiB
Python
import json
|
|
import time
|
|
from pathlib import Path
|
|
|
|
import requests
|
|
from unidecode import unidecode
|
|
|
|
from swingmusic.db.userdata import PluginTable
|
|
from swingmusic.plugins import Plugin, plugin_method
|
|
from swingmusic.settings import Paths
|
|
|
|
|
|
class LRCProvider:
|
|
"""Base class for synced (LRC format) lyrics providers."""
|
|
|
|
session = requests.Session()
|
|
|
|
def __init__(self) -> None:
|
|
self.session.headers.update(
|
|
{
|
|
"user-agent": (
|
|
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
|
|
"AppleWebKit/537.36 (KHTML, like Gecko) "
|
|
"Chrome/122.0.0.0 Safari/537.36"
|
|
)
|
|
}
|
|
)
|
|
|
|
def get_lrc_by_id(self, track_id: str) -> str | None:
|
|
raise NotImplementedError
|
|
|
|
def get_lrc(self, title: str, artist: str, album: str = "") -> list[dict]:
|
|
raise NotImplementedError
|
|
|
|
|
|
class LRCLibProvider(LRCProvider):
|
|
"""LRCLIB-first provider (SpotiFLAC-style exact->fallback search strategy)."""
|
|
|
|
ROOT_URL = "https://lrclib.net/api"
|
|
|
|
def __init__(self) -> None:
|
|
super().__init__()
|
|
self._by_id_cache: dict[str, str] = {}
|
|
|
|
def _get_json(self, endpoint: str, params: dict) -> dict | list | None:
|
|
try:
|
|
response = self.session.get(
|
|
f"{self.ROOT_URL}/{endpoint}",
|
|
params=params,
|
|
timeout=10,
|
|
)
|
|
if not response.ok:
|
|
return None
|
|
return response.json()
|
|
except Exception:
|
|
return None
|
|
|
|
def _entry_to_lrc(self, entry: dict) -> str | None:
|
|
synced = (entry.get("syncedLyrics") or "").strip()
|
|
plain = (entry.get("plainLyrics") or "").strip()
|
|
|
|
if synced:
|
|
return synced
|
|
if plain:
|
|
return plain
|
|
return None
|
|
|
|
def _to_result(self, entry: dict) -> dict | None:
|
|
lrc = self._entry_to_lrc(entry)
|
|
if not lrc:
|
|
return None
|
|
|
|
track_id = str(entry.get("id") or "")
|
|
provider_track_id = f"lrclib:{track_id}" if track_id else f"lrclib:{hash(lrc)}"
|
|
self._by_id_cache[provider_track_id] = lrc
|
|
|
|
return {
|
|
"track_id": provider_track_id,
|
|
"title": entry.get("trackName", ""),
|
|
"artist": entry.get("artistName", ""),
|
|
"album": entry.get("albumName", ""),
|
|
"image": None,
|
|
"provider": "lrclib",
|
|
"lrc": lrc,
|
|
}
|
|
|
|
def get_lrc_by_id(self, track_id: str) -> str | None:
|
|
return self._by_id_cache.get(track_id)
|
|
|
|
def get_lrc(self, title: str, artist: str, album: str = "") -> list[dict]:
|
|
if not title or not artist:
|
|
return []
|
|
|
|
results: list[dict] = []
|
|
|
|
# 1) Exact lookup including album when available.
|
|
if album:
|
|
exact_with_album = self._get_json(
|
|
"get",
|
|
{
|
|
"artist_name": artist,
|
|
"track_name": title,
|
|
"album_name": album,
|
|
},
|
|
)
|
|
if isinstance(exact_with_album, dict):
|
|
result = self._to_result(exact_with_album)
|
|
if result:
|
|
results.append(result)
|
|
|
|
# 2) Exact lookup without album.
|
|
if not results:
|
|
exact = self._get_json(
|
|
"get",
|
|
{
|
|
"artist_name": artist,
|
|
"track_name": title,
|
|
},
|
|
)
|
|
if isinstance(exact, dict):
|
|
result = self._to_result(exact)
|
|
if result:
|
|
results.append(result)
|
|
|
|
# 3) Search fallback.
|
|
if not results:
|
|
search_data = self._get_json(
|
|
"search",
|
|
{
|
|
"artist_name": artist,
|
|
"track_name": title,
|
|
},
|
|
)
|
|
if isinstance(search_data, list):
|
|
for entry in search_data:
|
|
result = self._to_result(entry)
|
|
if result:
|
|
results.append(result)
|
|
|
|
return results
|
|
|
|
|
|
class MusixmatchProvider(LRCProvider):
|
|
"""Musixmatch provider class."""
|
|
|
|
ROOT_URL = "https://apic-desktop.musixmatch.com/ws/1.1/"
|
|
|
|
def __init__(self) -> None:
|
|
super().__init__()
|
|
self.token = None
|
|
self.session.headers.update(
|
|
{
|
|
"authority": "apic-desktop.musixmatch.com",
|
|
"cookie": "AWSELBCORS=0; AWSELB=0",
|
|
}
|
|
)
|
|
|
|
def _get(self, action: str, query: list[tuple]):
|
|
if action != "token.get" and self.token is None:
|
|
self._get_token()
|
|
|
|
query.append(("app_id", "web-desktop-app-v1.0"))
|
|
if self.token is not None:
|
|
query.append(("usertoken", self.token))
|
|
|
|
t = str(int(time.time() * 1000))
|
|
query.append(("t", t))
|
|
|
|
try:
|
|
url = self.ROOT_URL + action
|
|
except TypeError:
|
|
return None
|
|
|
|
try:
|
|
response = self.session.get(url, params=query, timeout=10)
|
|
except Exception:
|
|
return None
|
|
|
|
if response is not None and response.ok:
|
|
return response
|
|
|
|
return None
|
|
|
|
def _get_token(self):
|
|
plugin_path = Paths().lyrics_plugins_path
|
|
token_path = plugin_path / "token.json"
|
|
|
|
current_time = int(time.time())
|
|
|
|
if token_path.exists():
|
|
with token_path.open(mode="r", encoding="utf-8") as token_file:
|
|
cached_token_data: dict = json.load(token_file)
|
|
|
|
cached_token = cached_token_data.get("token")
|
|
expiration_time = cached_token_data.get("expiration_time")
|
|
|
|
if cached_token and expiration_time and current_time < expiration_time:
|
|
self.token = cached_token
|
|
return
|
|
|
|
res = self._get("token.get", [("user_language", "en")])
|
|
|
|
if res is None:
|
|
return
|
|
|
|
res = res.json()
|
|
if res["message"]["header"]["status_code"] == 401:
|
|
time.sleep(13)
|
|
return self._get_token()
|
|
|
|
new_token = res["message"]["body"]["user_token"]
|
|
expiration_time = current_time + 600
|
|
|
|
self.token = new_token
|
|
token_data = {"token": new_token, "expiration_time": expiration_time}
|
|
|
|
plugin_path.mkdir(parents=True, exist_ok=True)
|
|
with token_path.open("w", encoding="utf-8") as token_file:
|
|
json.dump(token_data, token_file)
|
|
|
|
def get_lrc_by_id(self, track_id: str) -> str | None:
|
|
res = self._get(
|
|
"track.subtitle.get",
|
|
[("track_id", track_id), ("subtitle_format", "lrc")],
|
|
)
|
|
|
|
try:
|
|
res = res.json()
|
|
body = res["message"]["body"]
|
|
except AttributeError:
|
|
return None
|
|
|
|
if not body:
|
|
return None
|
|
|
|
return body["subtitle"]["subtitle_body"]
|
|
|
|
def get_lrc(self, title: str, artist: str, album: str = "") -> list[dict]:
|
|
res = self._get(
|
|
"track.search",
|
|
[
|
|
("q_track", title),
|
|
("q_artist", artist),
|
|
("page_size", "5"),
|
|
("page", "1"),
|
|
("f_has_lyrics", "1"),
|
|
("s_track_rating", "desc"),
|
|
("quorum_factor", "1.0"),
|
|
],
|
|
)
|
|
|
|
try:
|
|
body = res.json()["message"]["body"]
|
|
except AttributeError:
|
|
return []
|
|
|
|
try:
|
|
tracks = body["track_list"]
|
|
except TypeError:
|
|
return []
|
|
|
|
if not tracks:
|
|
decoded = unidecode(artist)
|
|
if decoded == artist:
|
|
return []
|
|
return self.get_lrc(title, decoded, album)
|
|
|
|
return [
|
|
{
|
|
"track_id": str(t["track"]["track_id"]),
|
|
"title": t["track"]["track_name"],
|
|
"artist": t["track"]["artist_name"],
|
|
"album": t["track"]["album_name"],
|
|
"image": t["track"]["album_coverart_100x100"],
|
|
"provider": "musixmatch",
|
|
}
|
|
for t in tracks
|
|
]
|
|
|
|
|
|
class Lyrics(Plugin):
|
|
def __init__(self) -> None:
|
|
plugin = PluginTable.get_by_name("lyrics_finder")
|
|
if not plugin:
|
|
return
|
|
|
|
super().__init__(plugin.name, "Lyrics finder")
|
|
|
|
self.providers: list[LRCProvider] = [LRCLibProvider(), MusixmatchProvider()]
|
|
self._search_cache: dict[str, str] = {}
|
|
|
|
self.set_active(bool(int(plugin.active)))
|
|
|
|
@staticmethod
|
|
def _write_lrc(path: str, lrc: str) -> None:
|
|
output = Path(path).with_suffix(".lrc")
|
|
output.parent.mkdir(parents=True, exist_ok=True)
|
|
output.write_text(lrc, encoding="utf-8")
|
|
|
|
@plugin_method
|
|
def search_lyrics_by_title_and_artist(self, title: str, artist: str):
|
|
album = ""
|
|
results: list[dict] = []
|
|
seen = set()
|
|
|
|
for provider in self.providers:
|
|
try:
|
|
provider_results = provider.get_lrc(title, artist, album)
|
|
except TypeError:
|
|
provider_results = provider.get_lrc(title, artist) # type: ignore[misc]
|
|
|
|
for item in provider_results:
|
|
if not item:
|
|
continue
|
|
|
|
dedupe_key = (
|
|
(item.get("title") or "").strip().lower(),
|
|
(item.get("artist") or "").strip().lower(),
|
|
(item.get("album") or "").strip().lower(),
|
|
)
|
|
if dedupe_key in seen:
|
|
continue
|
|
|
|
seen.add(dedupe_key)
|
|
|
|
track_id = str(item.get("track_id", "")).strip()
|
|
lrc = item.get("lrc")
|
|
if track_id and lrc:
|
|
self._search_cache[track_id] = lrc
|
|
|
|
results.append(item)
|
|
|
|
return results
|
|
|
|
@plugin_method
|
|
def download_lyrics(self, trackid: str, path: str):
|
|
lrc = self._search_cache.get(trackid)
|
|
|
|
if not lrc:
|
|
for provider in self.providers:
|
|
lrc = provider.get_lrc_by_id(trackid)
|
|
if lrc:
|
|
break
|
|
|
|
if lrc is None:
|
|
return None
|
|
if len(lrc.replace("\n", "").strip()) < 1:
|
|
return None
|
|
|
|
self._write_lrc(path, lrc)
|
|
return lrc
|
|
|
|
@plugin_method
|
|
def download_lyrics_by_metadata(
|
|
self,
|
|
title: str,
|
|
artist: str,
|
|
path: str,
|
|
album: str = "",
|
|
):
|
|
if not title or not artist:
|
|
return None
|
|
|
|
for provider in self.providers:
|
|
try:
|
|
provider_results = provider.get_lrc(title, artist, album)
|
|
except TypeError:
|
|
provider_results = provider.get_lrc(title, artist) # type: ignore[misc]
|
|
|
|
for item in provider_results:
|
|
lrc = item.get("lrc")
|
|
if not lrc:
|
|
track_id = str(item.get("track_id", ""))
|
|
if track_id:
|
|
lrc = provider.get_lrc_by_id(track_id)
|
|
|
|
if not lrc or len(lrc.replace("\n", "").strip()) < 1:
|
|
continue
|
|
|
|
self._write_lrc(path, lrc)
|
|
return lrc
|
|
|
|
return None
|