mirror of
https://github.com/Dvorinka/swingmusic-extended.git
synced 2026-06-03 20:13:02 +00:00
add stats to artist and album endpoints
return artist albums and other versions
This commit is contained in:
+48
-25
@@ -3,6 +3,7 @@ Contains all the album routes.
|
||||
"""
|
||||
|
||||
from dataclasses import asdict
|
||||
from pprint import pprint
|
||||
import random
|
||||
|
||||
from pydantic import BaseModel, Field
|
||||
@@ -21,15 +22,44 @@ from app.utils.hashing import create_hash
|
||||
from app.lib.albumslib import sort_by_track_no
|
||||
from app.serializers.album import serialize_for_card_many
|
||||
from app.serializers.track import serialize_tracks
|
||||
from app.utils.stats import get_track_group_stats
|
||||
|
||||
|
||||
bp_tag = Tag(name="Album", description="Single album")
|
||||
api = APIBlueprint("album", __name__, url_prefix="/album", abp_tags=[bp_tag])
|
||||
|
||||
|
||||
class GetAlbumVersionsBody(BaseModel):
|
||||
og_album_title: str = Field(
|
||||
description="The original album title (album.og_title)",
|
||||
example=Defaults.API_ALBUMNAME,
|
||||
)
|
||||
|
||||
albumhash: str = Field(
|
||||
description="The album hash of the album to exclude from the results.",
|
||||
example=Defaults.API_ALBUMHASH,
|
||||
)
|
||||
|
||||
|
||||
class GetMoreFromArtistsBody(AlbumLimitSchema):
|
||||
albumartists: list = Field(
|
||||
description="The artist hashes to get more albums from",
|
||||
)
|
||||
|
||||
base_title: str = Field(
|
||||
description="The base title of the album to exclude from the results.",
|
||||
example=Defaults.API_ALBUMNAME,
|
||||
default=None,
|
||||
)
|
||||
|
||||
|
||||
class GetAlbumInfoBody(AlbumHashSchema, AlbumLimitSchema):
|
||||
pass
|
||||
|
||||
|
||||
# NOTE: Don't use "/" as it will cause redirects (failure)
|
||||
@api.post("")
|
||||
def get_album_tracks_and_info(body: AlbumHashSchema):
|
||||
def get_album_tracks_and_info(body: GetAlbumInfoBody):
|
||||
"""
|
||||
Get album and tracks
|
||||
|
||||
@@ -52,7 +82,22 @@ def get_album_tracks_and_info(body: AlbumHashSchema):
|
||||
track_total = sum({int(t.extra.get("track_total", 1) or 1) for t in tracks})
|
||||
avg_bitrate = sum(t.bitrate for t in tracks) // (len(tracks) or 1)
|
||||
|
||||
more_from_data = GetMoreFromArtistsBody(
|
||||
albumartists=[a["artisthash"] for a in album.albumartists],
|
||||
albumlimit=body.limit,
|
||||
base_title=album.base_title,
|
||||
)
|
||||
other_versions_data = GetAlbumVersionsBody(
|
||||
albumhash=albumhash,
|
||||
og_album_title=album.og_title,
|
||||
)
|
||||
|
||||
|
||||
more_from_albums = get_more_from_artist(more_from_data)
|
||||
other_versions = get_album_versions(other_versions_data)
|
||||
|
||||
return {
|
||||
"stats": get_track_group_stats(tracks, is_album=True),
|
||||
"info": {
|
||||
**asdict(album),
|
||||
"is_favorite": album.is_favorite,
|
||||
@@ -67,6 +112,8 @@ def get_album_tracks_and_info(body: AlbumHashSchema):
|
||||
},
|
||||
"copyright": tracks[0].copyright,
|
||||
"tracks": serialize_tracks(tracks, remove_disc=False),
|
||||
"more_from": more_from_albums,
|
||||
"other_versions": other_versions,
|
||||
}
|
||||
|
||||
|
||||
@@ -84,18 +131,6 @@ def get_album_tracks(path: AlbumHashSchema):
|
||||
return serialize_tracks(tracks)
|
||||
|
||||
|
||||
class GetMoreFromArtistsBody(AlbumLimitSchema):
|
||||
albumartists: list = Field(
|
||||
description="The artist hashes to get more albums from",
|
||||
)
|
||||
|
||||
base_title: str = Field(
|
||||
description="The base title of the album to exclude from the results.",
|
||||
example=Defaults.API_ALBUMNAME,
|
||||
default=None,
|
||||
)
|
||||
|
||||
|
||||
@api.post("/from-artist")
|
||||
def get_more_from_artist(body: GetMoreFromArtistsBody):
|
||||
"""
|
||||
@@ -135,18 +170,6 @@ def get_more_from_artist(body: GetMoreFromArtistsBody):
|
||||
return all_albums
|
||||
|
||||
|
||||
class GetAlbumVersionsBody(BaseModel):
|
||||
og_album_title: str = Field(
|
||||
description="The original album title (album.og_title)",
|
||||
example=Defaults.API_ALBUMNAME,
|
||||
)
|
||||
|
||||
albumhash: str = Field(
|
||||
description="The album hash of the album to exclude from the results.",
|
||||
example=Defaults.API_ALBUMHASH,
|
||||
)
|
||||
|
||||
|
||||
@api.post("/other-versions")
|
||||
def get_album_versions(body: GetAlbumVersionsBody):
|
||||
"""
|
||||
|
||||
@@ -80,6 +80,7 @@ class AlbumLimitSchema(BaseModel):
|
||||
description="The number of albums to return",
|
||||
example=Defaults.API_CARD_LIMIT,
|
||||
default=Defaults.API_CARD_LIMIT,
|
||||
alias="albumlimit",
|
||||
)
|
||||
|
||||
|
||||
@@ -92,4 +93,5 @@ class ArtistLimitSchema(BaseModel):
|
||||
description="The number of artists to return",
|
||||
example=Defaults.API_CARD_LIMIT,
|
||||
default=Defaults.API_CARD_LIMIT,
|
||||
alias="artistlimit",
|
||||
)
|
||||
|
||||
+3
-1
@@ -28,6 +28,7 @@ from app.serializers.track import serialize_track
|
||||
from app.store.albums import AlbumStore
|
||||
from app.store.artists import ArtistStore
|
||||
from app.store.tracks import TrackStore
|
||||
from app.utils.stats import get_track_group_stats
|
||||
|
||||
bp_tag = Tag(name="Artist", description="Single artist")
|
||||
api = APIBlueprint("artist", __name__, url_prefix="/artist", abp_tags=[bp_tag])
|
||||
@@ -80,9 +81,9 @@ def get_artist(path: ArtistHashSchema, query: GetArtistQuery):
|
||||
if decade:
|
||||
artist.genres.insert(0, {"name": decade, "genrehash": decade})
|
||||
|
||||
stats = get_track_group_stats(tracks)
|
||||
duration = sum(t.duration for t in tracks) if tracks else 0
|
||||
tracks = tracks[:limit] if (limit and limit != -1) else tracks
|
||||
|
||||
tracks = [
|
||||
{
|
||||
**serialize_track(t),
|
||||
@@ -109,6 +110,7 @@ def get_artist(path: ArtistHashSchema, query: GetArtistQuery):
|
||||
},
|
||||
"tracks": tracks,
|
||||
"albums": albums,
|
||||
"stats": stats,
|
||||
}
|
||||
|
||||
|
||||
|
||||
+6
-20
@@ -73,23 +73,13 @@ def get_date_range(duration: str):
|
||||
date_range = None
|
||||
|
||||
match duration:
|
||||
case "week":
|
||||
case "week" | "month" | "year":
|
||||
date_range = (
|
||||
pendulum.now().subtract().start_of("week").timestamp(),
|
||||
pendulum.now().end_of("week").timestamp(),
|
||||
)
|
||||
case "month":
|
||||
date_range = (
|
||||
pendulum.now().subtract().start_of("month").timestamp(),
|
||||
pendulum.now().end_of("month").timestamp(),
|
||||
)
|
||||
case "year":
|
||||
date_range = (
|
||||
pendulum.now().subtract().start_of("year").timestamp(),
|
||||
pendulum.now().end_of("year").timestamp(),
|
||||
pendulum.now().subtract().start_of(duration).timestamp(),
|
||||
pendulum.now().end_of(duration).timestamp(),
|
||||
)
|
||||
case "alltime":
|
||||
date_range = (float(0), pendulum.now().timestamp())
|
||||
date_range = (0, pendulum.now().timestamp())
|
||||
case _:
|
||||
raise ValueError(f"Invalid duration: {duration}")
|
||||
|
||||
@@ -101,12 +91,8 @@ def get_duration_in_seconds(duration: str) -> int:
|
||||
Returns the number of seconds in a given duration.
|
||||
"""
|
||||
match duration:
|
||||
case "week":
|
||||
return 604800
|
||||
case "month":
|
||||
return 2629743
|
||||
case "year":
|
||||
return 31556926
|
||||
case "week" | "month" | "year":
|
||||
return int(pendulum.now().subtract().start_of(duration).timestamp())
|
||||
case "alltime":
|
||||
return int(pendulum.now().timestamp())
|
||||
|
||||
|
||||
+110
-5
@@ -1,11 +1,15 @@
|
||||
from collections import defaultdict
|
||||
import copy
|
||||
from typing import Any, Callable, TypeVar, Protocol, List
|
||||
|
||||
from pprint import pprint
|
||||
from typing import Any, Callable, TypeVar, List
|
||||
from app.db.userdata import ScrobbleTable
|
||||
from app.models.stats import StatItem
|
||||
from app.models.track import Track
|
||||
from app.models.album import Album
|
||||
from app.store.albums import AlbumStore
|
||||
from app.store.tracks import TrackStore
|
||||
from app.utils.dates import seconds_to_time_string
|
||||
|
||||
|
||||
def get_artists_in_period(start_time: int, end_time: int):
|
||||
@@ -160,10 +164,8 @@ def calculate_new_artists(current_artists: List[dict[str, Any]], timestamp: int)
|
||||
Calculate the number of new artists based on the current and all previous scrobbles.
|
||||
"""
|
||||
current_artists_set = set(artist["artisthash"] for artist in current_artists)
|
||||
all_records = ScrobbleTable.get_all(0, None)
|
||||
trackhashes = set(
|
||||
record.trackhash for record in all_records if record.timestamp < timestamp
|
||||
)
|
||||
all_records = ScrobbleTable.get_all_in_period(0, timestamp)
|
||||
trackhashes = set(record.trackhash for record in all_records)
|
||||
|
||||
previous_artists_set = set()
|
||||
|
||||
@@ -186,3 +188,106 @@ def calculate_new_albums(current_albums: List[Album], previous_albums: List[Albu
|
||||
previous_albums_set = set(album.albumhash for album in previous_albums)
|
||||
|
||||
return len(current_albums_set - previous_albums_set)
|
||||
|
||||
|
||||
def get_track_group_stats(tracks: list[Track], is_album: bool = False):
|
||||
if len(tracks) == 0:
|
||||
return []
|
||||
|
||||
played_tracks = [track for track in tracks if track.playcount > 0]
|
||||
unplayed_count = len(tracks) - len(played_tracks)
|
||||
|
||||
played_stat = StatItem(
|
||||
"played",
|
||||
f"never played",
|
||||
f"{unplayed_count}/{len(tracks)} tracks",
|
||||
)
|
||||
|
||||
play_duration = sum(track.duration for track in played_tracks)
|
||||
play_duration_stat = StatItem(
|
||||
"play_duration",
|
||||
"listened all time",
|
||||
f"{seconds_to_time_string(play_duration)}",
|
||||
)
|
||||
|
||||
try:
|
||||
top_track = max(played_tracks, key=lambda x: x.playduration)
|
||||
except ValueError:
|
||||
top_track = None
|
||||
|
||||
top_track_stat = (
|
||||
StatItem(
|
||||
"toptrack",
|
||||
f"top track ({seconds_to_time_string(top_track.playduration)} listened)",
|
||||
f"{top_track.title}",
|
||||
top_track.image,
|
||||
)
|
||||
if top_track
|
||||
else StatItem(
|
||||
"toptrack",
|
||||
"top track",
|
||||
"—",
|
||||
)
|
||||
)
|
||||
|
||||
albums_map = {}
|
||||
|
||||
for track in tracks:
|
||||
if track.albumhash not in albums_map:
|
||||
albums_map[track.albumhash] = {
|
||||
"playcount": 0,
|
||||
"playduration": 0,
|
||||
"title": track.album,
|
||||
"image": track.image,
|
||||
}
|
||||
|
||||
albums_map[track.albumhash]["playcount"] += 1
|
||||
albums_map[track.albumhash]["playduration"] += track.playduration
|
||||
|
||||
stats = [play_duration_stat, played_stat, top_track_stat]
|
||||
if not is_album:
|
||||
albums = list(albums_map.values())
|
||||
albums.sort(key=lambda x: x["playduration"], reverse=True)
|
||||
|
||||
top_album = albums[0] if albums[0]["playduration"] else None
|
||||
top_album_stat = (
|
||||
StatItem(
|
||||
"topalbum",
|
||||
f"top album ({seconds_to_time_string(top_album['playduration'])} listened)",
|
||||
f"{top_album['title']}",
|
||||
top_album["image"],
|
||||
)
|
||||
if top_album
|
||||
else StatItem(
|
||||
"topalbum",
|
||||
"top album",
|
||||
"—",
|
||||
)
|
||||
)
|
||||
|
||||
stats.append(top_album_stat)
|
||||
|
||||
if is_album:
|
||||
tracktotal: int = max(
|
||||
int(track.extra.get("track_total", 0) or 0) for track in tracks
|
||||
)
|
||||
percentage = (len(tracks) / tracktotal) * 100 if tracktotal > 0 else 101
|
||||
completedness = int(percentage) if percentage <= 100 else "?"
|
||||
|
||||
completeness_stat = (
|
||||
StatItem(
|
||||
"completeness",
|
||||
f"{len(tracks)}/{tracktotal} tracks available",
|
||||
f"{completedness}% complete",
|
||||
)
|
||||
if tracktotal
|
||||
else StatItem(
|
||||
"completeness",
|
||||
f"{len(tracks)}/? tracks available",
|
||||
"?",
|
||||
)
|
||||
)
|
||||
|
||||
stats.append(completeness_stat)
|
||||
|
||||
return stats
|
||||
|
||||
Reference in New Issue
Block a user