mirror of
https://github.com/Dvorinka/swingmusic-extended.git
synced 2026-06-03 20:13:02 +00:00
start porting: playlists endpoints
This commit is contained in:
+53
-45
@@ -12,13 +12,14 @@ from flask_openapi3 import Tag
|
||||
from flask_openapi3 import APIBlueprint, FileStorage
|
||||
|
||||
from app import models
|
||||
from app.db.sqlite.playlists import SQLitePlaylistMethods
|
||||
from app.db.libdata import TrackTable
|
||||
from app.db.userdata import PlaylistTable
|
||||
from app.lib import playlistlib
|
||||
from app.lib.albumslib import sort_by_track_no
|
||||
from app.lib.home.recentlyadded import get_recently_added_playlist
|
||||
from app.lib.home.recentlyplayed import get_recently_played_playlist
|
||||
from app.models.playlist import Playlist
|
||||
from app.serializers.playlist import serialize_for_card
|
||||
from app.store.tracks import TrackStore
|
||||
from app.utils.dates import create_new_date, date_string_to_time_passed
|
||||
from app.utils.remove_duplicates import remove_duplicates
|
||||
from app.settings import Paths
|
||||
@@ -26,8 +27,6 @@ from app.settings import Paths
|
||||
tag = Tag(name="Playlists", description="Get and manage playlists")
|
||||
api = APIBlueprint("playlists", __name__, url_prefix="/playlists", abp_tags=[tag])
|
||||
|
||||
PL = SQLitePlaylistMethods
|
||||
|
||||
|
||||
class SendAllPlaylistsQuery(BaseModel):
|
||||
no_images: bool = Field(False, description="Whether to include images")
|
||||
@@ -38,7 +37,7 @@ def send_all_playlists(query: SendAllPlaylistsQuery):
|
||||
"""
|
||||
Gets all the playlists.
|
||||
"""
|
||||
playlists = PL.get_all_playlists()
|
||||
playlists = PlaylistTable.get_all()
|
||||
playlists = list(playlists)
|
||||
|
||||
for playlist in playlists:
|
||||
@@ -63,18 +62,21 @@ def insert_playlist(name: str, image: str = None):
|
||||
"image": image,
|
||||
"last_updated": create_new_date(),
|
||||
"name": name,
|
||||
"trackhashes": json.dumps([]),
|
||||
"settings": json.dumps(
|
||||
{
|
||||
"trackhashes": [],
|
||||
"settings": {
|
||||
"has_gif": False,
|
||||
"banner_pos": 50,
|
||||
"square_img": True if image else False,
|
||||
"pinned": False,
|
||||
}
|
||||
),
|
||||
},
|
||||
}
|
||||
|
||||
return PL.insert_one_playlist(playlist)
|
||||
rowid = PlaylistTable.add_one(playlist)
|
||||
if rowid:
|
||||
playlist["id"] = rowid
|
||||
return Playlist(**playlist)
|
||||
|
||||
return None
|
||||
|
||||
|
||||
class CreatePlaylistBody(BaseModel):
|
||||
@@ -88,9 +90,10 @@ def create_playlist(body: CreatePlaylistBody):
|
||||
|
||||
Creates a new playlist. Accepts POST method with a JSON body.
|
||||
"""
|
||||
existing_playlist_count = PL.count_playlist_by_name(body.name)
|
||||
# existing_playlist_count = PL.count_playlist_by_name(body.name)
|
||||
exists = PlaylistTable.check_exists_by_name(body.name)
|
||||
|
||||
if existing_playlist_count > 0:
|
||||
if exists:
|
||||
return {"error": "Playlist already exists"}, 409
|
||||
|
||||
playlist = insert_playlist(body.name)
|
||||
@@ -105,8 +108,7 @@ def get_path_trackhashes(path: str):
|
||||
"""
|
||||
Returns a list of trackhashes in a folder.
|
||||
"""
|
||||
tracks = TrackStore.get_tracks_in_path(path)
|
||||
tracks = sorted(tracks, key=lambda t: t.last_mod)
|
||||
tracks = TrackTable.get_tracks_in_path(path)
|
||||
return [t.trackhash for t in tracks]
|
||||
|
||||
|
||||
@@ -114,17 +116,17 @@ def get_album_trackhashes(albumhash: str):
|
||||
"""
|
||||
Returns a list of trackhashes in an album.
|
||||
"""
|
||||
tracks = TrackStore.get_tracks_by_albumhash(albumhash)
|
||||
tracks = TrackTable.get_tracks_by_albumhash(albumhash)
|
||||
tracks = sort_by_track_no(tracks)
|
||||
|
||||
return [t["trackhash"] for t in tracks]
|
||||
return [t.trackhash for t in tracks]
|
||||
|
||||
|
||||
def get_artist_trackhashes(artisthash: str):
|
||||
"""
|
||||
Returns a list of trackhashes for an artist.
|
||||
"""
|
||||
tracks = TrackStore.get_tracks_by_artisthash(artisthash)
|
||||
tracks = TrackTable.get_tracks_by_artisthash(artisthash)
|
||||
return [t.trackhash for t in tracks]
|
||||
|
||||
|
||||
@@ -164,10 +166,11 @@ def add_item_to_playlist(path: PlaylistIDPath, body: AddItemToPlaylistBody):
|
||||
else:
|
||||
trackhashes = []
|
||||
|
||||
insert_count = PL.add_tracks_to_playlist(int(playlist_id), trackhashes)
|
||||
# insert_count = PL.add_tracks_to_playlist(int(playlist_id), trackhashes)
|
||||
PlaylistTable.append_to_playlist(int(playlist_id), trackhashes)
|
||||
|
||||
if insert_count == 0:
|
||||
return {"error": "Item already exists in playlist"}, 409
|
||||
# if insert_count == 0:
|
||||
# return {"error": "Item already exists in playlist"}, 409
|
||||
|
||||
return {"msg": "Done"}, 200
|
||||
|
||||
@@ -179,11 +182,10 @@ class GetPlaylistQuery(BaseModel):
|
||||
def format_custom_playlist(playlist: models.Playlist, tracks: list[models.Track]):
|
||||
duration = sum(t.duration for t in tracks)
|
||||
|
||||
playlist.set_duration(duration)
|
||||
playlist = serialize_for_card(playlist)
|
||||
playlist.duration = duration
|
||||
|
||||
return {
|
||||
"info": playlist,
|
||||
"info": serialize_for_card(playlist),
|
||||
"tracks": tracks,
|
||||
}
|
||||
|
||||
@@ -209,19 +211,20 @@ def get_playlist(path: PlaylistIDPath, query: GetPlaylistQuery):
|
||||
playlist, tracks = handler()
|
||||
return format_custom_playlist(playlist, tracks)
|
||||
|
||||
playlist = PL.get_playlist_by_id(int(playlistid))
|
||||
# playlist = PL.get_playlist_by_id(int(playlistid))
|
||||
playlist = PlaylistTable.get_by_id(playlistid)
|
||||
|
||||
if playlist is None:
|
||||
return {"msg": "Playlist not found"}, 404
|
||||
|
||||
tracks = TrackStore.get_tracks_by_trackhashes(list(playlist.trackhashes))
|
||||
|
||||
# tracks = TrackStore.get_tracks_by_trackhashes(list(playlist.trackhashes))
|
||||
tracks = TrackTable.get_tracks_by_trackhashes(playlist.trackhashes)
|
||||
tracks = remove_duplicates(tracks)
|
||||
|
||||
duration = sum(t.duration for t in tracks)
|
||||
playlist.last_updated = date_string_to_time_passed(playlist.last_updated)
|
||||
|
||||
playlist.set_duration(duration)
|
||||
playlist.set_count(len(tracks))
|
||||
playlist.duration = duration
|
||||
|
||||
if not playlist.has_image:
|
||||
playlist.images = playlistlib.get_first_4_images(tracks)
|
||||
@@ -247,7 +250,8 @@ def update_playlist_info(path: PlaylistIDPath, form: UpdatePlaylistForm):
|
||||
Update playlist
|
||||
"""
|
||||
playlistid = path.playlistid
|
||||
db_playlist = PL.get_playlist_by_id(playlistid)
|
||||
# db_playlist = PL.get_playlist_by_id(playlistid)
|
||||
db_playlist = PlaylistTable.get_by_id(playlistid)
|
||||
|
||||
if db_playlist is None:
|
||||
return {"error": "Playlist not found"}, 404
|
||||
@@ -286,7 +290,8 @@ def update_playlist_info(path: PlaylistIDPath, form: UpdatePlaylistForm):
|
||||
|
||||
p_tuple = (*playlist.values(),)
|
||||
|
||||
PL.update_playlist(playlistid, playlist)
|
||||
# PL.update_playlist(playlistid, playlist)
|
||||
PlaylistTable.update_one(playlistid, playlist)
|
||||
|
||||
playlist = models.Playlist(*p_tuple)
|
||||
playlist.last_updated = date_string_to_time_passed(playlist.last_updated)
|
||||
@@ -301,7 +306,9 @@ def pin_unpin_playlist(path: PlaylistIDPath):
|
||||
"""
|
||||
Pin playlist.
|
||||
"""
|
||||
playlist = PL.get_playlist_by_id(path.playlistid)
|
||||
# playlist = PL.get_playlist_by_id(path.playlistid)
|
||||
|
||||
playlist = PlaylistTable.get_by_id(path.playlistid)
|
||||
|
||||
if playlist is None:
|
||||
return {"error": "Playlist not found"}, 404
|
||||
@@ -313,8 +320,8 @@ def pin_unpin_playlist(path: PlaylistIDPath):
|
||||
except KeyError:
|
||||
settings["pinned"] = True
|
||||
|
||||
PL.update_settings(path.playlistid, settings)
|
||||
|
||||
# PL.update_settings(path.playlistid, settings)
|
||||
PlaylistTable.update_settings(path.playlistid, settings)
|
||||
return {"msg": "Done"}, 200
|
||||
|
||||
|
||||
@@ -323,12 +330,14 @@ def remove_playlist_image(path: PlaylistIDPath):
|
||||
"""
|
||||
Clear playlist image.
|
||||
"""
|
||||
playlist = PL.get_playlist_by_id(path.playlistid)
|
||||
# playlist = PL.get_playlist_by_id(path.playlistid)
|
||||
playlist = PlaylistTable.get_by_id(path.playlistid)
|
||||
|
||||
if playlist is None:
|
||||
return {"error": "Playlist not found"}, 404
|
||||
|
||||
PL.remove_banner(path.playlistid)
|
||||
# PL.remove_banner(path.playlistid)
|
||||
PlaylistTable.remove_image(path.playlistid)
|
||||
|
||||
playlist.image = None
|
||||
playlist.thumb = None
|
||||
@@ -346,7 +355,8 @@ def remove_playlist(path: PlaylistIDPath):
|
||||
"""
|
||||
Delete playlist
|
||||
"""
|
||||
PL.delete_playlist(path.playlistid)
|
||||
# PL.delete_playlist(path.playlistid)
|
||||
PlaylistTable.remove_one(path.playlistid)
|
||||
|
||||
return {"msg": "Done"}, 200
|
||||
|
||||
@@ -368,15 +378,12 @@ def remove_tracks_from_playlist(
|
||||
# index: int;
|
||||
# }
|
||||
|
||||
PL.remove_tracks_from_playlist(path.playlistid, body.tracks)
|
||||
# PL.remove_tracks_from_playlist(path.playlistid, body.tracks)
|
||||
PlaylistTable.remove_from_playlist(path.playlistid, body.tracks)
|
||||
|
||||
return {"msg": "Done"}, 200
|
||||
|
||||
|
||||
def playlist_name_exists(name: str) -> bool:
|
||||
return PL.count_playlist_by_name(name) > 0
|
||||
|
||||
|
||||
class SavePlaylistAsItemBody(BaseModel):
|
||||
itemtype: str = Field(..., description="The type of item", example="tracks")
|
||||
playlist_name: str = Field(..., description="The name of the playlist")
|
||||
@@ -394,7 +401,7 @@ def save_item_as_playlist(body: SavePlaylistAsItemBody):
|
||||
playlist_name = body.playlist_name
|
||||
itemhash = body.itemhash
|
||||
|
||||
if playlist_name_exists(playlist_name):
|
||||
if PlaylistTable.check_exists_by_name(playlist_name):
|
||||
return {"error": "Playlist already exists"}, 409
|
||||
|
||||
if itemtype == "tracks":
|
||||
@@ -437,8 +444,9 @@ def save_item_as_playlist(body: SavePlaylistAsItemBody):
|
||||
img, str(playlist.id), "image/webp", filename=filename
|
||||
)
|
||||
|
||||
PL.add_tracks_to_playlist(playlist.id, trackhashes)
|
||||
playlist.set_count(len(trackhashes))
|
||||
# PL.add_tracks_to_playlist(playlist.id, trackhashes)
|
||||
PlaylistTable.append_to_playlist(playlist.id, trackhashes)
|
||||
playlist.count = len(trackhashes)
|
||||
|
||||
images = playlistlib.get_first_4_images(trackhashes=trackhashes)
|
||||
playlist.images = [img["image"] for img in images]
|
||||
|
||||
@@ -78,6 +78,10 @@ class Base(MappedAsDataclass, DeclarativeBase):
|
||||
with DbManager(commit=True) as conn:
|
||||
conn.execute(delete(cls))
|
||||
|
||||
@classmethod
|
||||
def remove_one(cls, id: int):
|
||||
cls.execute(delete(cls).where(cls.id == id), commit=True)
|
||||
|
||||
@classmethod
|
||||
def all(cls):
|
||||
return cls.execute(select(cls))
|
||||
|
||||
+108
-1
@@ -22,6 +22,8 @@ from app.db.utils import (
|
||||
albums_to_dataclasses,
|
||||
artists_to_dataclasses,
|
||||
favorites_to_dataclass,
|
||||
playlist_to_dataclass,
|
||||
playlists_to_dataclasses,
|
||||
plugin_to_dataclasses,
|
||||
similar_artist_to_dataclass,
|
||||
similar_artists_to_dataclass,
|
||||
@@ -168,7 +170,6 @@ class FavoritesTable(Base):
|
||||
JSON(), nullable=True, default_factory=dict
|
||||
)
|
||||
|
||||
|
||||
@classmethod
|
||||
def get_all(cls):
|
||||
with DbManager() as conn:
|
||||
@@ -274,3 +275,109 @@ class ScrobbleTable(Base):
|
||||
)
|
||||
|
||||
return tracklog_to_dataclasses(result.fetchall())
|
||||
|
||||
|
||||
class PlaylistTable(Base):
|
||||
__tablename__ = "playlist"
|
||||
|
||||
id: Mapped[int] = mapped_column(primary_key=True)
|
||||
name: Mapped[str] = mapped_column(String(), index=True)
|
||||
last_updated: Mapped[int] = mapped_column(Integer())
|
||||
image: Mapped[str] = mapped_column(String(), nullable=True)
|
||||
userid: Mapped[int] = mapped_column(
|
||||
Integer(), ForeignKey("user.id", ondelete="cascade")
|
||||
)
|
||||
settings: Mapped[dict[str, Any]] = mapped_column(JSON())
|
||||
trackhashes: Mapped[list[str]] = mapped_column(JSON(), default_factory=list)
|
||||
extra: Mapped[dict[str, Any]] = mapped_column(
|
||||
JSON(), nullable=True, default_factory=dict
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def get_all(cls):
|
||||
result = cls.all()
|
||||
return playlists_to_dataclasses(result)
|
||||
|
||||
@classmethod
|
||||
def add_one(cls, playlist: dict[str, Any]):
|
||||
playlist["userid"] = get_current_userid()
|
||||
result = cls.insert_one(playlist)
|
||||
return result.lastrowid
|
||||
|
||||
@classmethod
|
||||
def check_exists_by_name(cls, name: str):
|
||||
result = cls.execute(
|
||||
select(cls).where((cls.name == name) & (cls.userid == get_current_userid()))
|
||||
)
|
||||
return result.fetchone() is not None
|
||||
|
||||
@classmethod
|
||||
def append_to_playlist(cls, id: int, trackhashes: list[str]):
|
||||
print("type(trackhashes):", type(trackhashes))
|
||||
return cls.execute(
|
||||
update(cls)
|
||||
.where((cls.id == id) & (cls.userid == get_current_userid()))
|
||||
.values(trackhashes=cls.trackhashes + trackhashes),
|
||||
commit=True,
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def remove_from_playlist(cls, id: int, trackhashes: list[dict[str, Any]]):
|
||||
# CHECKPOINT: Properly remove tracks from a playlist
|
||||
# Without messing up the order in case of duplicates
|
||||
tracks = cls.execute(
|
||||
select(cls.trackhashes).where(
|
||||
(cls.id == id) & (cls.userid == get_current_userid())
|
||||
)
|
||||
)
|
||||
|
||||
results = tracks.fetchone()
|
||||
if results:
|
||||
dbhashes: list[str] = results[0]
|
||||
|
||||
for item in trackhashes:
|
||||
if dbhashes.index(item["trackhash"]) == item["index"]:
|
||||
dbhashes.remove(item["trackhash"])
|
||||
|
||||
return cls.execute(
|
||||
update(cls)
|
||||
.where((cls.id == id) & (cls.userid == get_current_userid()))
|
||||
.values(trackhashes=dbhashes),
|
||||
commit=True,
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def get_by_id(cls, id: int):
|
||||
result = cls.execute(
|
||||
select(cls).where((cls.id == id) & (cls.userid == get_current_userid()))
|
||||
)
|
||||
result = result.fetchone()
|
||||
if result:
|
||||
return playlist_to_dataclass(result)
|
||||
|
||||
@classmethod
|
||||
def update_one(cls, id: int, playlist: dict[str, Any]):
|
||||
return cls.execute(
|
||||
update(cls)
|
||||
.where((cls.id == id) & (cls.userid == get_current_userid()))
|
||||
.values(playlist),
|
||||
commit=True,
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def update_settings(cls, id: int, settings: dict[str, Any]):
|
||||
return cls.execute(
|
||||
update(cls)
|
||||
.where((cls.id == id) & (cls.userid == get_current_userid()))
|
||||
.values(settings=settings),
|
||||
commit=True,
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def remove_image(cls, id: int):
|
||||
return cls.execute(
|
||||
update(cls)
|
||||
.where((cls.id == id) & (cls.userid == get_current_userid()))
|
||||
.values(image=None),
|
||||
commit=True,
|
||||
)
|
||||
|
||||
@@ -4,6 +4,7 @@ from app.models import Album as AlbumModel, Artist as ArtistModel, Track as Trac
|
||||
from app.models.favorite import Favorite
|
||||
from app.models.lastfm import SimilarArtist
|
||||
from app.models.logger import TrackLog
|
||||
from app.models.playlist import Playlist
|
||||
from app.models.plugins import Plugin
|
||||
from app.models.user import User
|
||||
|
||||
@@ -75,9 +76,20 @@ def plugin_to_dataclass(entry: Any):
|
||||
def plugin_to_dataclasses(entries: Any):
|
||||
return [plugin_to_dataclass(entry) for entry in entries]
|
||||
|
||||
|
||||
def tracklog_to_dataclass(entry: Any):
|
||||
entry_dict = entry._asdict()
|
||||
return TrackLog(**entry_dict)
|
||||
|
||||
|
||||
def tracklog_to_dataclasses(entries: Any):
|
||||
return [tracklog_to_dataclass(entry) for entry in entries]
|
||||
|
||||
|
||||
def playlist_to_dataclass(entry: Any):
|
||||
entry_dict = entry._asdict()
|
||||
return Playlist(**entry_dict)
|
||||
|
||||
|
||||
def playlists_to_dataclasses(entries: Any):
|
||||
return [playlist_to_dataclass(entry) for entry in entries]
|
||||
|
||||
+14
-11
@@ -2,6 +2,7 @@ import dataclasses
|
||||
import json
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
from app import settings
|
||||
|
||||
@@ -14,10 +15,12 @@ class Playlist:
|
||||
image: str | None
|
||||
last_updated: str
|
||||
name: str
|
||||
settings: str | dict
|
||||
trackhashes: str | list[str]
|
||||
settings: dict
|
||||
userid: int
|
||||
trackhashes: list[str]
|
||||
extra: dict[str, Any] = dataclasses.field(default_factory=dict)
|
||||
|
||||
thumb: str | None = ""
|
||||
thumb: str = ""
|
||||
count: int = 0
|
||||
duration: int = 0
|
||||
has_image: bool = False
|
||||
@@ -25,11 +28,11 @@ class Playlist:
|
||||
pinned: bool = False
|
||||
|
||||
def __post_init__(self):
|
||||
self.trackhashes = json.loads(str(self.trackhashes))
|
||||
self.count = len(self.trackhashes)
|
||||
# self.trackhashes = json.loads(str(self.trackhashes))
|
||||
# self.count = len(self.trackhashes)
|
||||
|
||||
if isinstance(self.settings, str):
|
||||
self.settings = dict(json.loads(self.settings))
|
||||
# if isinstance(self.settings, str):
|
||||
# self.settings = dict(json.loads(self.settings))
|
||||
|
||||
self.pinned = self.settings.get("pinned", False)
|
||||
self.has_image = (
|
||||
@@ -42,11 +45,11 @@ class Playlist:
|
||||
self.image = "None"
|
||||
self.thumb = "None"
|
||||
|
||||
def set_duration(self, duration: int):
|
||||
self.duration = duration
|
||||
# def set_duration(self, duration: int):
|
||||
# self.duration = duration
|
||||
|
||||
def set_count(self, count: int):
|
||||
self.count = count
|
||||
# def set_count(self, count: int):
|
||||
# self.count = count
|
||||
|
||||
def clear_lists(self):
|
||||
"""
|
||||
|
||||
Reference in New Issue
Block a user