lastfm: dump failed scrobbles locally

+ bump tinytag to v2.0.0 and refactor taglib.py
+ add explicit flag to track model
This commit is contained in:
cwilvx
2025-01-06 00:18:17 +03:00
parent 2cc063ad76
commit 2a12487220
10 changed files with 188 additions and 198 deletions
+5 -3
View File
@@ -7,6 +7,7 @@ from pprint import pprint
import random
from datetime import datetime
from itertools import groupby
from typing import Any
from flask_openapi3 import APIBlueprint, Tag
from pydantic import Field
@@ -72,6 +73,7 @@ def get_artist(path: ArtistHashSchema, query: GetArtistQuery):
except ValueError:
year = 0
genres = [*artist.genres]
decade = None
if year:
@@ -79,7 +81,7 @@ def get_artist(path: ArtistHashSchema, query: GetArtistQuery):
decade = str(decade)[2:] + "s"
if decade:
artist.genres.insert(0, {"name": decade, "genrehash": decade})
genres.insert(0, {"name": decade, "genrehash": decade})
stats = get_track_group_stats(tracks)
duration = sum(t.duration for t in tracks) if tracks else 0
@@ -105,7 +107,7 @@ def get_artist(path: ArtistHashSchema, query: GetArtistQuery):
"duration": duration,
"trackcount": tcount,
"albumcount": artist.albumcount,
"genres": artist.genres,
"genres": genres,
"is_favorite": artist.is_favorite,
},
"tracks": tracks,
@@ -150,7 +152,7 @@ def get_artist_albums(path: ArtistHashSchema, query: GetArtistAlbumsQuery):
albums = [a for a in albumdict.values()]
all_albums = sorted(albums, key=lambda a: a.date, reverse=True)
res = {
res: dict[str, Any] = {
"albums": [],
"appearances": [],
"compilations": [],
+2
View File
@@ -27,6 +27,8 @@ class UserConfig:
"AC/DC",
"Bob marley & the wailers",
"Crosby, Stills, Nash & Young",
"Smith & Thell",
"Peter, Paul & Mary",
}
)
genreSeparators: set[str] = field(default_factory=lambda: {"/", ";", "&"})
+1 -1
View File
@@ -465,7 +465,7 @@ class LibDataTable(Base):
@classmethod
def find_one(cls, hash: str, type: Literal["album", "artist"]):
result = cls.execute(
select(cls).where((cls.itemhash == hash) & (cls.itemtype == type))
select(cls).where((cls.itemhash == type + hash) & (cls.itemtype == type))
)
return result.fetchone()
+8 -2
View File
@@ -124,9 +124,15 @@ class ProcessArtistColors:
artist.set_color(colors[0])
# INFO: Write to the database.
print("RECORD")
print(record)
if record is None:
LibDataTable.insert_one(
{"itemhash": artisthash, "color": colors[0], "itemtype": "artist"}
{
"itemhash": "artist" + artisthash,
"color": colors[0],
"itemtype": "artist",
}
)
else:
LibDataTable.update_one(artisthash, {"color": colors[0]})
LibDataTable.update_one("artist" + artisthash, {"color": colors[0]})
+70 -177
View File
@@ -155,35 +155,54 @@ def get_tags(filepath: str, config: UserConfig):
return None
try:
tags: Any = TinyTag.get(filepath)
except: # noqa: E722
tags = TinyTag.get(filepath)
except Exception as e: # noqa: E722
return None
metadata: dict[str, Any] = {
"album": tags.album,
"albumartists": tags.albumartist,
"artists": tags.artist,
"title": tags.title,
"last_mod": last_mod,
"filepath": win_replace_slash(filepath),
"folder": win_replace_slash(os.path.dirname(filepath)),
"bitrate": tags.bitrate,
"duration": tags.duration,
"track": tags.track,
"disc": tags.disc,
"genres": tags.genre,
"copyright": " ".join(tags.other.get("copyright", [])),
"extra": {},
}
no_albumartist: bool = (tags.albumartist == "") or (tags.albumartist is None)
no_artist: bool = (tags.artist == "") or (tags.artist is None)
if no_albumartist and not no_artist:
tags.albumartist = tags.artist
# INFO: If no albumartist, use the artist
metadata["albumartists"] = tags.artist
if no_artist and not no_albumartist:
tags.artist = tags.albumartist
# INFO: If no artist, use the albumartist
metadata["artist"] = tags.albumartist
parse_data = None
# INFO: If title or album is empty, extract the album and title from the filename
to_filename = ["title", "album"]
for tag in to_filename:
p = getattr(tags, tag)
p = metadata[tag]
if p == "" or p is None:
parse_data = extract_artist_title(filename, config)
title = parse_data.title.replace("_", " ")
setattr(tags, tag, title)
metadata[tag] = title
# tags.title = tags.title.replace("_", " ")
# tags.album = tags.album.replace("_", " ")
parse = ["artist", "albumartist"]
# INFO: If artist or albumartist is empty
# extract the artist and albumartist from the filename
parse = ["artists", "albumartists"]
for tag in parse:
p = getattr(tags, tag)
p = metadata[tag]
if p == "" or p is None:
if not parse_data:
@@ -192,194 +211,68 @@ def get_tags(filepath: str, config: UserConfig):
artist = parse_data.artist
if artist:
setattr(tags, tag, ", ".join(artist))
metadata[tag] = ", ".join(artist)
else:
setattr(tags, tag, "Unknown")
metadata[tag] = "Unknown"
# TODO: Move parsing title, album and artist to startup. (Maybe!)
to_check = ["album", "year", "albumartist"]
# INFO: If these are empty, set to "Unknown"
to_check = ["album", "albumartists"]
for prop in to_check:
p = getattr(tags, prop)
p = metadata[prop]
if (p is None) or (p == ""):
setattr(tags, prop, "Unknown")
metadata[prop] = "Unknown"
# INFO: Round the bitrate and duration
to_round = ["bitrate", "duration"]
for prop in to_round:
try:
setattr(tags, prop, math.floor(getattr(tags, prop)))
metadata[prop] = math.floor(getattr(tags, prop))
except TypeError:
setattr(tags, prop, 0)
metadata[prop] = 0
# INFO: Convert these to int
to_int = ["track", "disc"]
for prop in to_int:
try:
setattr(tags, prop, int(getattr(tags, prop)))
metadata[prop] = int(getattr(tags, prop))
except (ValueError, TypeError):
setattr(tags, prop, 1)
metadata[prop] = 1
try:
tags.copyright = tags.extra["copyright"]
except KeyError:
tags.copyright = None
# INFO: Extract copyright from extra data
metadata["date"] = parse_date(tags.year or "") or int(last_mod)
# tags.image = f"{tags.albumhash}.webp"
tags.folder = win_replace_slash(os.path.dirname(filepath))
tags.date = parse_date(tags.year) or int(last_mod)
tags.filepath = win_replace_slash(filepath)
tags.last_mod = last_mod
tags.artists = tags.artist
tags.albumartists = tags.albumartist
# split_artist = split_artists(tags.artist, separators=config.artistSeparators)
# split_albumartists = split_artists(tags.albumartist, separators=config.artistSeparators)
# new_title = tags.title
# TODO: Figure out which is the best spot to create these hashes
# create albumhash using og_album
tags.albumhash = create_hash(tags.album or "", tags.albumartist)
metadata["albumhash"] = create_hash(
tags.album or "", metadata.get("albumartists", "")
)
# extract featured artists
# if config.extractFeaturedArtists:
# feat, new_title = parse_feat_from_title(
# tags.title, separators=config.artistSeparators
# )
# original_lower = "-".join([create_hash(a) for a in split_artist])
# split_artist.extend(a for a in feat if create_hash(a) not in original_lower)
metadata["trackhash"] = create_hash(
metadata.get("artist", ""), metadata.get("album", ""), metadata.get("title", "")
)
# if no albumartist, assign to the first artist
if not tags.albumartist:
tags.albumartist = split_artists(tags.artist, config)[:1]
# create json objects for artists and albumartists
# tags.artists = [
# {
# "artisthash": create_hash(a, decode=True),
# "name": a,
# }
# for a in split_artist
# ]
# tags.albumartists = [
# {
# "artisthash": create_hash(a, decode=True),
# "name": a,
# }
# for a in split_albumartists
# ]
# tags.artisthashes = list(
# {a["artisthash"] for a in tags.artists}
# )
# remove prod by
# if config.removeProdBy:
# new_title = remove_prod(new_title)
# if track is a single, ie.
# if og_title == album, rename album to new_title
# if tags.title == tags.album:
# tags.album = new_title
# remove remaster from track title
# if config.removeRemasterInfo:
# new_title = clean_title(new_title)
# save final title
# tags.og_title = tags.title
# tags.title = new_title
# tags.og_album = tags.album
# clean album title
# if config.cleanAlbumTitle:
# tags.album, _ = get_base_title_and_versions(tags.album, get_versions=False)
# merge album versions
# if config.mergeAlbums:
# tags.albumhash = create_hash(
# tags.album, *(a["name"] for a in tags.albumartists)
# )
# process genres
# if tags.genre:
# src_genres: str = tags.genre
# src_genres = src_genres.lower()
# # separators = {"/", ";", "&"}
# separators = set(config.genreSeparators)
# contains_rnb = "r&b" in src_genres
# contains_rock = "rock & roll" in src_genres
# if contains_rnb:
# src_genres = src_genres.replace("r&b", "RnB")
# if contains_rock:
# src_genres = src_genres.replace("rock & roll", "rock")
# for s in separators:
# src_genres = src_genres.replace(s, ",")
# genres_list: list[str] = src_genres.split(",")
# tags.genres = [
# {"name": g.strip(), "genrehash": create_hash(g.strip())}
# for g in genres_list
# ]
# tags.genrehashes = [g["genrehash"] for g in tags.genres]
# else:
# tags.genres = []
# tags.genrehashes = []
tags.genres = tags.genre
# sub underscore with space
# tags.title = tags.title.replace("_", " ")
# tags.album = tags.album.replace("_", " ")
tags.trackhash = create_hash(tags.artists, tags.album, tags.title)
more_extra = {
"audio_offset": tags.audio_offset,
"bitdepth": tags.bitdepth,
"composer": tags.composer,
"channels": tags.channels,
"comment": tags.comment,
"disc_total": tags.disc_total,
"filesize": tags.filesize,
"samplerate": tags.samplerate,
"track_total": tags.track_total,
"hashinfo": {
"algo": "sha1",
"format": "[:5]+[-5:]", # first 5 + last 5 chars
},
extra: dict[str, Any] = {
k: v for k, v in tags.as_dict().items() if metadata.get(k, "meh") == "meh"
}
tags.extra = {**tags.extra, **more_extra}
extra["hashinfo"] = {
"algo": "sha1",
"format": "[:5]+[-5:]", # first 5 + last 5 chars
}
tags = tags.__dict__
to_pop = ["filename", "artist", "albumartist", "year"]
# delete all tag properties that start with _ (tinytag internals)
for tag in list(tags):
if tag.startswith("_"):
del tags[tag]
# REMOVE EMPTY VALUES
for key, value in extra.items():
if (
value is None
or value == ""
# INFO: If value is a list, check if it's empty or if the first element is empty
or (type(value) is list and "".join(value) == "")
):
to_pop.append(key)
to_delete = [
"filesize",
"audio_offset",
"channels",
"comment",
"composer",
"disc_total",
"samplerate",
"track_total",
"year",
"bitdepth",
"artist",
"albumartist",
"genre",
]
for key in to_pop:
extra.pop(key, None)
for tag in to_delete:
del tags[tag]
return tags
metadata["extra"] = extra
return metadata
+3 -4
View File
@@ -16,10 +16,9 @@ class CustomFormatter(logging.Formatter):
red = "\033[41m"
bold_red = "\x1b[31;1m"
reset = "\x1b[0m"
# format = (
# "%(asctime)s - %(name)s - %(levelname)s - %(message)s (%(filename)s:%(lineno)d)"
# )
format_ = "%(message)s"
# format_ = "[%(asctime)s] %(name)s %(levelname)s %(message)s (%(filename)s:%(lineno)d)"
format_ = "[%(asctime)s] [%(levelname)s] %(message)s (%(filename)s:%(lineno)d)\n"
# format_ = "%(message)s"
FORMATS = {
logging.DEBUG: grey + format_ + reset,
+3
View File
@@ -50,6 +50,7 @@ class Track:
_pos: int = 0
_ati: str = ""
image: str = ""
explicit: bool = False
fav_userids: list[int] = field(default_factory=list)
@property
@@ -78,6 +79,8 @@ class Track:
self.og_album = self.album
self.folder = self.folder + "/"
self.weakhash = create_hash(self.title, self.artists)
explicit_tag = self.extra.get("explicit", ["0"])
self.explicit = int(explicit_tag[0]) == 1
self.image = self.albumhash + ".webp"
self.extra = {
+88 -3
View File
@@ -1,3 +1,6 @@
import json
from pathlib import Path
import time
import requests
from typing import Any
from hashlib import md5
@@ -5,13 +8,21 @@ from urllib.parse import quote_plus
from app.config import UserConfig
from app.models.track import Track
from app.settings import Paths
from app.utils.auth import get_current_userid
from app.utils.threading import background
from app.plugins import Plugin, plugin_method
from app.logger import log
class LastFmPlugin(Plugin):
"""
Last.fm scrobbler plugin.
"""
UPLOADING_DUMPS = False
def __init__(self):
self.config = UserConfig()
super().__init__("lastfm", "Last.fm scrobbler")
@@ -71,11 +82,85 @@ class LastFmPlugin(Plugin):
"albumArtist": track.albumartists[0]["name"],
}
success = self.post_scrobble_data({**data})
if not success:
self.dump_scrobble(data)
else:
self.upload_dumps()
return success
def post_scrobble_data(self, data: dict[str, Any]):
"""
Uploads the scrobble data and handles the
response from the lastfm scrobble endpoint.
"""
log.info(f"scrobble data: {data}")
try:
res = self.post(data)
log.info("scrobble response:" + str(res.text))
log.info("scrobble response json:" + str(res.json()))
except Exception as e:
log.info("scrobble error" + str(e))
log.warn("scrobble response error" + str(e))
return False
log.info("scrobble response text: " + str(res.text))
log.info("scrobble response json: " + str(res.json()))
res_json: dict[str, Any] = res.json()
if res_json.get("error"):
log.error("LASTFM: scrobble error" + str(res_json))
if res_json["error"] == 9:
log.error("LAST.FM: Invalid session key")
# Invalid session key
self.config.lastfmSessionKeys.pop(str(get_current_userid()))
self.config.lastfmSessionKeys = self.config.lastfmSessionKeys
return False
if res_json.get("scrobbles", {}).get("@attr", {}).get("accepted") == 1:
log.info("scrobble accepted")
return True
return False
# SECTION: Persistence
def dump_scrobble(self, data: dict[str, Any]):
"""
Dumps the scrobble data to a file in the lastfm plugin directory.
"""
dump_dir = Path(Paths.get_plugins_path(), "lastfm")
if not dump_dir.exists():
dump_dir.mkdir(parents=True, exist_ok=True)
path = dump_dir / f"{int(time.time())}.json"
log.info(f"Dumping scrobble to {path}")
with open(path, "w") as f:
json.dump(data, f)
def upload_dumps(self):
"""
Uploads the scrobble dumps to the lastfm api.
"""
if self.UPLOADING_DUMPS:
return
self.UPLOADING_DUMPS = True
dump_dir = Path(Paths.get_plugins_path(), "lastfm")
if not dump_dir.exists():
return
try:
for file in dump_dir.iterdir():
log.info(f"Uploading dump: {file}")
with open(file, "r") as f:
data = json.load(f)
success = self.post_scrobble_data(data)
if success:
file.unlink()
finally:
self.UPLOADING_DUMPS = False
Generated
+7 -7
View File
@@ -2323,17 +2323,17 @@ widechars = ["wcwidth"]
[[package]]
name = "tinytag"
version = "1.10.1"
description = "Read music meta data and length of MP3, OGG, OPUS, MP4, M4A, FLAC, WMA and Wave files"
version = "2.0.0"
description = "Read audio file metadata"
optional = false
python-versions = ">=2.7"
python-versions = ">=3.7"
files = [
{file = "tinytag-1.10.1-py3-none-any.whl", hash = "sha256:e437654d04c966fbbbdbf807af61eb9759f1d80e4173a7d26202506b37cfdaf0"},
{file = "tinytag-1.10.1.tar.gz", hash = "sha256:122a63b836f85094aacca43fc807aaee3290be3de17d134f5f4a08b509ae268f"},
{file = "tinytag-2.0.0-py3-none-any.whl", hash = "sha256:971b9dceae2d1de73b5e8300639ea0b41454633b899426e702aed15f0e72a9b4"},
{file = "tinytag-2.0.0.tar.gz", hash = "sha256:d041f53d15553bb148549bfbc7feab445caf7105ba95fa2ecb9827bb06b62275"},
]
[package.extras]
tests = ["flake8", "pytest", "pytest-cov"]
tests = ["coverage", "mypy", "pycodestyle", "pylint", "pytest"]
[[package]]
name = "tomli"
@@ -2775,4 +2775,4 @@ testing = ["coverage (>=5.0.3)", "zope.event", "zope.testing"]
[metadata]
lock-version = "2.0"
python-versions = ">=3.10,<3.12"
content-hash = "85f8932739522e7b53b4fe5bbecc3c10a30bb690e25bf9404209c57ec71e88d3"
content-hash = "733ca957831c695560fe292a6dfdad13c3fc905695f473cd48cf13bfba8defdc"
+1 -1
View File
@@ -14,7 +14,7 @@ Pillow = "^9.0.1"
"colorgram.py" = "^1.2.0"
tqdm = "^4.65.0"
rapidfuzz = "^2.13.7"
tinytag = "^1.10.1"
tinytag = ">=2.0.0"
Unidecode = "^1.3.6"
psutil = "^5.9.4"
show-in-file-manager = "^1.1.4"