draft stats

This commit is contained in:
cwilvx
2024-10-05 08:32:26 +03:00
parent cb2e98a832
commit 4be2b80bf9
7 changed files with 412 additions and 3 deletions
+231 -1
View File
@@ -1,14 +1,37 @@
from dataclasses import dataclass
from math import e
from pprint import pprint
from flask_openapi3 import Tag
from flask_openapi3 import APIBlueprint
from pydantic import Field
from pydantic import Field, BaseModel
from app.api.apischemas import TrackHashSchema
from typing import Literal
from datetime import datetime, timedelta
from collections import defaultdict
from app.db.userdata import ScrobbleTable
from app.lib.extras import get_extra_info
from app.models.album import Album
from app.models.track import Track
from app.serializers.artist import serialize_for_card
from app.serializers.album import serialize_for_card as serialize_for_album_card
from app.serializers.track import serialize_track, serialize_tracks
from app.settings import Defaults
from app.store.albums import AlbumStore
from app.store.artists import ArtistStore
from app.store.tracks import TrackStore
from app.utils.dates import seconds_to_time_string
from app.utils.stats import (
calculate_album_trend,
calculate_artist_trend,
calculate_new_albums,
calculate_new_artists,
calculate_scrobble_trend,
calculate_track_trend,
get_albums_in_period,
get_artists_in_period,
get_tracks_in_period,
)
bp_tag = Tag(name="Logger", description="Log item plays")
api = APIBlueprint("logger", __name__, url_prefix="/logger", abp_tags=[bp_tag])
@@ -62,3 +85,210 @@ def log_track(body: LogTrackBody):
track.increment_playcount(duration, timestamp)
return {"msg": "recorded"}, 201
class TopTracksQuery(BaseModel):
duration: int = Field(
description="Duration in seconds to fetch data for", example=604800
)
limit: int = Field(description="Number of top tracks to return", example=10)
order_by: Literal["playcount", "playduration"] = Field(
description="Property to order by", example="playcount"
)
# SECTION: STATS
def get_help_text(
playcount: int, playduration: int, order_by: Literal["playcount", "playduration"]
):
"""
Get the help text given the playcount and playduration.
"""
if order_by == "playcount":
if playcount == 0:
return "unplayed"
return f"{playcount} play{'' if playcount == 1 else 's'}"
if order_by == "playduration":
return seconds_to_time_string(playduration)
# DISCLAIMER: Code beyond this point was partially written by Claude 3.5 Sonnet in Cursor.
# TODO: Refactor, group and clean up
@api.get("/top-tracks")
def get_top_tracks(query: TopTracksQuery):
"""
Get the top N tracks played within a given duration.
"""
end_time = int(datetime.now().timestamp())
start_time = end_time - query.duration
previous_start_time = start_time - query.duration
current_period_tracks, current_period_scrobbles = get_tracks_in_period(
start_time, end_time
)
previous_period_tracks, previous_period_scrobbles = get_tracks_in_period(
previous_start_time, start_time
)
scrobble_trend = (
"rising"
if current_period_scrobbles > previous_period_scrobbles
else (
"falling"
if current_period_scrobbles < previous_period_scrobbles
else "stable"
)
)
sorted_tracks = sort_tracks(current_period_tracks, query.order_by)
top_tracks = sorted_tracks[: query.limit]
response = []
for track in top_tracks:
trend = calculate_track_trend(
track, current_period_tracks, previous_period_tracks
)
track = {
**serialize_track(track),
"trend": trend,
"help_text": get_help_text(
track.playcount, track.playduration, query.order_by
),
}
response.append(track)
return {
"tracks": response,
"scrobbles": {
"text": f"{current_period_scrobbles} total play{'' if current_period_scrobbles == 1 else 's'}",
"trend": scrobble_trend,
},
}, 200
def sort_tracks(tracks: list[Track], order_by: Literal["playcount", "playduration"]):
return sorted(tracks, key=lambda x: getattr(x, order_by), reverse=True)
class TopArtistsQuery(BaseModel):
duration: int = Field(
description="Duration in seconds to fetch data for", example=604800
)
limit: int = Field(description="Number of top artists to return", example=10)
order_by: Literal["playcount", "playduration"] = Field(
description="Property to order by", example="playcount"
)
@api.get("/top-artists")
def get_top_artists(query: TopArtistsQuery):
"""
Get the top N artists played within a given duration.
"""
end_time = int(datetime.now().timestamp())
start_time = end_time - query.duration
previous_start_time = start_time - query.duration
current_period_artists = get_artists_in_period(start_time, end_time)
previous_period_artists = get_artists_in_period(previous_start_time, start_time)
new_artists = calculate_new_artists(current_period_artists, previous_period_artists)
scrobble_trend = calculate_scrobble_trend(
len(current_period_artists), len(previous_period_artists)
)
sorted_artists = sort_artists(current_period_artists, query.order_by)
top_artists = sorted_artists[: query.limit]
response = []
for artist in top_artists:
trend = calculate_artist_trend(
artist, current_period_artists, previous_period_artists
)
db_artist = ArtistStore.get_artist_by_hash(artist["artisthash"])
if db_artist is None:
continue
artist = {
**serialize_for_card(db_artist),
"trend": trend,
"help_text": get_help_text(
artist["playcount"], artist["playduration"], query.order_by
),
}
response.append(artist)
return {
"artists": response,
"scrobbles": {
"text": f"{new_artists} new artist{'' if new_artists == 1 else 's'} played",
"trend": scrobble_trend,
},
}, 200
def sort_artists(artists, order_by):
return sorted(artists, key=lambda x: x[order_by], reverse=True)
class TopAlbumsQuery(BaseModel):
duration: int = Field(
description="Duration in seconds to fetch data for", example=604800
)
limit: int = Field(description="Number of top albums to return", example=10)
order_by: Literal["playcount", "playduration"] = Field(
description="Property to order by", example="playcount"
)
@api.get("/top-albums")
def get_top_albums(query: TopAlbumsQuery):
"""
Get the top N albums played within a given duration.
"""
end_time = int(datetime.now().timestamp())
start_time = end_time - query.duration
previous_start_time = start_time - query.duration
current_period_albums = get_albums_in_period(start_time, end_time)
previous_period_albums = get_albums_in_period(previous_start_time, start_time)
new_albums = calculate_new_albums(current_period_albums, previous_period_albums)
scrobble_trend = calculate_scrobble_trend(
len(current_period_albums), len(previous_period_albums)
)
sorted_albums = sort_albums(current_period_albums, query.order_by)
top_albums = sorted_albums[: query.limit]
response = []
for album in top_albums:
trend = calculate_album_trend(
album, current_period_albums, previous_period_albums
)
album = {
**serialize_for_album_card(album),
"trend": trend,
"help_text": get_help_text(
album.playcount, album.playduration, query.order_by
),
}
response.append(album)
return {
"albums": response,
"scrobbles": {
"text": f"{new_albums} new album{'' if new_albums == 1 else 's'} played",
"trend": scrobble_trend,
},
}, 200
def sort_albums(albums: list[Album], order_by: Literal["playcount", "playduration"]):
return sorted(albums, key=lambda x: getattr(x, order_by), reverse=True)
+9
View File
@@ -281,6 +281,15 @@ class ScrobbleTable(Base):
return tracklog_to_dataclasses(result.fetchall())
@classmethod
def get_all_in_period(cls, start_time: int, end_time: int):
result = cls.execute(
select(cls)
.where(and_(cls.timestamp >= start_time, cls.timestamp <= end_time))
.order_by(cls.timestamp.desc())
)
return tracklog_to_dataclasses(result.fetchall())
class PlaylistTable(Base):
__tablename__ = "playlist"
+1 -1
View File
@@ -238,7 +238,7 @@ def create_artists(
artists = dict()
for track in all_tracks:
this_artists = track.artists
this_artists = [*track.artists]
for a in track.albumartists:
if a not in this_artists:
+4 -1
View File
@@ -1,4 +1,4 @@
from dataclasses import dataclass, field
from dataclasses import asdict, dataclass, field
from app.config import UserConfig
from app.utils.auth import get_current_userid
@@ -206,3 +206,6 @@ class Track:
self.trackhash = create_hash(
self.title, self.album, *(artist["name"] for artist in self.artists)
)
def copy(self):
return Track(**{**asdict(self), "config": UserConfig()})
+1
View File
@@ -24,6 +24,7 @@ def serialize_for_card(artist: Artist):
"extra",
"created_date",
"date",
"fav_userids",
}
for key in props_to_remove:
+1
View File
@@ -15,6 +15,7 @@ def serialize_track(track: Track, to_remove: set = set(), remove_disc=True) -> d
"og_title",
"og_album",
"copyright",
"config",
"disc",
"track",
"artist_hashes",
+165
View File
@@ -0,0 +1,165 @@
from collections import defaultdict
from typing import Any, Callable, TypeVar, Protocol, List
from app.db.userdata import ScrobbleTable
from app.models.track import Track
from app.models.album import Album
from app.store.albums import AlbumStore
from app.store.tracks import TrackStore
def get_artists_in_period(start_time: int, end_time: int):
scrobbles = ScrobbleTable.get_all_in_period(start_time, end_time)
artists = defaultdict(lambda: {"playcount": 0, "playduration": 0})
for scrobble in scrobbles:
track = TrackStore.get_tracks_by_trackhashes([scrobble.trackhash])
if not track:
continue
track = track[0]
for artist in track.artists:
artisthash = artist["artisthash"]
artists[artisthash]["artisthash"] = artist["artisthash"]
artists[artisthash]["playcount"] += 1
artists[artisthash]["playduration"] += scrobble.duration
return list(artists.values())
def get_albums_in_period(start_time: int, end_time: int):
scrobbles = ScrobbleTable.get_all_in_period(start_time, end_time)
albums: dict[str, Album] = {}
for scrobble in scrobbles:
track = TrackStore.get_tracks_by_trackhashes([scrobble.trackhash])
if not track:
continue
track = track[0]
album_entry = AlbumStore.albummap.get(track.albumhash)
if not album_entry:
continue
albumhash = album_entry.album.albumhash
if albumhash not in albums:
albums[albumhash] = album_entry.album
albums[albumhash].playcount = 0
albums[albumhash].playduration = 0
albums[albumhash].playcount += 1
albums[albumhash].playduration += scrobble.duration
return list(albums.values())
def get_tracks_in_period(start_time: int, end_time: int):
scrobbles = ScrobbleTable.get_all_in_period(start_time, end_time)
tracks: dict[str, Track] = {}
for scrobble in scrobbles:
if scrobble.trackhash not in tracks:
try:
track = TrackStore.get_tracks_by_trackhashes([scrobble.trackhash])[0]
except IndexError:
continue
tracks[scrobble.trackhash] = track
tracks[scrobble.trackhash].playcount = 0
tracks[scrobble.trackhash].playduration = 0
tracks[scrobble.trackhash].playcount += 1
tracks[scrobble.trackhash].playduration += scrobble.duration
return list(tracks.values()), len(scrobbles)
T = TypeVar("T")
def calculate_trend(
item: T,
current_items: List[T],
previous_items: List[T],
key_func: Callable[[T], Any],
):
"""
Calculate the trend of an item based on its position in current and previous lists.
:param item: The item to calculate the trend for
:param current_items: The current list of items
:param previous_items: The previous list of items
:param key_func: A function to extract the comparison key from an item
:return: A dictionary containing:
- The trend as a string: 'rising', 'falling', or 'stable'
- A boolean flag indicating whether the item is new
"""
current_rank = next(
(i for i, t in enumerate(current_items) if key_func(t) == key_func(item)), -1
)
previous_rank = next(
(i for i, t in enumerate(previous_items) if key_func(t) == key_func(item)), -1
)
is_new = previous_rank == -1
if is_new:
return {"trend": "rising", "is_new": True}
elif current_rank == -1:
return {"trend": "falling", "is_new": False}
elif current_rank < previous_rank:
return {"trend": "rising", "is_new": False}
elif current_rank > previous_rank:
return {"trend": "falling", "is_new": False}
else:
return {"trend": "stable", "is_new": False}
def calculate_album_trend(
album_entry: Album, current_albums: List[Album], previous_albums: List[Album]
):
return calculate_trend(
album_entry, current_albums, previous_albums, lambda a: a.albumhash
)
def calculate_artist_trend(
artist: dict[str, Any],
current_artists: List[dict[str, Any]],
previous_artists: List[dict[str, Any]],
):
return calculate_trend(
artist, current_artists, previous_artists, lambda a: a["artisthash"]
)
def calculate_track_trend(
track: Track, current_tracks: List[Track], previous_tracks: List[Track]
):
return calculate_trend(
track, current_tracks, previous_tracks, lambda t: t.trackhash
)
def calculate_scrobble_trend(current_scrobbles: int, previous_scrobbles: int) -> str:
return (
"rising"
if current_scrobbles > previous_scrobbles
else ("falling" if current_scrobbles < previous_scrobbles else "stable")
)
def calculate_new_artists(
current_artists: List[dict[str, Any]], previous_artists: List[dict[str, Any]]
):
current_artists_set = set(artist["artisthash"] for artist in current_artists)
previous_artists_set = set(artist["artisthash"] for artist in previous_artists)
return len(current_artists_set - previous_artists_set)
def calculate_new_albums(current_albums: List[Album], previous_albums: List[Album]):
current_albums_set = set(album.albumhash for album in current_albums)
previous_albums_set = set(album.albumhash for album in previous_albums)
return len(current_albums_set - previous_albums_set)