Reorganize repository structure for better organization

- Move backend code to swingmusic/ folder
- Move client applications to root level (swingmusic-android, swingmusic-desktop, swingmusic-webclient)
- Remove intermediate backend/ and clients/ folders
- Update README with new folder structure and setup instructions
- Clean and organized repository layout
This commit is contained in:
Tomas Dvorak
2026-03-17 22:34:34 +01:00
parent 17e859dd2f
commit 4c04287800
206 changed files with 14 additions and 7 deletions
-66
View File
@@ -1,66 +0,0 @@
from typing import Any
from sqlalchemy import (
delete,
func,
insert,
select,
)
from sqlalchemy.orm import DeclarativeBase, MappedAsDataclass
from swingmusic.db.engine import DbEngine
class Base(MappedAsDataclass, DeclarativeBase):
"""
Base class for all database models.
It has methods common to all tables. eg. `insert_one`, `insert_many`, `remove_all`, `remove_one`, `all`, `count`.
"""
@classmethod
def execute(cls, stmt: Any, commit: bool = False):
with DbEngine.manager(commit=commit) as session:
result = session.execute(stmt.execution_options(yield_per=100))
if commit:
session.commit()
yield result
@classmethod
def insert_many(cls, items: list[dict[str, Any]]):
"""
Inserts multiple items into the database.
"""
return next(cls.execute(insert(cls).values(items), commit=True))
@classmethod
def insert_one(cls, item: dict[str, Any]):
"""
Inserts a single item into the database.
"""
return cls.insert_many([item])
@classmethod
def remove_all(cls):
return next(cls.execute(delete(cls), commit=True))
@classmethod
def remove_one(cls, id: int):
return next(cls.execute(delete(cls).where(cls.id == id), commit=True))
@classmethod
def all(cls):
return next(cls.execute(select(cls).execution_options(yield_per=100)))
@classmethod
def count(cls):
return next(cls.execute(select(func.count()).select_from(cls))).scalar()
def create_all_tables():
"""
Creates all the tables that build on the Base class.
"""
Base().metadata.create_all(DbEngine.engine)
-78
View File
@@ -1,78 +0,0 @@
from contextlib import contextmanager
from sqlalchemy import Engine, create_engine, event
from sqlalchemy.orm import sessionmaker
from swingmusic.settings import Paths
@event.listens_for(Engine, "connect")
def set_sqlite_pragma(dbapi_connection, connection_record):
cursor = dbapi_connection.cursor()
cursor.execute("PRAGMA journal_mode=WAL")
cursor.execute("PRAGMA synchronous=NORMAL")
cursor.execute("PRAGMA cache_size=10000")
cursor.execute("PRAGMA foreign_keys=ON")
cursor.execute("PRAGMA temp_store=FILE")
cursor.execute("PRAGMA mmap_size=0")
cursor.close()
class classproperty(property):
"""
A class property decorator.
"""
def __get__(self, owner_self, owner_cls):
if self.fget:
return self.fget(owner_cls)
class DbEngine:
"""
The database engine instance.
"""
_engine: Engine | None = None
@classproperty
def engine(cls) -> Engine:
if not cls._engine:
cls._engine = create_engine(
f"sqlite+pysqlite:///{Paths().app_db_path}",
echo=False,
max_overflow=20,
pool_size=10,
)
return cls._engine
@classmethod
@contextmanager
def manager(cls, commit: bool = False):
"""
This context manager manages access to the database.
When the context manager is entered, it returns a session object that can be used to execute SQL statements.
If the `commit` parameter is set to `True`, the context manager will commit the transaction when it exits.
"""
Session = sessionmaker(cls.engine)
try:
with Session() as session:
yield session
if commit:
session.commit()
# yield session.execution_options(preserve_rowcount=True, yield_per=100)
# yield conn.execution_options(preserve_rowcount=True, yield_per=100)
except Exception as e:
session.rollback()
raise e
finally:
if commit:
session.commit()
session.close()
# del conn
# cls.engine.clear_compiled_cache()
-81
View File
@@ -1,81 +0,0 @@
from swingmusic.config import UserConfig
from swingmusic.db import Base
from swingmusic.db.utils import track_to_dataclass, tracks_to_dataclasses
from swingmusic.db.engine import DbEngine
from sqlalchemy import JSON, Integer, String, delete, select
from sqlalchemy.orm import Mapped, mapped_column
from typing import Any, Optional
class TrackTable(Base):
__tablename__ = "track"
id: Mapped[int] = mapped_column(init=False, primary_key=True)
album: Mapped[str] = mapped_column(String())
albumartists: Mapped[str] = mapped_column(String())
albumhash: Mapped[str] = mapped_column(String(), index=True)
artists: Mapped[str] = mapped_column(String())
bitrate: Mapped[int] = mapped_column(Integer())
copyright: Mapped[Optional[str]] = mapped_column(String())
date: Mapped[int] = mapped_column(Integer(), nullable=True)
disc: Mapped[int] = mapped_column(Integer())
duration: Mapped[int] = mapped_column(Integer())
filepath: Mapped[str] = mapped_column(String(), index=True, unique=True)
folder: Mapped[str] = mapped_column(String(), index=True)
genres: Mapped[Optional[str]] = mapped_column(String())
last_mod: Mapped[float] = mapped_column(Integer())
title: Mapped[str] = mapped_column(String())
track: Mapped[int] = mapped_column(Integer())
trackhash: Mapped[str] = mapped_column(String(), index=True)
lastplayed: Mapped[int] = mapped_column(Integer(), default=0)
playcount: Mapped[int] = mapped_column(Integer(), default=0)
playduration: Mapped[int] = mapped_column(Integer(), default=0)
extra: Mapped[Optional[dict[str, Any]]] = mapped_column(
JSON(), default_factory=dict
)
@classmethod
def get_all(cls):
with DbEngine.manager() as conn:
config = UserConfig()
result = conn.execute(select(cls).execution_options(yield_per=100))
for i in result.scalars():
d = i.__dict__
del d["_sa_instance_state"]
yield track_to_dataclass(d, config)
@classmethod
def get_tracks_by_filepaths(cls, filepaths: list[str]):
with DbEngine.manager() as conn:
result = conn.execute(
select(TrackTable)
.where(TrackTable.filepath.in_(filepaths))
.order_by(TrackTable.last_mod)
)
return tracks_to_dataclasses(result.fetchall())
@classmethod
def get_tracks_in_path(cls, path: str):
with DbEngine.manager() as conn:
result = conn.execute(
select(TrackTable)
.where(TrackTable.filepath.contains(path))
.order_by(TrackTable.last_mod)
)
clean = []
for row in result.fetchall():
d = row[0].__dict__
del d["_sa_instance_state"]
clean.append(d)
return tracks_to_dataclasses(clean)
@classmethod
def remove_tracks_by_filepaths(cls, filepaths: set[str]):
with DbEngine.manager(commit=True) as conn:
conn.execute(delete(TrackTable).where(TrackTable.filepath.in_(filepaths)))
-35
View File
@@ -1,35 +0,0 @@
from swingmusic.db import Base
from sqlalchemy import Integer, insert, select, update
from sqlalchemy.orm import Mapped, mapped_column
from swingmusic.db.engine import DbEngine
class MigrationTable(Base):
__tablename__ = "dbmigration"
id: Mapped[int] = mapped_column(primary_key=True)
version: Mapped[int] = mapped_column(Integer())
@classmethod
def set_version(cls, version: int):
with DbEngine.manager(commit=True) as conn:
result = conn.execute(
update(cls).where(cls.id == 1).values(version=version)
)
if result.rowcount == 0:
conn.execute(insert(cls).values(id=1, version=version))
@classmethod
def get_version(cls):
with DbEngine.manager() as conn:
result = conn.execute(select(cls.version).where(cls.id == 1))
result = result.fetchone()
if result:
return result[0]
return -1
File diff suppressed because it is too large Load Diff
-3
View File
@@ -1,3 +0,0 @@
"""
This module contains the functions to interact with the SQLite database.
"""
-31
View File
@@ -1,31 +0,0 @@
"""
Reads and saves the latest database migrations version.
"""
from swingmusic.db.sqlite.utils import SQLiteManager
class MigrationManager:
@staticmethod
def get_index() -> int:
"""
Returns the latest databases migrations index.
"""
sql = "SELECT * FROM dbmigrations"
with SQLiteManager() as cur:
cur.execute(sql)
ver = int(cur.fetchone()[1])
cur.close()
return ver
# 👇 Setters 👇
@staticmethod
def set_index(version: int):
"""
Updates the databases migrations index.
"""
sql = "UPDATE dbmigrations SET version = ? WHERE id = 1"
with SQLiteManager() as cur:
cur.execute(sql, (version,))
cur.close()
-122
View File
@@ -1,122 +0,0 @@
"""
Helper functions for use with the SQLite database.
"""
import sqlite3
from sqlite3 import Connection, Cursor
import time
from typing import Optional
from swingmusic.models import Album, Playlist, Track
from swingmusic import settings
def tuple_to_track(track: tuple):
"""
Takes a tuple and returns a Track object
"""
return Track(*track[1:]) # rowid is removed from the tuple
def tuples_to_tracks(tracks: list[tuple]):
"""
Takes a list of tuples and returns a generator that yields a Track object for each tuple
"""
for track in tracks:
yield tuple_to_track(track)
def tuple_to_album(album: tuple):
"""
Takes a tuple and returns an Album object
"""
return Album(*album[1:]) # rowid is removed from the tuple
def tuples_to_albums(albums: list[tuple]):
"""
Takes a list of tuples and returns a generator that yields an album object for each tuple
"""
for album in albums:
yield tuple_to_album(album)
def tuple_to_playlist(playlist: tuple):
"""
Takes a tuple and returns a Playlist object
"""
return Playlist(*playlist)
def tuples_to_playlists(playlists: list[tuple]):
"""
Takes a list of tuples and returns a list of Playlist objects
"""
for playlist in playlists:
yield tuple_to_playlist(playlist)
class SQLiteManager:
"""
This is a context manager that handles the connection and cursor
for you. It also commits and closes the connection when you're done.
"""
def __init__(
self,
conn: Optional[Connection] = None,
userdata_db=False,
test_db_path: str = None,
) -> None:
"""
When a connection is passed in, don't close the connection, because it's
a connection to the search database [in memory db].
"""
self.conn = conn
self.CLOSE_CONN = True
self.userdata_db = userdata_db
self.test_db_path = test_db_path
if conn:
self.conn = conn
self.CLOSE_CONN = False
def __enter__(self) -> Cursor:
if self.conn is not None:
cur = self.conn.cursor()
cur.execute("PRAGMA foreign_keys = ON")
return cur
if self.test_db_path:
db_path = self.test_db_path
else:
db_path = settings.Paths().app_db_path
if self.userdata_db:
db_path = settings.Paths().userdata_db_path
self.conn = sqlite3.connect(
db_path,
timeout=15,
)
cur = self.conn.cursor()
cur.execute("PRAGMA foreign_keys = ON")
return cur
def __exit__(self, exc_type, exc_value, exc_traceback):
trial_count = 0
while trial_count < 10:
try:
self.conn.commit()
if self.CLOSE_CONN:
self.conn.close()
return
except sqlite3.OperationalError:
trial_count += 1
time.sleep(3)
self.conn.close()
-748
View File
@@ -1,748 +0,0 @@
from dataclasses import asdict
import datetime
import json
from typing import Any, Iterable, Literal
from sqlalchemy import (
JSON,
Boolean,
ForeignKey,
Integer,
String,
and_,
delete,
func,
insert,
select,
update,
)
from sqlalchemy.orm import Mapped, mapped_column
from swingmusic.db.engine import DbEngine
from swingmusic.db.utils import (
favorite_to_dataclass,
favorites_to_dataclass,
playlist_to_dataclass,
plugin_to_dataclass,
similar_artist_to_dataclass,
tracklog_to_dataclass,
user_to_dataclass,
)
from swingmusic.db import Base
from swingmusic.models.mix import Mix
from swingmusic.utils.auth import get_current_userid, hash_password
class UserTable(Base):
__tablename__ = "user"
id: Mapped[int] = mapped_column(primary_key=True)
image: Mapped[str] = mapped_column(String(), nullable=True)
password: Mapped[str] = mapped_column(String())
username: Mapped[str] = mapped_column(String(), index=True)
roles: Mapped[list[str]] = mapped_column(JSON(), default_factory=lambda: [])
extra: Mapped[dict[str, Any]] = mapped_column(
JSON(), nullable=True, default_factory=dict
)
@classmethod
def get_all(cls):
result = cls.execute(select(cls))
for i in next(result).scalars():
yield user_to_dataclass(i)
@classmethod
def insert_default_user(cls):
user = {
"username": "admin",
"password": hash_password("admin"),
"roles": ["admin"],
}
return cls.insert_one(user)
@classmethod
def insert_guest_user(cls):
user = {
"username": "guest",
"password": hash_password("guest"),
"roles": ["guest"],
}
return cls.insert_one(user)
@classmethod
def get_by_id(cls, id: int):
result = cls.execute(select(cls).where(cls.id == id))
res = next(result).scalar()
if res:
return user_to_dataclass(res)
@classmethod
def get_by_username(cls, username: str):
res = cls.execute(select(cls).where(cls.username == username))
res = next(res).scalar()
if res:
return user_to_dataclass(res)
@classmethod
def update_one(cls, user: dict[str, Any]):
return next(
cls.execute(
update(cls).where(cls.id == user["id"]).values(user), commit=True
)
)
@classmethod
def remove_by_username(cls, username: str):
return next(
cls.execute(delete(cls).where(cls.username == username), commit=True)
)
class PluginTable(Base):
__tablename__ = "plugin"
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str] = mapped_column(String(), unique=True)
active: Mapped[bool] = mapped_column(Boolean())
settings: Mapped[dict[str, Any]] = mapped_column(JSON())
extra: Mapped[dict[str, Any]] = mapped_column(JSON(), nullable=True)
@classmethod
def get_all(cls):
result = cls.execute(select(cls))
for i in next(result).scalars():
yield plugin_to_dataclass(i)
@classmethod
def activate(cls, name: str, value: bool):
return next(
cls.execute(
update(cls).where(cls.name == name).values(active=value), commit=True
)
)
@classmethod
def get_by_name(cls, name: str):
result = cls.execute(select(cls).where(cls.name == name))
res = next(result).scalar()
if res:
return plugin_to_dataclass(res)
@classmethod
def update_settings(cls, name: str, settings: dict[str, Any]):
return next(
cls.execute(
update(cls).where(cls.name == name).values(settings=settings),
commit=True,
)
)
class SimilarArtistTable(Base):
__tablename__ = "notlastfm_similar_artists"
id: Mapped[int] = mapped_column(Integer(), primary_key=True)
artisthash: Mapped[str] = mapped_column(String(), index=True)
similar_artists: Mapped[dict[str, str]] = mapped_column(JSON())
@classmethod
def get_all(cls):
result = cls.execute(select(cls).execution_options(yield_per=100))
for i in next(result).scalars():
yield similar_artist_to_dataclass(i)
@classmethod
def exists(cls, artisthash: str):
"""
Check whether an artisthash exists in the database.
"""
with DbEngine.manager() as conn:
result = conn.execute(
select(cls.artisthash)
.where(cls.artisthash == artisthash)
.execution_options(yield_per=100)
)
return len(result.scalars().all()) > 0
@classmethod
def get_by_hash(cls, artisthash: str):
"""
Get a single artist by hash.
"""
result = cls.execute(select(cls).where(cls.artisthash == artisthash))
res = next(result).scalar()
if res:
return similar_artist_to_dataclass(res)
class FavoritesTable(Base):
__tablename__ = "favorite"
id: Mapped[int] = mapped_column(primary_key=True)
hash: Mapped[str] = mapped_column(String(), unique=True)
type: Mapped[str] = mapped_column(String(), index=True)
timestamp: Mapped[int] = mapped_column(Integer(), index=True)
userid: Mapped[int] = mapped_column(
Integer(), ForeignKey("user.id", ondelete="cascade"), default=1, index=True
)
extra: Mapped[dict[str, Any]] = mapped_column(
JSON(), nullable=True, default_factory=dict
)
@classmethod
def get_all(cls, with_user: bool = False):
with DbEngine.manager() as conn:
if with_user:
result = conn.execute(
select(cls).where(cls.userid == get_current_userid())
)
else:
result = conn.execute(select(cls))
for i in result.scalars():
yield favorite_to_dataclass(i)
@classmethod
def insert_item(cls, item: dict[str, Any]):
# guard against hash collisions for different item types
item["hash"] = f"{item['type']}_{item['hash']}"
if item.get("timestamp") is None:
item["timestamp"] = int(datetime.datetime.now().timestamp())
if item.get("userid") is None:
item["userid"] = get_current_userid()
return next(cls.execute(insert(cls).values(item), commit=True))
@classmethod
def remove_item(cls, item: dict[str, Any]):
return next(
cls.execute(
delete(cls).where(
(cls.hash == item["hash"])
| (cls.hash == f"{item['type']}_{item['hash']}")
),
commit=True,
)
)
@classmethod
def check_exists(cls, hash: str, type: str):
result = cls.execute(
select(cls).where((cls.hash == hash) | (cls.hash == f"{type}_{hash}"))
)
return next(result).scalar() is not None
@classmethod
def get_by_hash(cls, hash: str, type: str):
result = cls.execute(
select(cls).where((cls.hash == hash) | (cls.hash == f"{type}_{hash}"))
)
return next(result).scalars().all()
@classmethod
def get_all_of_type(cls, type: str, start: int, limit: int):
result = cls.execute(
select(cls)
# .select_from(join(table, cls, field == cls.hash))
.where(and_(cls.type == type, cls.userid == get_current_userid()))
.order_by(cls.timestamp.desc())
.offset(start)
# INFO: If start is 0, fetch all so we can get the total count
.limit(limit if start != 0 else None)
)
res = next(result).scalars().all()
if start == 0:
# if limit == -1, return all
if limit == -1:
limit = len(res)
return res[:limit], len(res)
return res, -1
@classmethod
def get_fav_tracks(cls, start: int, limit: int):
result, total = cls.get_all_of_type("track", start, limit)
return favorites_to_dataclass(result), total
@classmethod
def get_fav_albums(cls, start: int, limit: int):
result, total = cls.get_all_of_type("album", start, limit)
return favorites_to_dataclass(result), total
@classmethod
def get_fav_artists(cls, start: int, limit: int):
result, total = cls.get_all_of_type("artist", start, limit)
return favorites_to_dataclass(result), total
@classmethod
def count_favs_in_period(cls, start_time: int, end_time: int):
result = cls.execute(
select(func.count(cls.id))
.where((cls.userid == get_current_userid()))
.where(and_(cls.timestamp >= start_time, cls.timestamp <= end_time))
)
res = next(result).scalar()
if res:
return res
return 0
@classmethod
def count_tracks(cls):
result = cls.execute(select(func.count(cls.id)).where(cls.type == "track"))
return next(result).scalar()
@classmethod
def get_last_trackhash(cls):
result = cls.execute(
select(cls.hash).where(cls.type == "track").order_by(cls.timestamp.desc())
)
return next(result).scalar()
class ScrobbleTable(Base):
__tablename__ = "scrobble"
id: Mapped[int] = mapped_column(primary_key=True)
trackhash: Mapped[str] = mapped_column(String(), index=True)
duration: Mapped[int] = mapped_column(Integer())
timestamp: Mapped[int] = mapped_column(Integer())
source: Mapped[str] = mapped_column(String())
userid: Mapped[int] = mapped_column(
Integer(), ForeignKey("user.id", ondelete="cascade"), index=True
)
extra: Mapped[dict[str, Any]] = mapped_column(
JSON(), nullable=True, default_factory=dict
)
@classmethod
def add(cls, item: dict[str, Any]):
if item.get("userid") is None:
item["userid"] = get_current_userid()
return cls.insert_one(item)
@classmethod
def get_all(cls, start: int, limit: int | None = None, userid: int | None = None):
result = cls.execute(
select(cls)
.where(cls.userid == (userid if userid else get_current_userid()))
.order_by(cls.timestamp.desc())
.offset(start)
.limit(limit)
.execution_options(yield_per=100)
)
for i in next(result).scalars():
yield tracklog_to_dataclass(i)
@classmethod
def get_all_in_period(cls, start_time: int, end_time: int, userid: int | None):
# UserId will be None if function is called from the API
# In that case, we use the request userid
if userid is None:
userid = get_current_userid()
result = cls.execute(
select(cls)
.where(cls.userid == userid)
.where(and_(cls.timestamp >= start_time, cls.timestamp <= end_time))
.order_by(cls.timestamp.desc())
.execution_options(yield_per=100)
)
for i in next(result).scalars():
yield tracklog_to_dataclass(i)
@classmethod
def get_last_entry(cls, userid: int):
result = cls.execute(
select(cls).where(cls.userid == userid).order_by(cls.timestamp.desc())
)
res = next(result).scalar()
if res:
return tracklog_to_dataclass(res)
class PlaylistTable(Base):
__tablename__ = "playlist"
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str] = mapped_column(String(), index=True)
last_updated: Mapped[int] = mapped_column(Integer())
image: Mapped[str] = mapped_column(String(), nullable=True)
userid: Mapped[int] = mapped_column(
Integer(), ForeignKey("user.id", ondelete="cascade")
)
settings: Mapped[dict[str, Any]] = mapped_column(JSON())
trackhashes: Mapped[list[str]] = mapped_column(JSON(), default_factory=list)
extra: Mapped[dict[str, Any]] = mapped_column(
JSON(), nullable=True, default_factory=dict
)
@classmethod
def get_all(cls, current_user: bool = True):
if current_user:
result = cls.execute(
select(cls)
.where(cls.userid == get_current_userid())
.execution_options(yield_per=100)
)
else:
result = cls.execute(select(cls).execution_options(yield_per=100))
for i in next(result).scalars():
yield playlist_to_dataclass(i)
@classmethod
def add_one(cls, playlist: dict[str, Any]):
playlist["userid"] = get_current_userid()
result = cls.insert_one(playlist)
return result.lastrowid
@classmethod
def check_exists_by_name(cls, name: str):
result = cls.execute(
select(cls).where((cls.name == name) & (cls.userid == get_current_userid()))
)
return next(result).scalar() is not None
@classmethod
def append_to_playlist(cls, id: int, trackhashes: list[str]):
dbtrackhashes = cls.get_trackhashes(id) or []
trackhashes = list(set(dbtrackhashes).union(set(trackhashes)))
return next(
cls.execute(
update(cls)
.where((cls.id == id) & (cls.userid == get_current_userid()))
.values(trackhashes=trackhashes),
commit=True,
)
)
@classmethod
def get_trackhashes(cls, id: int):
result = cls.execute(
select(cls.trackhashes).where(
(cls.id == id) & (cls.userid == get_current_userid())
)
)
return next(result).scalar()
@classmethod
def remove_from_playlist(cls, id: int, trackhashes: list[dict[str, Any]]):
# INFO: Get db trackhashes
dbtrackhashes = cls.get_trackhashes(id)
if dbtrackhashes:
for item in trackhashes:
if dbtrackhashes.index(item["trackhash"]) == item["index"]:
dbtrackhashes.remove(item["trackhash"])
return next(
cls.execute(
update(cls)
.where((cls.id == id) & (cls.userid == get_current_userid()))
.values(trackhashes=dbtrackhashes),
commit=True,
)
)
@classmethod
def get_by_id(cls, id: int):
result = cls.execute(
select(cls).where((cls.id == id) & (cls.userid == get_current_userid()))
)
result = next(result).scalar()
if result:
return playlist_to_dataclass(result)
@classmethod
def update_one(cls, id: int, playlist: dict[str, Any]):
return next(
cls.execute(
update(cls)
.where((cls.id == id) & (cls.userid == get_current_userid()))
.values(playlist),
commit=True,
)
)
@classmethod
def update_settings(cls, id: int, settings: dict[str, Any]):
return next(
cls.execute(
update(cls)
.where((cls.id == id) & (cls.userid == get_current_userid()))
.values(settings=settings),
commit=True,
)
)
@classmethod
def remove_image(cls, id: int):
return next(
cls.execute(
update(cls)
.where((cls.id == id) & (cls.userid == get_current_userid()))
.values(image=None),
commit=True,
)
)
class LibDataTable(Base):
__tablename__ = "artistdata"
id: Mapped[int] = mapped_column(primary_key=True)
itemhash: Mapped[str] = mapped_column(String(), unique=True, index=True)
itemtype: Mapped[str] = mapped_column(String())
color: Mapped[str] = mapped_column(String(), nullable=True)
bio: Mapped[str] = mapped_column(String(), nullable=True)
info: Mapped[dict[str, Any]] = mapped_column(JSON(), nullable=True)
extra: Mapped[dict[str, Any]] = mapped_column(
JSON(), nullable=True, default_factory=dict
)
@classmethod
def update_one(cls, hash: str, data: dict[str, Any]):
return next(
cls.execute(
update(cls).where(cls.itemhash == hash).values(data), commit=True
)
)
@classmethod
def find_one(cls, hash: str, type: Literal["album", "artist"]):
result = cls.execute(
select(cls).where((cls.itemhash == type + hash) & (cls.itemtype == type))
)
return next(result).scalar()
@classmethod
def get_all_colors(cls, type: str) -> Iterable[dict[str, str]]:
result = cls.execute(select(cls).where(cls.itemtype == type))
for i in next(result).scalars():
yield {"itemhash": i.itemhash.replace(type, ""), "color": i.color}
class MixTable(Base):
__tablename__ = "mix"
id: Mapped[int] = mapped_column(primary_key=True)
mixid: Mapped[str] = mapped_column(String(), index=True)
title: Mapped[str] = mapped_column(String())
description: Mapped[str] = mapped_column(String())
timestamp: Mapped[int] = mapped_column(Integer())
sourcehash: Mapped[str] = mapped_column(String(), unique=True, index=True)
userid: Mapped[int] = mapped_column(
Integer(), ForeignKey("user.id", ondelete="cascade"), index=True
)
saved: Mapped[bool] = mapped_column(Boolean(), default=False)
tracks: Mapped[list[str]] = mapped_column(JSON(), default_factory=list)
extra: Mapped[dict[str, Any]] = mapped_column(
JSON(), nullable=True, default_factory=dict
)
@classmethod
def get_all(cls, with_userid: bool = False):
if with_userid:
result = cls.execute(
select(cls)
.where(cls.userid == get_current_userid())
.order_by(cls.timestamp.desc())
)
else:
result = cls.execute(select(cls).order_by(cls.timestamp.desc()))
for i in next(result).scalars():
yield Mix.mix_to_dataclass(i)
@classmethod
def get_by_sourcehash(cls, sourcehash: str):
result = cls.execute(select(cls).where(cls.sourcehash == sourcehash))
res = next(result).scalar()
if res:
return Mix.mix_to_dataclass(res)
@classmethod
def get_by_mixid(cls, mixid: str):
result = cls.execute(select(cls).where(cls.mixid == mixid))
res = next(result).scalar()
if res:
return Mix.mix_to_dataclass(res)
@classmethod
def insert_one(cls, mix: Mix):
mixdict = asdict(mix)
mixdict["mixid"] = mix.id
del mixdict["id"]
return next(cls.execute(insert(cls).values(mixdict), commit=True))
@classmethod
def update_one(cls, mixid: str, mix: Mix):
mixdict = asdict(mix)
mixdict["mixid"] = mix.id
del mixdict["id"]
return next(
cls.execute(
update(cls)
.where(
and_(
cls.mixid == mixid,
cls.sourcehash == mix.sourcehash,
cls.userid == get_current_userid(),
)
)
.values(mixdict),
commit=True,
)
)
@classmethod
def save_artist_mix(cls, sourcehash: str):
"""
Toggles the saved status of an artist mix.
"""
mix = cls.get_by_sourcehash(sourcehash)
if not mix:
return False
mix.saved = not mix.saved
cls.update_one(mix.id, mix)
return mix.saved
@classmethod
def get_saved_track_mixes(cls):
"""
Return all mixes that have the extra.trackmix_saved set to True.
"""
result = cls.execute(select(cls).where(cls.extra.c.trackmix_saved == True))
# return Mix.mixes_to_dataclasses(result.fetchall())
for i in next(result).scalars():
yield Mix.mix_to_dataclass(i)
@classmethod
def save_track_mix(cls, sourcehash: str):
"""
Toggles the property extra.trackmix_saved to True.
"""
mix = cls.get_by_sourcehash(sourcehash)
if not mix:
return False
mix.extra["trackmix_saved"] = not mix.extra.get("trackmix_saved", False)
cls.update_one(mix.id, mix)
return mix.extra["trackmix_saved"]
class CollectionTable(Base):
# INFO: table name was kept as page to avoid breaking existing data
__tablename__ = "page"
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str] = mapped_column(String(), index=True)
userid: Mapped[int] = mapped_column(
Integer(), ForeignKey("user.id", ondelete="cascade"), index=True
)
items: Mapped[list[dict[str, Any]]] = mapped_column(JSON(), default_factory=list)
extra: Mapped[dict[str, Any]] = mapped_column(
JSON(), nullable=True, default_factory=dict
)
@classmethod
def to_dict(cls, entry: Any) -> dict[str, Any]:
d = entry.__dict__
del d["_sa_instance_state"]
return d
@classmethod
def get_all(cls):
result = cls.execute(select(cls).where(cls.userid == get_current_userid()))
for i in next(result).scalars():
yield cls.to_dict(i)
@classmethod
def get_by_id(cls, id: int):
result = cls.execute(
select(cls).where(and_(cls.id == id, cls.userid == get_current_userid()))
)
res = next(result).scalar()
if res:
return cls.to_dict(res)
@classmethod
def delete_by_id(cls, id: int):
return next(
cls.execute(
delete(cls).where(
and_(cls.id == id, cls.userid == get_current_userid())
),
commit=True,
)
)
@classmethod
def update_items(cls, id: int, items: list[dict[str, Any]]):
return next(
cls.execute(
update(cls)
.where(and_(cls.id == id, cls.userid == get_current_userid()))
.values(items=items),
commit=True,
)
)
@classmethod
def update_one(cls, payload: dict[str, Any]):
return next(
cls.execute(
update(cls)
.where(
and_(cls.id == payload["id"], cls.userid == get_current_userid())
)
.values(payload),
commit=True,
)
)
-98
View File
@@ -1,98 +0,0 @@
from typing import Any
from swingmusic.config import UserConfig
from swingmusic.models import Album as AlbumModel, Artist as ArtistModel, Track as TrackModel
from swingmusic.models.favorite import Favorite
from swingmusic.models.lastfm import SimilarArtist
from swingmusic.models.logger import TrackLog
from swingmusic.models.playlist import Playlist
from swingmusic.models.plugins import Plugin
from swingmusic.models.user import User
def row_to_dict(row: Any):
d = row.__dict__
del d["_sa_instance_state"]
return d
def track_to_dataclass(track: dict, config: UserConfig):
return TrackModel(**track, config=config)
def tracks_to_dataclasses(tracks: Any):
return [track_to_dataclass(track, UserConfig()) for track in tracks]
def album_to_dataclass(album: Any):
return AlbumModel(**album._asdict())
def albums_to_dataclasses(albums: Any):
return [album_to_dataclass(album) for album in albums]
def artist_to_dataclass(artist: Any):
return ArtistModel(**artist._asdict())
def artists_to_dataclasses(artists: Any):
return [artist_to_dataclass(artist) for artist in artists]
# SECTION: User data helpers
def similar_artist_to_dataclass(entry: Any):
entry_dict = row_to_dict(entry)
del entry_dict["id"]
return SimilarArtist(**entry_dict)
def similar_artists_to_dataclass(entries: Any):
return [similar_artist_to_dataclass(entry) for entry in entries]
def favorite_to_dataclass(entry: Any):
entry_dict = row_to_dict(entry)
del entry_dict["id"]
return Favorite(**entry_dict)
def favorites_to_dataclass(entries: Any):
return [favorite_to_dataclass(entry) for entry in entries]
def user_to_dataclass(entry: Any):
return User(**row_to_dict(entry))
# def user_to_dataclasses(entries: Any):
# return [user_to_dataclass(entry) for entry in entries]
def plugin_to_dataclass(entry: Any):
entry_dict = row_to_dict(entry)
del entry_dict["id"]
return Plugin(**entry_dict)
def plugin_to_dataclasses(entries: Any):
return [plugin_to_dataclass(entry) for entry in entries]
def tracklog_to_dataclass(entry: Any):
return TrackLog(**row_to_dict(entry))
def tracklog_to_dataclasses(entries: Any):
return [tracklog_to_dataclass(entry) for entry in entries]
def playlist_to_dataclass(entry: Any):
entry_dict = row_to_dict(entry)
return Playlist(**entry_dict)
def playlists_to_dataclasses(entries: Any):
return [playlist_to_dataclass(entry) for entry in entries]