From 4be2b80bf964f74da9d39d25e0a131fece621c16 Mon Sep 17 00:00:00 2001 From: cwilvx Date: Sat, 5 Oct 2024 08:32:26 +0300 Subject: [PATCH] draft stats --- app/api/scrobble/__init__.py | 232 ++++++++++++++++++++++++++++++++++- app/db/userdata.py | 9 ++ app/lib/tagger.py | 2 +- app/models/track.py | 5 +- app/serializers/artist.py | 1 + app/serializers/track.py | 1 + app/utils/stats.py | 165 +++++++++++++++++++++++++ 7 files changed, 412 insertions(+), 3 deletions(-) create mode 100644 app/utils/stats.py diff --git a/app/api/scrobble/__init__.py b/app/api/scrobble/__init__.py index 46ac1338..e06665ce 100644 --- a/app/api/scrobble/__init__.py +++ b/app/api/scrobble/__init__.py @@ -1,14 +1,37 @@ +from dataclasses import dataclass +from math import e +from pprint import pprint from flask_openapi3 import Tag from flask_openapi3 import APIBlueprint -from pydantic import Field +from pydantic import Field, BaseModel from app.api.apischemas import TrackHashSchema +from typing import Literal +from datetime import datetime, timedelta +from collections import defaultdict from app.db.userdata import ScrobbleTable from app.lib.extras import get_extra_info +from app.models.album import Album +from app.models.track import Track +from app.serializers.artist import serialize_for_card +from app.serializers.album import serialize_for_card as serialize_for_album_card +from app.serializers.track import serialize_track, serialize_tracks from app.settings import Defaults from app.store.albums import AlbumStore from app.store.artists import ArtistStore from app.store.tracks import TrackStore +from app.utils.dates import seconds_to_time_string +from app.utils.stats import ( + calculate_album_trend, + calculate_artist_trend, + calculate_new_albums, + calculate_new_artists, + calculate_scrobble_trend, + calculate_track_trend, + get_albums_in_period, + get_artists_in_period, + get_tracks_in_period, +) bp_tag = Tag(name="Logger", description="Log item plays") api = APIBlueprint("logger", __name__, url_prefix="/logger", abp_tags=[bp_tag]) @@ -62,3 +85,210 @@ def log_track(body: LogTrackBody): track.increment_playcount(duration, timestamp) return {"msg": "recorded"}, 201 + + +class TopTracksQuery(BaseModel): + duration: int = Field( + description="Duration in seconds to fetch data for", example=604800 + ) + limit: int = Field(description="Number of top tracks to return", example=10) + order_by: Literal["playcount", "playduration"] = Field( + description="Property to order by", example="playcount" + ) + + +# SECTION: STATS + + +def get_help_text( + playcount: int, playduration: int, order_by: Literal["playcount", "playduration"] +): + """ + Get the help text given the playcount and playduration. + """ + if order_by == "playcount": + if playcount == 0: + return "unplayed" + + return f"{playcount} play{'' if playcount == 1 else 's'}" + if order_by == "playduration": + return seconds_to_time_string(playduration) + + +# DISCLAIMER: Code beyond this point was partially written by Claude 3.5 Sonnet in Cursor. +# TODO: Refactor, group and clean up + + +@api.get("/top-tracks") +def get_top_tracks(query: TopTracksQuery): + """ + Get the top N tracks played within a given duration. + """ + end_time = int(datetime.now().timestamp()) + start_time = end_time - query.duration + previous_start_time = start_time - query.duration + + current_period_tracks, current_period_scrobbles = get_tracks_in_period( + start_time, end_time + ) + previous_period_tracks, previous_period_scrobbles = get_tracks_in_period( + previous_start_time, start_time + ) + scrobble_trend = ( + "rising" + if current_period_scrobbles > previous_period_scrobbles + else ( + "falling" + if current_period_scrobbles < previous_period_scrobbles + else "stable" + ) + ) + + sorted_tracks = sort_tracks(current_period_tracks, query.order_by) + top_tracks = sorted_tracks[: query.limit] + + response = [] + for track in top_tracks: + trend = calculate_track_trend( + track, current_period_tracks, previous_period_tracks + ) + track = { + **serialize_track(track), + "trend": trend, + "help_text": get_help_text( + track.playcount, track.playduration, query.order_by + ), + } + + response.append(track) + + return { + "tracks": response, + "scrobbles": { + "text": f"{current_period_scrobbles} total play{'' if current_period_scrobbles == 1 else 's'}", + "trend": scrobble_trend, + }, + }, 200 + + +def sort_tracks(tracks: list[Track], order_by: Literal["playcount", "playduration"]): + return sorted(tracks, key=lambda x: getattr(x, order_by), reverse=True) + + +class TopArtistsQuery(BaseModel): + duration: int = Field( + description="Duration in seconds to fetch data for", example=604800 + ) + limit: int = Field(description="Number of top artists to return", example=10) + order_by: Literal["playcount", "playduration"] = Field( + description="Property to order by", example="playcount" + ) + + +@api.get("/top-artists") +def get_top_artists(query: TopArtistsQuery): + """ + Get the top N artists played within a given duration. + """ + end_time = int(datetime.now().timestamp()) + start_time = end_time - query.duration + previous_start_time = start_time - query.duration + + current_period_artists = get_artists_in_period(start_time, end_time) + previous_period_artists = get_artists_in_period(previous_start_time, start_time) + + new_artists = calculate_new_artists(current_period_artists, previous_period_artists) + scrobble_trend = calculate_scrobble_trend( + len(current_period_artists), len(previous_period_artists) + ) + + sorted_artists = sort_artists(current_period_artists, query.order_by) + top_artists = sorted_artists[: query.limit] + + response = [] + for artist in top_artists: + trend = calculate_artist_trend( + artist, current_period_artists, previous_period_artists + ) + db_artist = ArtistStore.get_artist_by_hash(artist["artisthash"]) + + if db_artist is None: + continue + + artist = { + **serialize_for_card(db_artist), + "trend": trend, + "help_text": get_help_text( + artist["playcount"], artist["playduration"], query.order_by + ), + } + response.append(artist) + + return { + "artists": response, + "scrobbles": { + "text": f"{new_artists} new artist{'' if new_artists == 1 else 's'} played", + "trend": scrobble_trend, + }, + }, 200 + + +def sort_artists(artists, order_by): + return sorted(artists, key=lambda x: x[order_by], reverse=True) + + +class TopAlbumsQuery(BaseModel): + duration: int = Field( + description="Duration in seconds to fetch data for", example=604800 + ) + limit: int = Field(description="Number of top albums to return", example=10) + order_by: Literal["playcount", "playduration"] = Field( + description="Property to order by", example="playcount" + ) + + +@api.get("/top-albums") +def get_top_albums(query: TopAlbumsQuery): + """ + Get the top N albums played within a given duration. + """ + end_time = int(datetime.now().timestamp()) + start_time = end_time - query.duration + previous_start_time = start_time - query.duration + + current_period_albums = get_albums_in_period(start_time, end_time) + previous_period_albums = get_albums_in_period(previous_start_time, start_time) + + new_albums = calculate_new_albums(current_period_albums, previous_period_albums) + scrobble_trend = calculate_scrobble_trend( + len(current_period_albums), len(previous_period_albums) + ) + + sorted_albums = sort_albums(current_period_albums, query.order_by) + top_albums = sorted_albums[: query.limit] + + response = [] + for album in top_albums: + trend = calculate_album_trend( + album, current_period_albums, previous_period_albums + ) + album = { + **serialize_for_album_card(album), + "trend": trend, + "help_text": get_help_text( + album.playcount, album.playduration, query.order_by + ), + } + response.append(album) + + return { + "albums": response, + "scrobbles": { + "text": f"{new_albums} new album{'' if new_albums == 1 else 's'} played", + "trend": scrobble_trend, + }, + }, 200 + + +def sort_albums(albums: list[Album], order_by: Literal["playcount", "playduration"]): + return sorted(albums, key=lambda x: getattr(x, order_by), reverse=True) diff --git a/app/db/userdata.py b/app/db/userdata.py index 277a2614..209a6e40 100644 --- a/app/db/userdata.py +++ b/app/db/userdata.py @@ -281,6 +281,15 @@ class ScrobbleTable(Base): return tracklog_to_dataclasses(result.fetchall()) + @classmethod + def get_all_in_period(cls, start_time: int, end_time: int): + result = cls.execute( + select(cls) + .where(and_(cls.timestamp >= start_time, cls.timestamp <= end_time)) + .order_by(cls.timestamp.desc()) + ) + return tracklog_to_dataclasses(result.fetchall()) + class PlaylistTable(Base): __tablename__ = "playlist" diff --git a/app/lib/tagger.py b/app/lib/tagger.py index 4c12cce3..8999d2ba 100644 --- a/app/lib/tagger.py +++ b/app/lib/tagger.py @@ -238,7 +238,7 @@ def create_artists( artists = dict() for track in all_tracks: - this_artists = track.artists + this_artists = [*track.artists] for a in track.albumartists: if a not in this_artists: diff --git a/app/models/track.py b/app/models/track.py index 5e572919..8f764bf9 100644 --- a/app/models/track.py +++ b/app/models/track.py @@ -1,4 +1,4 @@ -from dataclasses import dataclass, field +from dataclasses import asdict, dataclass, field from app.config import UserConfig from app.utils.auth import get_current_userid @@ -206,3 +206,6 @@ class Track: self.trackhash = create_hash( self.title, self.album, *(artist["name"] for artist in self.artists) ) + + def copy(self): + return Track(**{**asdict(self), "config": UserConfig()}) diff --git a/app/serializers/artist.py b/app/serializers/artist.py index dc39e72e..fd5b1074 100644 --- a/app/serializers/artist.py +++ b/app/serializers/artist.py @@ -24,6 +24,7 @@ def serialize_for_card(artist: Artist): "extra", "created_date", "date", + "fav_userids", } for key in props_to_remove: diff --git a/app/serializers/track.py b/app/serializers/track.py index 49618050..7458567a 100644 --- a/app/serializers/track.py +++ b/app/serializers/track.py @@ -15,6 +15,7 @@ def serialize_track(track: Track, to_remove: set = set(), remove_disc=True) -> d "og_title", "og_album", "copyright", + "config", "disc", "track", "artist_hashes", diff --git a/app/utils/stats.py b/app/utils/stats.py new file mode 100644 index 00000000..aa841d31 --- /dev/null +++ b/app/utils/stats.py @@ -0,0 +1,165 @@ +from collections import defaultdict +from typing import Any, Callable, TypeVar, Protocol, List +from app.db.userdata import ScrobbleTable +from app.models.track import Track +from app.models.album import Album +from app.store.albums import AlbumStore +from app.store.tracks import TrackStore + + +def get_artists_in_period(start_time: int, end_time: int): + scrobbles = ScrobbleTable.get_all_in_period(start_time, end_time) + artists = defaultdict(lambda: {"playcount": 0, "playduration": 0}) + + for scrobble in scrobbles: + track = TrackStore.get_tracks_by_trackhashes([scrobble.trackhash]) + if not track: + continue + track = track[0] + + for artist in track.artists: + artisthash = artist["artisthash"] + + artists[artisthash]["artisthash"] = artist["artisthash"] + artists[artisthash]["playcount"] += 1 + artists[artisthash]["playduration"] += scrobble.duration + + return list(artists.values()) + + +def get_albums_in_period(start_time: int, end_time: int): + scrobbles = ScrobbleTable.get_all_in_period(start_time, end_time) + albums: dict[str, Album] = {} + + for scrobble in scrobbles: + track = TrackStore.get_tracks_by_trackhashes([scrobble.trackhash]) + if not track: + continue + + track = track[0] + album_entry = AlbumStore.albummap.get(track.albumhash) + if not album_entry: + continue + + albumhash = album_entry.album.albumhash + if albumhash not in albums: + albums[albumhash] = album_entry.album + albums[albumhash].playcount = 0 + albums[albumhash].playduration = 0 + + albums[albumhash].playcount += 1 + albums[albumhash].playduration += scrobble.duration + + return list(albums.values()) + + +def get_tracks_in_period(start_time: int, end_time: int): + scrobbles = ScrobbleTable.get_all_in_period(start_time, end_time) + tracks: dict[str, Track] = {} + + for scrobble in scrobbles: + if scrobble.trackhash not in tracks: + try: + track = TrackStore.get_tracks_by_trackhashes([scrobble.trackhash])[0] + except IndexError: + continue + + tracks[scrobble.trackhash] = track + tracks[scrobble.trackhash].playcount = 0 + tracks[scrobble.trackhash].playduration = 0 + + tracks[scrobble.trackhash].playcount += 1 + tracks[scrobble.trackhash].playduration += scrobble.duration + + return list(tracks.values()), len(scrobbles) + + +T = TypeVar("T") + + +def calculate_trend( + item: T, + current_items: List[T], + previous_items: List[T], + key_func: Callable[[T], Any], +): + """ + Calculate the trend of an item based on its position in current and previous lists. + + :param item: The item to calculate the trend for + :param current_items: The current list of items + :param previous_items: The previous list of items + :param key_func: A function to extract the comparison key from an item + :return: A dictionary containing: + - The trend as a string: 'rising', 'falling', or 'stable' + - A boolean flag indicating whether the item is new + """ + current_rank = next( + (i for i, t in enumerate(current_items) if key_func(t) == key_func(item)), -1 + ) + previous_rank = next( + (i for i, t in enumerate(previous_items) if key_func(t) == key_func(item)), -1 + ) + + is_new = previous_rank == -1 + + if is_new: + return {"trend": "rising", "is_new": True} + elif current_rank == -1: + return {"trend": "falling", "is_new": False} + elif current_rank < previous_rank: + return {"trend": "rising", "is_new": False} + elif current_rank > previous_rank: + return {"trend": "falling", "is_new": False} + else: + return {"trend": "stable", "is_new": False} + + +def calculate_album_trend( + album_entry: Album, current_albums: List[Album], previous_albums: List[Album] +): + return calculate_trend( + album_entry, current_albums, previous_albums, lambda a: a.albumhash + ) + + +def calculate_artist_trend( + artist: dict[str, Any], + current_artists: List[dict[str, Any]], + previous_artists: List[dict[str, Any]], +): + return calculate_trend( + artist, current_artists, previous_artists, lambda a: a["artisthash"] + ) + + +def calculate_track_trend( + track: Track, current_tracks: List[Track], previous_tracks: List[Track] +): + return calculate_trend( + track, current_tracks, previous_tracks, lambda t: t.trackhash + ) + + +def calculate_scrobble_trend(current_scrobbles: int, previous_scrobbles: int) -> str: + return ( + "rising" + if current_scrobbles > previous_scrobbles + else ("falling" if current_scrobbles < previous_scrobbles else "stable") + ) + + +def calculate_new_artists( + current_artists: List[dict[str, Any]], previous_artists: List[dict[str, Any]] +): + current_artists_set = set(artist["artisthash"] for artist in current_artists) + previous_artists_set = set(artist["artisthash"] for artist in previous_artists) + + return len(current_artists_set - previous_artists_set) + + +def calculate_new_albums(current_albums: List[Album], previous_albums: List[Album]): + current_albums_set = set(album.albumhash for album in current_albums) + previous_albums_set = set(album.albumhash for album in previous_albums) + + return len(current_albums_set - previous_albums_set)