diff --git a/app/api/home/__init__.py b/app/api/home/__init__.py index e7fa192a..095c0f44 100644 --- a/app/api/home/__init__.py +++ b/app/api/home/__init__.py @@ -4,7 +4,7 @@ from pydantic import BaseModel, Field from app.api.apischemas import GenericLimitSchema from app.lib.home.recentlyadded import get_recently_added_items -from app.lib.home.recentlyplayed import get_recently_played +from app.lib.home.get_recently_played import get_recently_played from app.store.homepage import HomepageStore bp_tag = Tag(name="Home", description="Homepage items") diff --git a/app/api/plugins/mixes.py b/app/api/plugins/mixes.py index a22bab83..168b6a1c 100644 --- a/app/api/plugins/mixes.py +++ b/app/api/plugins/mixes.py @@ -1,7 +1,9 @@ +from typing import Literal from flask_openapi3 import Tag from flask_openapi3 import APIBlueprint from pydantic import BaseModel, Field +from app.db.userdata import MixTable from app.plugins.mixes import MixesPlugin from app.store.homepage import HomepageStore from app.store.tracks import TrackStore @@ -13,37 +15,43 @@ api = APIBlueprint( ) -@api.post("/track") -def get_track_mix(): - """ - Get a track mix - """ - mixes = MixesPlugin() - track = TrackStore.trackhashmap["9eeee292264ad01b"].get_best() - tracks = mixes.get_track_mix([track]) - - return { - "total": len(tracks), - "tracks": tracks, - } +class GetMixesBody(BaseModel): + mixtype: Literal["artists", "tracks"] = Field(description="The type of mix") -@api.post("/artist") -def get_artist_mix(): - mixes = MixesPlugin() - # return mixes.create_artist_mixes() - # tracks = mixes.get_artist_mix("09306be8039b98ad") +@api.get("/") +def get_artist_mixes(path: GetMixesBody): + srcmixes = MixTable.get_all(with_userid=True) + mixes = [] - # return { - # "total": len(tracks), - # "tracks": tracks, - # } + if path.mixtype == "artists": + mixes = [mix.to_dict(convert_timestamp=True) for mix in srcmixes] + elif path.mixtype == "tracks": + plugin = MixesPlugin() - return "hi" + for mix in srcmixes: + custom_mix = plugin.get_track_mix(mix) + if custom_mix: + mixes.append(custom_mix.to_dict(convert_timestamp=True)) + + seen_mixids = set() + + # filter duplicates by trackshash + final_mixes = [] + for mix in mixes: + # INFO: Ignore duplicates for artist mixes + if mix["id"] in seen_mixids and path.mixtype == "tracks": + continue + + final_mixes.append(mix) + seen_mixids.add(mix["id"]) + + return final_mixes class MixQuery(BaseModel): mixid: str = Field(description="The mix id") + sourcehash: str = Field(description="The sourcehash of the mix") @api.get("/") @@ -58,8 +66,45 @@ def get_mix(query: MixQuery): case _: return {"msg": "Invalid mix ID"}, 400 - mix = HomepageStore.get_mix(mixtype, query.mixid[1:]) - if mix: + # INFO: Check if the mix is already in the homepage store + mix = HomepageStore.get_mix(mixtype, query.mixid) + if mix and mix["sourcehash"] == query.sourcehash: return mix, 200 - return {"msg": "Mix not found"}, 404 + # INF0: Get the mix from the db + mix = MixTable.get_by_sourcehash(query.sourcehash) + + if not mix: + return {"msg": "Mix not found"}, 404 + + if mixtype == "custom_mixes": + plugin = MixesPlugin() + mix = plugin.get_track_mix(mix) + + if not mix: + return {"msg": "Mix not found"}, 404 + + return mix.to_full_dict(), 200 + + +class SaveMixRequest(BaseModel): + mixid: str = Field(description="The id of the mix") + type: str = Field(description="The type of mix") + sourcehash: str = Field(description="The sourcehash of the mix") + + +@api.post("/save") +def save_mix(body: SaveMixRequest): + mix_type = body.type + mix_sourcehash = body.sourcehash + + if mix_type == "artist": + state = MixTable.save_artist_mix(mix_sourcehash) + elif mix_type == "track": + state = MixTable.save_track_mix(mix_sourcehash) + + mix = HomepageStore.find_mix(body.mixid) + + if mix: + mix.saved = state + return {"msg": "Mixes saved"}, 200 diff --git a/app/db/userdata.py b/app/db/userdata.py index 41d34226..b1b80afe 100644 --- a/app/db/userdata.py +++ b/app/db/userdata.py @@ -486,15 +486,22 @@ class MixTable(Base): description: Mapped[str] = mapped_column(String()) timestamp: Mapped[int] = mapped_column(Integer()) sourcehash: Mapped[str] = mapped_column(String(), unique=True, index=True) - tracks: Mapped[list[str]] = mapped_column(JSON(), default_factory=list) + userid: Mapped[int] = mapped_column( + Integer(), ForeignKey("user.id", ondelete="cascade"), index=True + ) saved: Mapped[bool] = mapped_column(Boolean(), default=False) + tracks: Mapped[list[str]] = mapped_column(JSON(), default_factory=list) extra: Mapped[dict[str, Any]] = mapped_column( JSON(), nullable=True, default_factory=dict ) @classmethod - def get_all(cls): - result = cls.execute(select(cls)) + def get_all(cls, with_userid: bool = False): + if with_userid: + result = cls.execute(select(cls).where(cls.userid == get_current_userid())) + else: + result = cls.execute(select(cls)) + return Mix.mixes_to_dataclasses(result.fetchall()) @classmethod @@ -505,6 +512,13 @@ class MixTable(Base): if res: return Mix.mix_to_dataclass(res) + @classmethod + def get_by_mixid(cls, mixid: str): + result = cls.execute(select(cls).where(cls.mixid == mixid)) + res = result.fetchone() + if res: + return Mix.mix_to_dataclass(res) + @classmethod def insert_one(cls, mix: Mix): mixdict = asdict(mix) @@ -512,3 +526,62 @@ class MixTable(Base): del mixdict["id"] return cls.execute(insert(cls).values(mixdict), commit=True) + + @classmethod + def update_one(cls, mixid: str, mix: Mix): + mixdict = asdict(mix) + mixdict["mixid"] = mix.id + del mixdict["id"] + + return cls.execute( + update(cls) + .where( + and_( + cls.mixid == mixid, + cls.sourcehash == mix.sourcehash, + cls.userid == get_current_userid(), + ) + ) + .values(mixdict), + commit=True, + ) + + @classmethod + def save_artist_mix(cls, sourcehash: str): + """ + Toggles the saved status of an artist mix. + """ + + mix = cls.get_by_sourcehash(sourcehash) + + if not mix: + return False + + mix.saved = not mix.saved + cls.update_one(mix.id, mix) + + return mix.saved + + @classmethod + def get_saved_track_mixes(cls): + """ + Return all mixes that have the extra.trackmix_saved set to True. + """ + + result = cls.execute(select(cls).where(cls.extra.c.trackmix_saved == True)) + return Mix.mixes_to_dataclasses(result.fetchall()) + + @classmethod + def save_track_mix(cls, sourcehash: str): + """ + Toggles the property extra.trackmix_saved to True. + """ + + mix = cls.get_by_sourcehash(sourcehash) + if not mix: + return False + + mix.extra["trackmix_saved"] = not mix.extra["trackmix_saved"] + cls.update_one(mix.id, mix) + + return mix.extra["trackmix_saved"] diff --git a/app/lib/home/__init__.py b/app/lib/home/__init__.py new file mode 100644 index 00000000..a762a66d --- /dev/null +++ b/app/lib/home/__init__.py @@ -0,0 +1,303 @@ +from app.store.albums import AlbumStore +from app.store.artists import ArtistStore +from app.store.folder import FolderStore +from app.store.tracks import TrackStore + +from app.models.logger import TrackLog +from app.lib.playlistlib import get_first_4_images +from app.utils.dates import timestamp_to_time_passed +from app.lib.home.recentlyadded import get_recently_added_playlist +from app.db.userdata import FavoritesTable, MixTable, PlaylistTable +from app.lib.home.recentlyplayed import get_recently_played_playlist + +from app.serializers.track import serialize_track +from app.serializers.album import album_serializer +from app.serializers.artist import serialize_for_card +from app.serializers.playlist import serialize_for_card as serialize_playlist + + +def create_items(entries: list[TrackLog], limit: int): + custom_playlists = [ + {"name": "recentlyadded", "handler": get_recently_added_playlist}, + {"name": "recentlyplayed", "handler": get_recently_played_playlist}, + ] + + items = [] + added = set() + + for entry in entries: + if len(items) >= limit: + break + + if entry.source in added: + continue + + added.add(entry.source) + + if entry.type == "mix": + if not entry.type_src: + continue + + # INFO: Get mix from homepage store + from app.store.homepage import HomepageStore + mix = HomepageStore.find_mix(entry.type_src) + + if not mix and entry.type_src.startswith("t"): + # mix is a track mix (not saved in the db) + continue + + if not mix: + # INFO: Get mix from db + mix = MixTable.get_by_mixid(entry.type_src) + + if not mix: + continue + + items.append( + { + "type": "mix", + "hash": entry.type_src, + "timestamp": entry.timestamp, + } + ) + continue + + if entry.type == "album": + album = AlbumStore.albummap.get(entry.type_src) + + if album is None: + continue + + item = { + "type": "album", + "hash": entry.type_src, + "timestamp": entry.timestamp, + } + + items.append(item) + continue + + if entry.type == "artist": + artist = ArtistStore.artistmap.get(entry.type_src) + + if artist is None: + continue + + items.append( + { + "type": "artist", + "hash": entry.type_src, + "timestamp": entry.timestamp, + } + ) + + continue + + if entry.type == "folder": + folder = entry.type_src + + if not folder: + continue + + if not folder.endswith("/"): + folder += "/" + + is_home_dir = entry.type_src == "$home" + + if is_home_dir: + folder = os.path.expanduser("~") + + item = { + "type": "folder", + "hash": entry.type_src, + "timestamp": entry.timestamp, + } + + items.append(item) + continue + + if entry.type == "playlist": + is_custom = entry.type_src in [i["name"] for i in custom_playlists] + + if is_custom: + items.append( + { + "type": "playlist", + "hash": entry.type_src, + "timestamp": entry.timestamp, + "is_custom": True, + } + ) + continue + + playlist = PlaylistTable.get_by_id(entry.type_src) + if playlist is None: + continue + + item = { + "type": "playlist", + "hash": entry.type_src, + "timestamp": entry.timestamp, + } + + items.append(item) + continue + + if entry.type == "favorite": + items.append( + { + "type": "favorite", + "timestamp": entry.timestamp, + } + ) + continue + + t = TrackStore.trackhashmap.get(entry.trackhash) + + if t is None: + continue + + item = { + "type": "track", + "hash": entry.trackhash, + "timestamp": entry.timestamp, + } + items.append(item) + + return items + + +def recover_items(items: list[dict]): + custom_playlists = [ + {"name": "recentlyadded", "handler": get_recently_added_playlist}, + {"name": "recentlyplayed", "handler": get_recently_played_playlist}, + ] + recovered = [] + + for item in items: + recovered_item = None + + if item["type"] == "album": + album = AlbumStore.get_album_by_hash(item["hash"]) + if album is None: + continue + + album = album_serializer( + album, + to_remove={ + "genres", + "date", + "count", + "duration", + "albumartists_hashes", + "og_title", + }, + ) + + recovered_item = { + "type": "album", + "item": album, + } + elif item["type"] == "artist": + artist = ArtistStore.get_artist_by_hash(item["hash"]) + if artist is None: + continue + + recovered_item = { + "type": "artist", + "item": serialize_for_card(artist), + } + elif item["type"] == "folder": + count = FolderStore.count_tracks_containing_paths([item["hash"]]) + + recovered_item = { + "type": "folder", + "item": { + "path": item["hash"], + "count": count[0]["trackcount"], + }, + } + elif item["type"] == "playlist": + if item.get("is_custom"): + playlist, _ = next( + i["handler"]() + for i in custom_playlists + if i["name"] == item["hash"] + ) + playlist.images = [i["image"] for i in playlist.images] + + playlist = serialize_playlist( + playlist, to_remove={"settings", "duration"} + ) + recovered_item = { + "type": "playlist", + "item": playlist, + } + else: + playlist = PlaylistTable.get_by_id(item["hash"]) + if playlist is None: + continue + + tracks = TrackStore.get_tracks_by_trackhashes(playlist.trackhashes) + playlist.clear_lists() + + if not playlist.has_image: + images = get_first_4_images(tracks) + images = [i["image"] for i in images] + playlist.images = images + + recovered_item = { + "type": "playlist", + "item": serialize_playlist(playlist), + } + elif item["type"] == "favorite": + recovered_item = { + "type": "favorite", + "item": { + "count": FavoritesTable.count(), + }, + } + elif item["type"] == "track": + track = TrackStore.trackhashmap.get(item["hash"]) + if track is None: + continue + + recovered_item = { + "type": "track", + "item": serialize_track(track.get_best()), + } + + elif item["type"] == "mix": + from app.store.homepage import HomepageStore + mix = HomepageStore.find_mix(item["hash"]) + if mix is None: + mix = MixTable.get_by_mixid(item["hash"]) + + if mix is None: + continue + + mix = mix.to_dict() + + recovered_item = { + "type": "mix", + "item": mix, + } + + if recovered_item is not None: + helptext = item.get("help_text") or item.get("type") + secondary_text = item.get("secondary_text") + + if "secondary_text" in item: + secondary_text = item["secondary_text"] + elif "timestamp" in item: + secondary_text = timestamp_to_time_passed(item["timestamp"]) + + if helptext: + recovered_item["item"]["help_text"] = helptext + + if secondary_text: + recovered_item["item"]["time"] = secondary_text + + recovered.append(recovered_item) + + return recovered diff --git a/app/lib/home/create_items.py b/app/lib/home/create_items.py new file mode 100644 index 00000000..d10d134d --- /dev/null +++ b/app/lib/home/create_items.py @@ -0,0 +1,158 @@ +from app.db.userdata import MixTable, PlaylistTable +from app.lib.home.recentlyadded import get_recently_added_playlist +from app.lib.home.recentlyplayed import get_recently_played_playlist +from app.models.logger import TrackLog +from app.store.albums import AlbumStore +from app.store.artists import ArtistStore +from app.store.homepage import HomepageStore +from app.store.tracks import TrackStore + + +def create_items(entries: list[TrackLog], limit: int): + custom_playlists = [ + {"name": "recentlyadded", "handler": get_recently_added_playlist}, + {"name": "recentlyplayed", "handler": get_recently_played_playlist}, + ] + + items = [] + added = set() + + for entry in entries: + if len(items) >= limit: + break + + if entry.source in added: + continue + + added.add(entry.source) + + if entry.type == "mix": + if not entry.type_src: + continue + + # INFO: Get mix from homepage store + mix = HomepageStore.find_mix(entry.type_src) + + if not mix and entry.type_src.startswith("t"): + # mix is a track mix (not saved in the db) + continue + + if not mix: + # INFO: Get mix from db + mix = MixTable.get_by_mixid(entry.type_src) + + if not mix: + continue + + items.append( + { + "type": "mix", + "hash": entry.type_src, + "timestamp": entry.timestamp, + } + ) + continue + + if entry.type == "album": + album = AlbumStore.albummap.get(entry.type_src) + + if album is None: + continue + + item = { + "type": "album", + "hash": entry.type_src, + "timestamp": entry.timestamp, + } + + items.append(item) + continue + + if entry.type == "artist": + artist = ArtistStore.artistmap.get(entry.type_src) + + if artist is None: + continue + + items.append( + { + "type": "artist", + "hash": entry.type_src, + "timestamp": entry.timestamp, + } + ) + + continue + + if entry.type == "folder": + folder = entry.type_src + + if not folder: + continue + + if not folder.endswith("/"): + folder += "/" + + is_home_dir = entry.type_src == "$home" + + if is_home_dir: + folder = os.path.expanduser("~") + + item = { + "type": "folder", + "hash": entry.type_src, + "timestamp": entry.timestamp, + } + + items.append(item) + continue + + if entry.type == "playlist": + is_custom = entry.type_src in [i["name"] for i in custom_playlists] + + if is_custom: + items.append( + { + "type": "playlist", + "hash": entry.type_src, + "timestamp": entry.timestamp, + "is_custom": True, + } + ) + continue + + playlist = PlaylistTable.get_by_id(entry.type_src) + if playlist is None: + continue + + item = { + "type": "playlist", + "hash": entry.type_src, + "timestamp": entry.timestamp, + } + + items.append(item) + continue + + if entry.type == "favorite": + items.append( + { + "type": "favorite", + "timestamp": entry.timestamp, + } + ) + continue + + t = TrackStore.trackhashmap.get(entry.trackhash) + + if t is None: + continue + + item = { + "type": "track", + "hash": entry.trackhash, + "timestamp": entry.timestamp, + } + items.append(item) + + return items \ No newline at end of file diff --git a/app/lib/home/get_recently_played.py b/app/lib/home/get_recently_played.py new file mode 100644 index 00000000..79bde7d4 --- /dev/null +++ b/app/lib/home/get_recently_played.py @@ -0,0 +1,47 @@ +from app.db.userdata import ScrobbleTable +from app.lib.home.create_items import create_items +from app.models.logger import TrackLog + + +def get_recently_played( + limit: int, userid: int | None = None, _entries: list[TrackLog] = [] +): + """ + Get the recently played items for the homepage. + + Pass a list of track log entries to use a subset of the scrobble table. + """ + # TODO: Paginate this + items = [] + + BATCH_SIZE = 200 + current_index = 0 + + if len(_entries): + entries = _entries + limit = 1 + else: + entries = ScrobbleTable.get_all(0, BATCH_SIZE, userid=userid) + + max_iterations = 20 + iterations = 0 + + while len(items) < limit and iterations < max_iterations: + items.extend(create_items(entries, limit)) + current_index += BATCH_SIZE + + if len(items) < limit: + entries = ScrobbleTable.get_all( + start=current_index + 1, limit=BATCH_SIZE, userid=userid + ) + if not entries: + break + + iterations += 1 + + if iterations == max_iterations: + print( + f"Warning: Reached maximum iterations ({max_iterations}) while fetching recently played items" + ) + + return items \ No newline at end of file diff --git a/app/lib/home/recentlyplayed.py b/app/lib/home/recentlyplayed.py index a565ab80..bde6816d 100644 --- a/app/lib/home/recentlyplayed.py +++ b/app/lib/home/recentlyplayed.py @@ -1,307 +1,13 @@ from datetime import datetime -import os -from app.db.userdata import FavoritesTable, PlaylistTable, ScrobbleTable -from app.models.logger import TrackLog from app.models.playlist import Playlist -from app.serializers.track import serialize_track -from app.serializers.album import album_serializer from app.lib.playlistlib import get_first_4_images -from app.store.folder import FolderStore from app.utils.dates import ( create_new_date, date_string_to_time_passed, - timestamp_to_time_passed, ) -from app.serializers.artist import serialize_for_card -from app.serializers.playlist import serialize_for_card as serialize_playlist -from app.lib.home.recentlyadded import get_recently_added_playlist -from app.store.albums import AlbumStore from app.store.tracks import TrackStore -from app.store.artists import ArtistStore - - -def get_recently_played( - limit: int, userid: int | None = None, _entries: list[TrackLog] = [] -): - """ - Get the recently played items for the homepage. - - Pass a list of track log entries to use a subset of the scrobble table. - """ - # TODO: Paginate this - items = [] - added = set() - - custom_playlists = [ - {"name": "recentlyadded", "handler": get_recently_added_playlist}, - {"name": "recentlyplayed", "handler": get_recently_played_playlist}, - ] - - def create_items(entries: list[TrackLog]): - for entry in entries: - if len(items) >= limit: - break - - if entry.source in added: - continue - - added.add(entry.source) - - if entry.type == "album": - album = AlbumStore.albummap.get(entry.type_src) - - if album is None: - continue - - item = { - "type": "album", - "hash": entry.type_src, - "timestamp": entry.timestamp, - } - - items.append(item) - continue - - if entry.type == "artist": - artist = ArtistStore.artistmap.get(entry.type_src) - - if artist is None: - continue - - items.append( - { - "type": "artist", - "hash": entry.type_src, - "timestamp": entry.timestamp, - } - ) - - continue - - if entry.type == "folder": - folder = entry.type_src - - if not folder: - continue - - if not folder.endswith("/"): - folder += "/" - - is_home_dir = entry.type_src == "$home" - - if is_home_dir: - folder = os.path.expanduser("~") - - item = { - "type": "folder", - "hash": entry.type_src, - "timestamp": entry.timestamp, - } - - items.append(item) - continue - - if entry.type == "playlist": - is_custom = entry.type_src in [i["name"] for i in custom_playlists] - - if is_custom: - items.append( - { - "type": "playlist", - "hash": entry.type_src, - "timestamp": entry.timestamp, - "is_custom": True, - } - ) - continue - - playlist = PlaylistTable.get_by_id(entry.type_src) - if playlist is None: - continue - - item = { - "type": "playlist", - "hash": entry.type_src, - "timestamp": entry.timestamp, - } - - items.append(item) - continue - - if entry.type == "favorite": - items.append( - { - "type": "favorite", - "timestamp": entry.timestamp, - } - ) - continue - - t = TrackStore.trackhashmap.get(entry.trackhash) - - if t is None: - continue - - item = { - "type": "track", - "hash": entry.trackhash, - "timestamp": entry.timestamp, - } - items.append(item) - - BATCH_SIZE = 200 - current_index = 0 - - if len(_entries): - entries = _entries - limit = 1 - else: - entries = ScrobbleTable.get_all(0, BATCH_SIZE, userid=userid) - - max_iterations = 20 - iterations = 0 - - while len(items) < limit and iterations < max_iterations: - create_items(entries) - current_index += BATCH_SIZE - - if len(items) < limit: - entries = ScrobbleTable.get_all( - start=current_index + 1, limit=BATCH_SIZE, userid=userid - ) - if not entries: - break - - iterations += 1 - - if iterations == max_iterations: - print( - f"Warning: Reached maximum iterations ({max_iterations}) while fetching recently played items" - ) - - return items - - -def recover_items(items: list[dict]): - custom_playlists = [ - {"name": "recentlyadded", "handler": get_recently_added_playlist}, - {"name": "recentlyplayed", "handler": get_recently_played_playlist}, - ] - recovered = [] - - for item in items: - recovered_item = None - - if item["type"] == "album": - album = AlbumStore.get_album_by_hash(item["hash"]) - if album is None: - continue - - album = album_serializer( - album, - to_remove={ - "genres", - "date", - "count", - "duration", - "albumartists_hashes", - "og_title", - }, - ) - - recovered_item = { - "type": "album", - "item": album, - } - elif item["type"] == "artist": - artist = ArtistStore.get_artist_by_hash(item["hash"]) - if artist is None: - continue - - recovered_item = { - "type": "artist", - "item": serialize_for_card(artist), - } - elif item["type"] == "folder": - count = FolderStore.count_tracks_containing_paths([item["hash"]]) - - recovered_item = { - "type": "folder", - "item": { - "path": item["hash"], - "count": count[0]["trackcount"], - }, - } - elif item["type"] == "playlist": - if item.get("is_custom"): - playlist, _ = next( - i["handler"]() - for i in custom_playlists - if i["name"] == item["hash"] - ) - playlist.images = [i["image"] for i in playlist.images] - - playlist = serialize_playlist( - playlist, to_remove={"settings", "duration"} - ) - recovered_item = { - "type": "playlist", - "item": playlist, - } - else: - playlist = PlaylistTable.get_by_id(item["hash"]) - if playlist is None: - continue - - tracks = TrackStore.get_tracks_by_trackhashes(playlist.trackhashes) - playlist.clear_lists() - - if not playlist.has_image: - images = get_first_4_images(tracks) - images = [i["image"] for i in images] - playlist.images = images - - recovered_item = { - "type": "playlist", - "item": serialize_playlist(playlist), - } - elif item["type"] == "favorite": - recovered_item = { - "type": "favorite", - "item": { - "count": FavoritesTable.count(), - }, - } - elif item["type"] == "track": - track = TrackStore.trackhashmap.get(item["hash"]) - if track is None: - continue - - recovered_item = { - "type": "track", - "item": serialize_track(track.get_best()), - } - - if recovered_item is not None: - helptext = item.get("help_text") or item.get("type") - secondary_text = item.get("secondary_text") - - if "secondary_text" in item: - secondary_text = item["secondary_text"] - elif "timestamp" in item: - secondary_text = timestamp_to_time_passed(item["timestamp"]) - - if helptext: - recovered_item["item"]["help_text"] = helptext - - if secondary_text: - recovered_item["item"]["time"] = secondary_text - - recovered.append(recovered_item) - - return recovered def get_recently_played_playlist(limit: int = 100): diff --git a/app/lib/recipes/artistmixes.py b/app/lib/recipes/artistmixes.py index 91a9af81..3ee9d6a1 100644 --- a/app/lib/recipes/artistmixes.py +++ b/app/lib/recipes/artistmixes.py @@ -25,14 +25,11 @@ class ArtistMixes(HomepageRoutine): custom_mixes = [] for _mix in mixes: - custom_mix = mix.get_custom_mix_items(_mix) + custom_mix = mix.get_track_mix(_mix) if custom_mix: custom_mixes.append(custom_mix) - for index, custom_mix in enumerate(custom_mixes): - custom_mix.title = f"Mix {index + 1}" - HomepageStore.set_mixes( custom_mixes, entrykey="custom_mixes", userid=user.id ) diff --git a/app/lib/recipes/recents.py b/app/lib/recipes/recents.py index 12ff558d..a9123d3c 100644 --- a/app/lib/recipes/recents.py +++ b/app/lib/recipes/recents.py @@ -1,7 +1,7 @@ import pprint from app.db.userdata import ScrobbleTable, UserTable from app.lib.home.recentlyadded import get_recently_added_items -from app.lib.home.recentlyplayed import get_recently_played +from app.lib.home.get_recently_played import get_recently_played from app.lib.recipes import HomepageRoutine from app.store.homepage import HomepageStore diff --git a/app/models/logger.py b/app/models/logger.py index e3668ea9..5d67e382 100644 --- a/app/models/logger.py +++ b/app/models/logger.py @@ -27,10 +27,11 @@ class TrackLog: def __post_init__(self): prefix_map = { + "mix:": "mix", "al:": "album", "ar:": "artist", - "pl:": "playlist", "fo:": "folder", + "pl:": "playlist", "favorite": "favorite", } diff --git a/app/models/mix.py b/app/models/mix.py index 84fe41d5..097788df 100644 --- a/app/models/mix.py +++ b/app/models/mix.py @@ -5,7 +5,8 @@ from typing import Any from app.lib.playlistlib import get_first_4_images from app.serializers.track import serialize_tracks from app.store.tracks import TrackStore -from app.utils.dates import seconds_to_time_string +from app.utils.dates import seconds_to_time_string, timestamp_to_time_passed +from app.utils.hashing import create_hash @dataclass @@ -15,6 +16,7 @@ class Mix: description: str tracks: list[str] sourcehash: str + userid: int """ A hash of the tracks used to generate the mix. """ @@ -41,8 +43,13 @@ class Mix: return _dict - def to_dict(self): + def to_dict(self, convert_timestamp: bool = False): item = asdict(self) + item["trackshash"] = create_hash(*self.tracks[:40]) + + if convert_timestamp: + item["time"] = timestamp_to_time_passed(item["timestamp"]) + del item["tracks"] del item["extra"]["albums"] diff --git a/app/plugins/mixes.py b/app/plugins/mixes.py index 83259890..5247f0e0 100644 --- a/app/plugins/mixes.py +++ b/app/plugins/mixes.py @@ -32,7 +32,7 @@ class MixAlreadyExists(Exception): class MixesPlugin(Plugin): MAX_TRACKS_TO_FETCH = 5 MIN_TRACK_MIX_LENGTH = 15 - MIX_TRACKS = 40 + MIX_TRACKS_LENGTH = 40 MIN_DAY_LISTEN_DURATION = 3 * 60 # 3 minutes MIN_WEEK_LISTEN_DURATION = 10 * 60 # 10 minutes @@ -65,7 +65,7 @@ class MixesPlugin(Plugin): return False @plugin_method - def get_track_mix(self, tracks: list[Track], with_help: bool = False): + def get_track_mix_data(self, tracks: list[Track], with_help: bool = False): """ Given a list of tracks, creates a mix by fetching data from the Swing Music Cloud recommendation server. @@ -137,25 +137,25 @@ class MixesPlugin(Plugin): return trackmatches, results["albums"], results["artists"] @plugin_method - def get_artist_mix(self, artisthash: str): - """ - Given an artisthash, creates an artist mix using the - self.MAX_TRACKS_TO_FETCH most listened to tracks. + # def get_artist_mix(self, artisthash: str): + # """ + # Given an artisthash, creates an artist mix using the + # self.MAX_TRACKS_TO_FETCH most listened to tracks. - Returns a tuple of the mix and the sourcehash. - """ - artist = ArtistStore.artistmap[artisthash] - tracks = TrackStore.get_tracks_by_trackhashes(artist.trackhashes) + # Returns a tuple of the mix and the sourcehash. + # """ + # artist = ArtistStore.artistmap[artisthash] + # tracks = TrackStore.get_tracks_by_trackhashes(artist.trackhashes) - tracks = sorted(tracks, key=lambda x: x.playduration, reverse=True) - sourcetracks = tracks[: self.MAX_TRACKS_TO_FETCH] - sourcehash = create_hash(*[t.trackhash for t in sourcetracks]) + # tracks = sorted(tracks, key=lambda x: x.playduration, reverse=True) + # sourcetracks = tracks[: self.MAX_TRACKS_TO_FETCH] + # sourcehash = create_hash(*[t.trackhash for t in sourcetracks]) - if MixTable.get_by_sourcehash(sourcehash): - raise MixAlreadyExists() + # if MixTable.get_by_sourcehash(sourcehash): + # raise MixAlreadyExists() - tracks, albums, artists = self.get_track_mix(tracks[: self.MAX_TRACKS_TO_FETCH]) - return (tracks, albums, artists, sourcehash) + # tracks, albums, artists = self.get_track_mix(tracks[: self.MAX_TRACKS_TO_FETCH]) + # return (tracks, albums, artists, sourcehash) @plugin_method def create_artist_mixes(self, userid: int): @@ -225,7 +225,7 @@ class MixesPlugin(Plugin): ) mix = self.create_artist_mix( - artist, trackhashes[: self.MAX_TRACKS_TO_FETCH] + artist, trackhashes[: self.MAX_TRACKS_TO_FETCH], userid=userid ) if mix: @@ -262,7 +262,9 @@ class MixesPlugin(Plugin): return f"Featuring {tracks[0].artists[0]['name']}" - def create_artist_mix(self, artist: dict[str, str], trackhashes: list[str]): + def create_artist_mix( + self, artist: dict[str, str], trackhashes: list[str], userid: int + ): """ Given an artist dict, creates an artist mix. """ @@ -286,7 +288,7 @@ class MixesPlugin(Plugin): print(db_mix.title) return db_mix - mix_tracks, albums, artists = self.get_track_mix(tracks) + mix_tracks, albums, artists = self.get_track_mix_data(tracks) if len(mix_tracks) < self.MIN_TRACK_MIX_LENGTH: return None @@ -300,11 +302,12 @@ class MixesPlugin(Plugin): mix = Mix( # the a prefix indicates that this is an artist mix - id=f"a{artist['artisthash']}", + id=f"a{userid}{artist['artisthash']}", title=artist["artist"] + " Radio", description=self.get_mix_description(mix_tracks, artist["artisthash"]), tracks=[t.trackhash for t in mix_tracks], sourcehash=sourcehash, + userid=userid, extra={ "type": "artist", "artisthash": artist["artisthash"], @@ -435,31 +438,47 @@ class MixesPlugin(Plugin): """ pass - def get_custom_mix_items(self, mix: Mix): + def get_track_mix(self, mix: Mix): """ Given a mix, returns the excess tracks as a custom mix. """ # INFO: If the mix can't have more than 20 tracks, return None - if len(mix.tracks) <= self.MIX_TRACKS + 20: + if len(mix.tracks) <= self.MIX_TRACKS_LENGTH + 20: return None - tracks = TrackStore.get_tracks_by_trackhashes(mix.tracks[self.MIX_TRACKS :]) + og_track = TrackStore.trackhashmap.get(mix.tracks[0]) - return Mix( - id=f"t{mix.extra['artisthash']}", - title="", # INFO: Will be filled after all mixes are created. + if not og_track: + return None + + og_track = og_track.get_best() + tracks = [og_track] + TrackStore.get_tracks_by_trackhashes( + mix.tracks[self.MIX_TRACKS_LENGTH :] + ) + + trackmix = Mix( + id=f"t{mix.userid}{mix.extra['artisthash']}", + title=og_track.title, description=self.get_mix_description(tracks, mix.extra["artisthash"]), tracks=[t.trackhash for t in tracks], sourcehash=create_hash(*[t.trackhash for t in tracks]), + userid=mix.userid, extra={ "type": "track", + "og_sourcehash": mix.sourcehash, "images": self.get_custom_mix_images(tracks), "artists": None, "albums": None, }, ) + # INFO: Write track mix save state + if mix.extra.get("trackmix_saved"): + trackmix.saved = True + + return trackmix + def get_custom_mix_images(self, tracks: list[Track]): first_album = tracks[0].albumhash @@ -478,12 +497,17 @@ class MixesPlugin(Plugin): if artisthash in seen: continue + artist = ArtistStore.artistmap.get(artisthash) + + if not artist: + continue + seen.add(artisthash) image = { "image": artisthash + ".webp", "type": "artist", - "color": ArtistStore.get_artist_by_hash(artisthash).color, + "color": artist.artist.color, } images.append(image) diff --git a/app/store/homepage.py b/app/store/homepage.py index 289c41c9..07a8cf80 100644 --- a/app/store/homepage.py +++ b/app/store/homepage.py @@ -1,123 +1,16 @@ -from abc import ABC from typing import Any -from app.lib.home.recentlyplayed import recover_items -from app.models.mix import Mix + +from app.store.homepageentries import ( + BecauseYouListenedToArtistHomepageEntry, + GenericRecoverableEntry, + HomepageEntry, + MixHomepageEntry, + RecentlyAddedHomepageEntry, + RecentlyPlayedHomepageEntry, +) from app.utils.auth import get_current_userid -class HomepageEntry(ABC): - """ - Base class for all homepage entries. - - items is a dict of userid to a dict of stuff. - """ - - title: str - description: str - items: dict[int, Any] - - def __init__(self, title: str, description: str): - self.title = title - self.description = description - - def get_items(self, userid: int, limit: int | None = None): - """ - Return usable items for the homepage. - """ - ... - - -class MixHomepageEntry(HomepageEntry): - """ - A homepage entry for mixes. - self.items is a dict of userid to a dict of mixid to mix. - """ - - items: dict[int, dict[str, Mix]] - - def __init__(self, title: str, description: str): - super().__init__(title, description) - self.items = {} - - def get_items(self, userid: int, limit: int | None = None): - items = [] - - for mix in self.items.get(userid, {}).values(): - if limit and len(items) >= limit: - break - - items.append( - { - "type": "mix", - "item": mix.to_dict(), - } - ) - - return { - "title": self.title, - "description": self.description, - "items": items, - } - - -class RecentlyPlayedHomepageEntry(HomepageEntry): - """ - A homepage entry for recently played. - """ - - items: dict[int, list[dict[str, Any]]] - - def __init__(self, title: str, description: str = ""): - super().__init__(title, description) - self.items = {} - - def get_items(self, userid: int, limit: int | None = None): - items = self.items.get(userid, [])[:limit] - - return { - "title": self.title, - "description": self.description, - "items": recover_items(items), - } - - -class RecentlyAddedHomepageEntry(RecentlyPlayedHomepageEntry): - """ - A homepage entry for recently added. - """ - - def get_items(self, userid: int, limit: int | None = None): - return super().get_items(0, limit) - - -class GenericRecoverableEntry(RecentlyPlayedHomepageEntry): - """ - A homepage entry for top streamed. - """ - - # NOTE: This extends RecentlyPlayedHomepageEntry because - # the shape of the data is the same. - pass - - -class BecauseYouListenedToArtistHomepageEntry(RecentlyPlayedHomepageEntry): - """ - A homepage entry for because you listened to artist. - """ - - # SHAPE: {userid: {title: str, items: list[RecoverableItem]}} - items: dict[int, dict[str, Any]] - - def get_items(self, userid: int, limit: int | None = None): - title = self.items.get(userid, {}).get("title") - items = self.items.get(userid, {}).get("items", [])[:limit] - - return { - "title": title, - "items": recover_items(items), - } - - class HomepageStore: """ Stores the homepage items. @@ -159,7 +52,7 @@ class HomepageStore: @classmethod def set_mixes(cls, items: list[Any], entrykey: str, userid: int | None = None): - idmap = {item.id[1:]: item for item in items} + idmap = {item.id: item for item in items} cls.entries[entrykey].items[userid or get_current_userid()] = idmap @classmethod @@ -175,3 +68,14 @@ class HomepageStore: for entry in cls.entries.keys() if len(cls.entries[entry].items) ] + + @classmethod + def find_mix(cls, mixid: str): + mixentries = ["artist_mixes", "custom_mixes"] + + for entry in mixentries: + mix = cls.entries[entry].items.get(get_current_userid(), {}).get(mixid) + if mix: + return mix + + return None diff --git a/app/store/homepageentries.py b/app/store/homepageentries.py new file mode 100644 index 00000000..7343a884 --- /dev/null +++ b/app/store/homepageentries.py @@ -0,0 +1,118 @@ +from abc import ABC +from typing import Any + +from app.lib.home import recover_items +from app.models.mix import Mix + +class HomepageEntry(ABC): + """ + Base class for all homepage entries. + + items is a dict of userid to a dict of stuff. + """ + + title: str + description: str + items: dict[int, Any] + + def __init__(self, title: str, description: str): + self.title = title + self.description = description + + def get_items(self, userid: int, limit: int | None = None): + """ + Return usable items for the homepage. + """ + ... + + +class MixHomepageEntry(HomepageEntry): + """ + A homepage entry for mixes. + self.items is a dict of userid to a dict of mixid to mix. + """ + + items: dict[int, dict[str, Mix]] + + def __init__(self, title: str, description: str): + super().__init__(title, description) + self.items = {} + + def get_items(self, userid: int, limit: int | None = None): + items = [] + + for mix in self.items.get(userid, {}).values(): + if limit and len(items) >= limit: + break + + items.append( + { + "type": "mix", + "item": mix.to_dict(), + } + ) + + return { + "title": self.title, + "description": self.description, + "items": items, + } + + +class RecentlyPlayedHomepageEntry(HomepageEntry): + """ + A homepage entry for recently played. + """ + + items: dict[int, list[dict[str, Any]]] + + def __init__(self, title: str, description: str = ""): + super().__init__(title, description) + self.items = {} + + def get_items(self, userid: int, limit: int | None = None): + items = self.items.get(userid, [])[:limit] + + return { + "title": self.title, + "description": self.description, + "items": recover_items(items), + } + + +class RecentlyAddedHomepageEntry(RecentlyPlayedHomepageEntry): + """ + A homepage entry for recently added. + """ + + def get_items(self, userid: int, limit: int | None = None): + return super().get_items(0, limit) + + +class GenericRecoverableEntry(RecentlyPlayedHomepageEntry): + """ + A homepage entry for top streamed. + """ + + # NOTE: This extends RecentlyPlayedHomepageEntry because + # the shape of the data is the same. + pass + + +class BecauseYouListenedToArtistHomepageEntry(RecentlyPlayedHomepageEntry): + """ + A homepage entry for because you listened to artist. + """ + + # SHAPE: {userid: {title: str, items: list[RecoverableItem]}} + items: dict[int, dict[str, Any]] + + def get_items(self, userid: int, limit: int | None = None): + title = self.items.get(userid, {}).get("title") + items = self.items.get(userid, {}).get("items", [])[:limit] + + return { + "title": title, + "items": recover_items(items), + } +