diff --git a/app/api/playlist.py b/app/api/playlist.py index 1f2720d0..a65c6714 100644 --- a/app/api/playlist.py +++ b/app/api/playlist.py @@ -12,13 +12,14 @@ from flask_openapi3 import Tag from flask_openapi3 import APIBlueprint, FileStorage from app import models -from app.db.sqlite.playlists import SQLitePlaylistMethods +from app.db.libdata import TrackTable +from app.db.userdata import PlaylistTable from app.lib import playlistlib from app.lib.albumslib import sort_by_track_no from app.lib.home.recentlyadded import get_recently_added_playlist from app.lib.home.recentlyplayed import get_recently_played_playlist +from app.models.playlist import Playlist from app.serializers.playlist import serialize_for_card -from app.store.tracks import TrackStore from app.utils.dates import create_new_date, date_string_to_time_passed from app.utils.remove_duplicates import remove_duplicates from app.settings import Paths @@ -26,8 +27,6 @@ from app.settings import Paths tag = Tag(name="Playlists", description="Get and manage playlists") api = APIBlueprint("playlists", __name__, url_prefix="/playlists", abp_tags=[tag]) -PL = SQLitePlaylistMethods - class SendAllPlaylistsQuery(BaseModel): no_images: bool = Field(False, description="Whether to include images") @@ -38,7 +37,7 @@ def send_all_playlists(query: SendAllPlaylistsQuery): """ Gets all the playlists. """ - playlists = PL.get_all_playlists() + playlists = PlaylistTable.get_all() playlists = list(playlists) for playlist in playlists: @@ -63,18 +62,21 @@ def insert_playlist(name: str, image: str = None): "image": image, "last_updated": create_new_date(), "name": name, - "trackhashes": json.dumps([]), - "settings": json.dumps( - { - "has_gif": False, - "banner_pos": 50, - "square_img": True if image else False, - "pinned": False, - } - ), + "trackhashes": [], + "settings": { + "has_gif": False, + "banner_pos": 50, + "square_img": True if image else False, + "pinned": False, + }, } - return PL.insert_one_playlist(playlist) + rowid = PlaylistTable.add_one(playlist) + if rowid: + playlist["id"] = rowid + return Playlist(**playlist) + + return None class CreatePlaylistBody(BaseModel): @@ -88,9 +90,10 @@ def create_playlist(body: CreatePlaylistBody): Creates a new playlist. Accepts POST method with a JSON body. """ - existing_playlist_count = PL.count_playlist_by_name(body.name) + # existing_playlist_count = PL.count_playlist_by_name(body.name) + exists = PlaylistTable.check_exists_by_name(body.name) - if existing_playlist_count > 0: + if exists: return {"error": "Playlist already exists"}, 409 playlist = insert_playlist(body.name) @@ -105,8 +108,7 @@ def get_path_trackhashes(path: str): """ Returns a list of trackhashes in a folder. """ - tracks = TrackStore.get_tracks_in_path(path) - tracks = sorted(tracks, key=lambda t: t.last_mod) + tracks = TrackTable.get_tracks_in_path(path) return [t.trackhash for t in tracks] @@ -114,17 +116,17 @@ def get_album_trackhashes(albumhash: str): """ Returns a list of trackhashes in an album. """ - tracks = TrackStore.get_tracks_by_albumhash(albumhash) + tracks = TrackTable.get_tracks_by_albumhash(albumhash) tracks = sort_by_track_no(tracks) - return [t["trackhash"] for t in tracks] + return [t.trackhash for t in tracks] def get_artist_trackhashes(artisthash: str): """ Returns a list of trackhashes for an artist. """ - tracks = TrackStore.get_tracks_by_artisthash(artisthash) + tracks = TrackTable.get_tracks_by_artisthash(artisthash) return [t.trackhash for t in tracks] @@ -164,10 +166,11 @@ def add_item_to_playlist(path: PlaylistIDPath, body: AddItemToPlaylistBody): else: trackhashes = [] - insert_count = PL.add_tracks_to_playlist(int(playlist_id), trackhashes) + # insert_count = PL.add_tracks_to_playlist(int(playlist_id), trackhashes) + PlaylistTable.append_to_playlist(int(playlist_id), trackhashes) - if insert_count == 0: - return {"error": "Item already exists in playlist"}, 409 + # if insert_count == 0: + # return {"error": "Item already exists in playlist"}, 409 return {"msg": "Done"}, 200 @@ -179,11 +182,10 @@ class GetPlaylistQuery(BaseModel): def format_custom_playlist(playlist: models.Playlist, tracks: list[models.Track]): duration = sum(t.duration for t in tracks) - playlist.set_duration(duration) - playlist = serialize_for_card(playlist) + playlist.duration = duration return { - "info": playlist, + "info": serialize_for_card(playlist), "tracks": tracks, } @@ -209,19 +211,20 @@ def get_playlist(path: PlaylistIDPath, query: GetPlaylistQuery): playlist, tracks = handler() return format_custom_playlist(playlist, tracks) - playlist = PL.get_playlist_by_id(int(playlistid)) + # playlist = PL.get_playlist_by_id(int(playlistid)) + playlist = PlaylistTable.get_by_id(playlistid) if playlist is None: return {"msg": "Playlist not found"}, 404 - tracks = TrackStore.get_tracks_by_trackhashes(list(playlist.trackhashes)) - + # tracks = TrackStore.get_tracks_by_trackhashes(list(playlist.trackhashes)) + tracks = TrackTable.get_tracks_by_trackhashes(playlist.trackhashes) tracks = remove_duplicates(tracks) + duration = sum(t.duration for t in tracks) playlist.last_updated = date_string_to_time_passed(playlist.last_updated) - playlist.set_duration(duration) - playlist.set_count(len(tracks)) + playlist.duration = duration if not playlist.has_image: playlist.images = playlistlib.get_first_4_images(tracks) @@ -247,7 +250,8 @@ def update_playlist_info(path: PlaylistIDPath, form: UpdatePlaylistForm): Update playlist """ playlistid = path.playlistid - db_playlist = PL.get_playlist_by_id(playlistid) + # db_playlist = PL.get_playlist_by_id(playlistid) + db_playlist = PlaylistTable.get_by_id(playlistid) if db_playlist is None: return {"error": "Playlist not found"}, 404 @@ -286,7 +290,8 @@ def update_playlist_info(path: PlaylistIDPath, form: UpdatePlaylistForm): p_tuple = (*playlist.values(),) - PL.update_playlist(playlistid, playlist) + # PL.update_playlist(playlistid, playlist) + PlaylistTable.update_one(playlistid, playlist) playlist = models.Playlist(*p_tuple) playlist.last_updated = date_string_to_time_passed(playlist.last_updated) @@ -301,7 +306,9 @@ def pin_unpin_playlist(path: PlaylistIDPath): """ Pin playlist. """ - playlist = PL.get_playlist_by_id(path.playlistid) + # playlist = PL.get_playlist_by_id(path.playlistid) + + playlist = PlaylistTable.get_by_id(path.playlistid) if playlist is None: return {"error": "Playlist not found"}, 404 @@ -313,8 +320,8 @@ def pin_unpin_playlist(path: PlaylistIDPath): except KeyError: settings["pinned"] = True - PL.update_settings(path.playlistid, settings) - + # PL.update_settings(path.playlistid, settings) + PlaylistTable.update_settings(path.playlistid, settings) return {"msg": "Done"}, 200 @@ -323,12 +330,14 @@ def remove_playlist_image(path: PlaylistIDPath): """ Clear playlist image. """ - playlist = PL.get_playlist_by_id(path.playlistid) + # playlist = PL.get_playlist_by_id(path.playlistid) + playlist = PlaylistTable.get_by_id(path.playlistid) if playlist is None: return {"error": "Playlist not found"}, 404 - PL.remove_banner(path.playlistid) + # PL.remove_banner(path.playlistid) + PlaylistTable.remove_image(path.playlistid) playlist.image = None playlist.thumb = None @@ -346,7 +355,8 @@ def remove_playlist(path: PlaylistIDPath): """ Delete playlist """ - PL.delete_playlist(path.playlistid) + # PL.delete_playlist(path.playlistid) + PlaylistTable.remove_one(path.playlistid) return {"msg": "Done"}, 200 @@ -368,15 +378,12 @@ def remove_tracks_from_playlist( # index: int; # } - PL.remove_tracks_from_playlist(path.playlistid, body.tracks) + # PL.remove_tracks_from_playlist(path.playlistid, body.tracks) + PlaylistTable.remove_from_playlist(path.playlistid, body.tracks) return {"msg": "Done"}, 200 -def playlist_name_exists(name: str) -> bool: - return PL.count_playlist_by_name(name) > 0 - - class SavePlaylistAsItemBody(BaseModel): itemtype: str = Field(..., description="The type of item", example="tracks") playlist_name: str = Field(..., description="The name of the playlist") @@ -394,7 +401,7 @@ def save_item_as_playlist(body: SavePlaylistAsItemBody): playlist_name = body.playlist_name itemhash = body.itemhash - if playlist_name_exists(playlist_name): + if PlaylistTable.check_exists_by_name(playlist_name): return {"error": "Playlist already exists"}, 409 if itemtype == "tracks": @@ -437,8 +444,9 @@ def save_item_as_playlist(body: SavePlaylistAsItemBody): img, str(playlist.id), "image/webp", filename=filename ) - PL.add_tracks_to_playlist(playlist.id, trackhashes) - playlist.set_count(len(trackhashes)) + # PL.add_tracks_to_playlist(playlist.id, trackhashes) + PlaylistTable.append_to_playlist(playlist.id, trackhashes) + playlist.count = len(trackhashes) images = playlistlib.get_first_4_images(trackhashes=trackhashes) playlist.images = [img["image"] for img in images] diff --git a/app/db/__init__.py b/app/db/__init__.py index 853a0eac..4b1c4366 100644 --- a/app/db/__init__.py +++ b/app/db/__init__.py @@ -78,6 +78,10 @@ class Base(MappedAsDataclass, DeclarativeBase): with DbManager(commit=True) as conn: conn.execute(delete(cls)) + @classmethod + def remove_one(cls, id: int): + cls.execute(delete(cls).where(cls.id == id), commit=True) + @classmethod def all(cls): return cls.execute(select(cls)) diff --git a/app/db/userdata.py b/app/db/userdata.py index 1ca026a5..1e9407f0 100644 --- a/app/db/userdata.py +++ b/app/db/userdata.py @@ -22,6 +22,8 @@ from app.db.utils import ( albums_to_dataclasses, artists_to_dataclasses, favorites_to_dataclass, + playlist_to_dataclass, + playlists_to_dataclasses, plugin_to_dataclasses, similar_artist_to_dataclass, similar_artists_to_dataclass, @@ -168,7 +170,6 @@ class FavoritesTable(Base): JSON(), nullable=True, default_factory=dict ) - @classmethod def get_all(cls): with DbManager() as conn: @@ -274,3 +275,109 @@ class ScrobbleTable(Base): ) return tracklog_to_dataclasses(result.fetchall()) + + +class PlaylistTable(Base): + __tablename__ = "playlist" + + id: Mapped[int] = mapped_column(primary_key=True) + name: Mapped[str] = mapped_column(String(), index=True) + last_updated: Mapped[int] = mapped_column(Integer()) + image: Mapped[str] = mapped_column(String(), nullable=True) + userid: Mapped[int] = mapped_column( + Integer(), ForeignKey("user.id", ondelete="cascade") + ) + settings: Mapped[dict[str, Any]] = mapped_column(JSON()) + trackhashes: Mapped[list[str]] = mapped_column(JSON(), default_factory=list) + extra: Mapped[dict[str, Any]] = mapped_column( + JSON(), nullable=True, default_factory=dict + ) + + @classmethod + def get_all(cls): + result = cls.all() + return playlists_to_dataclasses(result) + + @classmethod + def add_one(cls, playlist: dict[str, Any]): + playlist["userid"] = get_current_userid() + result = cls.insert_one(playlist) + return result.lastrowid + + @classmethod + def check_exists_by_name(cls, name: str): + result = cls.execute( + select(cls).where((cls.name == name) & (cls.userid == get_current_userid())) + ) + return result.fetchone() is not None + + @classmethod + def append_to_playlist(cls, id: int, trackhashes: list[str]): + print("type(trackhashes):", type(trackhashes)) + return cls.execute( + update(cls) + .where((cls.id == id) & (cls.userid == get_current_userid())) + .values(trackhashes=cls.trackhashes + trackhashes), + commit=True, + ) + + @classmethod + def remove_from_playlist(cls, id: int, trackhashes: list[dict[str, Any]]): + # CHECKPOINT: Properly remove tracks from a playlist + # Without messing up the order in case of duplicates + tracks = cls.execute( + select(cls.trackhashes).where( + (cls.id == id) & (cls.userid == get_current_userid()) + ) + ) + + results = tracks.fetchone() + if results: + dbhashes: list[str] = results[0] + + for item in trackhashes: + if dbhashes.index(item["trackhash"]) == item["index"]: + dbhashes.remove(item["trackhash"]) + + return cls.execute( + update(cls) + .where((cls.id == id) & (cls.userid == get_current_userid())) + .values(trackhashes=dbhashes), + commit=True, + ) + + @classmethod + def get_by_id(cls, id: int): + result = cls.execute( + select(cls).where((cls.id == id) & (cls.userid == get_current_userid())) + ) + result = result.fetchone() + if result: + return playlist_to_dataclass(result) + + @classmethod + def update_one(cls, id: int, playlist: dict[str, Any]): + return cls.execute( + update(cls) + .where((cls.id == id) & (cls.userid == get_current_userid())) + .values(playlist), + commit=True, + ) + + @classmethod + def update_settings(cls, id: int, settings: dict[str, Any]): + return cls.execute( + update(cls) + .where((cls.id == id) & (cls.userid == get_current_userid())) + .values(settings=settings), + commit=True, + ) + + @classmethod + def remove_image(cls, id: int): + return cls.execute( + update(cls) + .where((cls.id == id) & (cls.userid == get_current_userid())) + .values(image=None), + commit=True, + ) diff --git a/app/db/utils.py b/app/db/utils.py index 0ed92c2c..550a7537 100644 --- a/app/db/utils.py +++ b/app/db/utils.py @@ -4,6 +4,7 @@ from app.models import Album as AlbumModel, Artist as ArtistModel, Track as Trac from app.models.favorite import Favorite from app.models.lastfm import SimilarArtist from app.models.logger import TrackLog +from app.models.playlist import Playlist from app.models.plugins import Plugin from app.models.user import User @@ -75,9 +76,20 @@ def plugin_to_dataclass(entry: Any): def plugin_to_dataclasses(entries: Any): return [plugin_to_dataclass(entry) for entry in entries] + def tracklog_to_dataclass(entry: Any): entry_dict = entry._asdict() return TrackLog(**entry_dict) + def tracklog_to_dataclasses(entries: Any): - return [tracklog_to_dataclass(entry) for entry in entries] \ No newline at end of file + return [tracklog_to_dataclass(entry) for entry in entries] + + +def playlist_to_dataclass(entry: Any): + entry_dict = entry._asdict() + return Playlist(**entry_dict) + + +def playlists_to_dataclasses(entries: Any): + return [playlist_to_dataclass(entry) for entry in entries] diff --git a/app/models/playlist.py b/app/models/playlist.py index 34e95c3f..d413010d 100644 --- a/app/models/playlist.py +++ b/app/models/playlist.py @@ -2,6 +2,7 @@ import dataclasses import json from dataclasses import dataclass from pathlib import Path +from typing import Any from app import settings @@ -14,10 +15,12 @@ class Playlist: image: str | None last_updated: str name: str - settings: str | dict - trackhashes: str | list[str] + settings: dict + userid: int + trackhashes: list[str] + extra: dict[str, Any] = dataclasses.field(default_factory=dict) - thumb: str | None = "" + thumb: str = "" count: int = 0 duration: int = 0 has_image: bool = False @@ -25,11 +28,11 @@ class Playlist: pinned: bool = False def __post_init__(self): - self.trackhashes = json.loads(str(self.trackhashes)) - self.count = len(self.trackhashes) + # self.trackhashes = json.loads(str(self.trackhashes)) + # self.count = len(self.trackhashes) - if isinstance(self.settings, str): - self.settings = dict(json.loads(self.settings)) + # if isinstance(self.settings, str): + # self.settings = dict(json.loads(self.settings)) self.pinned = self.settings.get("pinned", False) self.has_image = ( @@ -42,11 +45,11 @@ class Playlist: self.image = "None" self.thumb = "None" - def set_duration(self, duration: int): - self.duration = duration + # def set_duration(self, duration: int): + # self.duration = duration - def set_count(self, count: int): - self.count = count + # def set_count(self, count: int): + # self.count = count def clear_lists(self): """