From c77d0927c755b998e5f045213efacf3cf1cf6026 Mon Sep 17 00:00:00 2001 From: cwilvx Date: Sun, 4 Aug 2024 10:19:11 +0300 Subject: [PATCH] remove deprecated db mappings + fix: cli password reset + delete old migrations --- TODO.md | 3 +- app/api/album.py | 2 - app/api/plugins/__init__.py | 15 +- app/api/settings.py | 15 +- app/arg_handler.py | 11 +- app/config.py | 3 +- app/db/sqlite/__init__.py | 18 -- app/db/sqlite/albumcolors.py | 66 ---- app/db/sqlite/artistcolors.py | 64 ---- app/db/sqlite/auth.py | 144 --------- app/db/sqlite/favorite.py | 121 ------- app/db/sqlite/lastfm/__init__.py | 0 app/db/sqlite/lastfm/similar_artists.py | 62 ---- app/db/sqlite/logger/tracks.py | 59 ---- app/db/sqlite/playlists.py | 228 ------------- app/db/sqlite/plugins/__init__.py | 83 ----- app/db/sqlite/queries.py | 123 ------- app/db/sqlite/settings.py | 151 --------- app/db/sqlite/tracks.py | 135 -------- app/db/userdata.py | 18 ++ app/lib/populate.py | 4 - app/lib/trackslib.py | 4 - app/lib/watchdogg.py | 4 - app/migrations/__init__.py | 5 +- app/migrations/v1_3_0/__init__.py | 306 ------------------ app/migrations/v1_4_9/__init__.py | 405 ------------------------ app/plugins/lyrics.py | 8 +- app/utils/auth.py | 1 - tests/__init__.py | 0 tests/sqlite/test_sqlite_actions.py | 59 ---- tests/test_utils.py | 34 -- 31 files changed, 40 insertions(+), 2111 deletions(-) delete mode 100644 app/db/sqlite/albumcolors.py delete mode 100644 app/db/sqlite/artistcolors.py delete mode 100644 app/db/sqlite/auth.py delete mode 100644 app/db/sqlite/favorite.py delete mode 100644 app/db/sqlite/lastfm/__init__.py delete mode 100644 app/db/sqlite/lastfm/similar_artists.py delete mode 100644 app/db/sqlite/logger/tracks.py delete mode 100644 app/db/sqlite/playlists.py delete mode 100644 app/db/sqlite/plugins/__init__.py delete mode 100644 app/db/sqlite/queries.py delete mode 100644 app/db/sqlite/settings.py delete mode 100644 app/db/sqlite/tracks.py delete mode 100644 app/migrations/v1_3_0/__init__.py delete mode 100644 app/migrations/v1_4_9/__init__.py delete mode 100644 tests/__init__.py delete mode 100644 tests/sqlite/test_sqlite_actions.py delete mode 100644 tests/test_utils.py diff --git a/TODO.md b/TODO.md index 1f0409bc..1dbe7a85 100644 --- a/TODO.md +++ b/TODO.md @@ -55,4 +55,5 @@ # Bug fixes - Duplicates on search -- Audio stops on ending \ No newline at end of file +- Audio stops on ending +- Port account settings to config on the frontend \ No newline at end of file diff --git a/app/api/album.py b/app/api/album.py index 1eb3660b..7a0cf17e 100644 --- a/app/api/album.py +++ b/app/api/album.py @@ -21,9 +21,7 @@ from app.utils.hashing import create_hash from app.lib.albumslib import sort_by_track_no from app.serializers.album import serialize_for_card_many from app.serializers.track import serialize_tracks -from app.db.sqlite.favorite import SQLiteFavoriteMethods as favdb -check_is_fav = favdb.check_is_favorite bp_tag = Tag(name="Album", description="Single album") api = APIBlueprint("album", __name__, url_prefix="/album", abp_tags=[bp_tag]) diff --git a/app/api/plugins/__init__.py b/app/api/plugins/__init__.py index 930bf603..9e86e18c 100644 --- a/app/api/plugins/__init__.py +++ b/app/api/plugins/__init__.py @@ -1,10 +1,8 @@ -from flask import Blueprint, request - from flask_openapi3 import Tag from flask_openapi3 import APIBlueprint from pydantic import BaseModel, Field from app.api.auth import admin_required -from app.db.sqlite.plugins import PluginsMethods +from app.db.userdata import PluginTable bp_tag = Tag(name="Plugins", description="Manage plugins") api = APIBlueprint("plugins", __name__, url_prefix="/plugins", abp_tags=[bp_tag]) @@ -15,8 +13,7 @@ def get_all_plugins(): """ List all plugins """ - plugins = PluginsMethods.get_all_plugins() - + plugins = PluginTable.get_all() return {"plugins": plugins} @@ -37,9 +34,7 @@ def activate_deactivate_plugin(body: PluginActivateBody): Activate/Deactivate plugin """ name = body.plugin - active = 1 if body.active else 0 - - PluginsMethods.plugin_set_active(name, active) + PluginTable.activate(name, body.active) return {"message": "OK"}, 200 @@ -62,7 +57,7 @@ def update_plugin_settings(body: PluginSettingsBody): if not plugin or not settings: return {"error": "Missing plugin or settings"}, 400 - PluginsMethods.update_plugin_settings(plugin_name=plugin, settings=settings) - plugin = PluginsMethods.get_plugin_by_name(plugin) + PluginTable.update_settings(plugin, settings) + plugin = PluginTable.get_by_name(plugin) return {"status": "success", "settings": plugin.settings} diff --git a/app/api/settings.py b/app/api/settings.py index 1e65f3d8..989ca298 100644 --- a/app/api/settings.py +++ b/app/api/settings.py @@ -1,18 +1,14 @@ from dataclasses import asdict from typing import Any -from flask import request from flask_openapi3 import Tag from flask_openapi3 import APIBlueprint from pydantic import BaseModel, Field from app.api.auth import admin_required -from app.db.sqlite.plugins import PluginsMethods as pdb -from app.db.sqlite.tracks import SQLiteTrackMethods as trackdb from app.db.userdata import PluginTable from app.lib.index import index_everything -from app.lib.watchdogg import Watcher as WatchDog from app.logger import log -from app.settings import Info, Paths, SessionVarKeys +from app.settings import Info, SessionVarKeys from app.store.albums import AlbumStore from app.store.artists import ArtistStore from app.store.tracks import TrackStore @@ -49,6 +45,7 @@ def reload_everything(instance_key: str): except Exception as e: log.error(e) + # CHECKPOINT: TEST SETTINGS API ENDPOINTS # @background @@ -217,10 +214,6 @@ def set_setting(body: SetSettingBody): if key not in mapp: return {"msg": "Invalid key!"}, 400 - sdb.set_setting(key, value) - - flag = mapp[key] - if key == "artist_separators": value = str(value).split(",") value = set(value) @@ -269,7 +262,11 @@ def update_config(body: UpdateConfigBody): Update the config file """ config = UserConfig() + if body.key == "artistSeparators": + body.value = body.value.split(",") + setattr(config, body.key, body.value) + print(getattr(config, body.key)) return { "msg": "Config updated!", diff --git a/app/arg_handler.py b/app/arg_handler.py index f6d6be1b..3424a3b0 100644 --- a/app/arg_handler.py +++ b/app/arg_handler.py @@ -13,11 +13,11 @@ from app.config import UserConfig from app.db.userdata import UserTable from app.logger import log from app.print_help import HELP_MESSAGE +from app.setup.sqlite import setup_sqlite from app.utils.auth import hash_password from app.utils.paths import getFlaskOpenApiPath from app.utils.xdg_utils import get_xdg_config_dir from app.utils.wintools import is_windows -from app.db.sqlite.auth import SQLiteAuthMethods as authdb ALLARGS = settings.ALLARGS ARGS = sys.argv[1:] @@ -209,6 +209,7 @@ class ProcessArgs: if ALLARGS.pswd in ARGS: print("SWING MUSIC v2.0.0 ") print("PASSWORD RECOVERY \n") + setup_sqlite() username: str = "" password: str = "" @@ -221,7 +222,6 @@ class ProcessArgs: sys.exit(0) username = username.strip() - # user = authdb.get_user_by_username(username) user = UserTable.get_by_username(username) if not user: @@ -235,11 +235,6 @@ class ProcessArgs: print("\nOperation cancelled! Exiting ...") sys.exit(0) - password = hash_password(password) - # user = authdb.update_user({"id": user.id, "password": password}) - UserTable.update_one({ - "id": user.id, - "password": password - }) + UserTable.update_one({"id": user.id, "password": hash_password(password)}) sys.exit(0) diff --git a/app/config.py b/app/config.py index bca9c69e..d49be130 100644 --- a/app/config.py +++ b/app/config.py @@ -36,7 +36,8 @@ class UserConfig: # misc enablePeriodicScans: bool = False - scanInterval: int = 60 * 10 # 10 minutes + scanInterval: int = 10 + enableWatchdog: bool = False # plugins enablePlugins: bool = True diff --git a/app/db/sqlite/__init__.py b/app/db/sqlite/__init__.py index 43bd79cd..f5ecf73b 100644 --- a/app/db/sqlite/__init__.py +++ b/app/db/sqlite/__init__.py @@ -1,21 +1,3 @@ """ This module contains the functions to interact with the SQLite database. """ - -import sqlite3 -from sqlite3 import Connection as SqlConn - - -def create_connection(db_file: str) -> SqlConn: - """ - Creates a connection to the specified database. - """ - conn = sqlite3.connect(db_file) - return conn - - -def create_tables(conn: SqlConn, sql_query: str): - """ - Executes the specifiend SQL file to create database tables. - """ - conn.executescript(sql_query) diff --git a/app/db/sqlite/albumcolors.py b/app/db/sqlite/albumcolors.py deleted file mode 100644 index 2e804924..00000000 --- a/app/db/sqlite/albumcolors.py +++ /dev/null @@ -1,66 +0,0 @@ -from sqlite3 import Cursor - -from .utils import SQLiteManager, tuples_to_albums - - -class SQLiteAlbumMethods: - @classmethod - def insert_one_album(cls, cur: Cursor, albumhash: str, colors: str): - """ - Inserts one album into the database - """ - - sql = """INSERT OR REPLACE INTO albums( - albumhash, - colors - ) VALUES(?,?) - """ - - cur.execute(sql, (albumhash, colors)) - lastrowid = cur.lastrowid - - return lastrowid - - @classmethod - def get_all_albums(cls): - with SQLiteManager() as cur: - cur.execute("SELECT * FROM albums") - albums = cur.fetchall() - cur.close() - - if albums is not None: - return albums - - return [] - - @staticmethod - def get_albums_by_albumartist(albumartist: str): - with SQLiteManager() as cur: - cur.execute("SELECT * FROM albums WHERE albumartist=?", (albumartist,)) - albums = cur.fetchall() - cur.close() - - if albums is not None: - return tuples_to_albums(albums) - - return [] - - @staticmethod - def exists(albumhash: str, cur: Cursor = None): - """ - Checks if an album exists in the database. - """ - - sql = "SELECT COUNT(1) FROM albums WHERE albumhash = ?" - - def _exists(cur: Cursor): - cur.execute(sql, (albumhash,)) - count = cur.fetchone()[0] - - return count != 0 - - if cur: - return _exists(cur) - - with SQLiteManager() as cur: - return _exists(cur) diff --git a/app/db/sqlite/artistcolors.py b/app/db/sqlite/artistcolors.py deleted file mode 100644 index 8268ea3f..00000000 --- a/app/db/sqlite/artistcolors.py +++ /dev/null @@ -1,64 +0,0 @@ -""" -Contains methods for reading and writing to the sqlite artists database. -""" - -import json -from sqlite3 import Cursor - -from .utils import SQLiteManager - - -class SQLiteArtistMethods: - @staticmethod - def insert_one_artist(cur: Cursor, artisthash: str, colors: list[str]): - """ - Inserts a single artist into the database. - """ - sql = """INSERT OR REPLACE INTO artists( - artisthash, - colors - ) VALUES(?,?) - """ - colors = json.dumps(colors) - cur.execute(sql, (artisthash, colors)) - - @staticmethod - def get_all_artists(cur_: Cursor = None): - """ - Get all artists from the database and return a generator of Artist objects - """ - sql = """SELECT * FROM artists""" - - if not cur_: - with SQLiteManager() as cur: - cur.execute(sql) - - for artist in cur.fetchall(): - yield artist - - cur.close() - - else: - cur_.execute(sql) - - for artist in cur_.fetchall(): - yield artist - - @staticmethod - def exists(artisthash: str, cur: Cursor = None): - """ - Checks if an artist exists in the database. - """ - sql = "SELECT COUNT(1) FROM artists WHERE artisthash = ?" - - def _exists(cur: Cursor): - cur.execute(sql, (artisthash,)) - count = cur.fetchone()[0] - - return count != 0 - - if cur: - return _exists(cur) - - with SQLiteManager() as cur: - return _exists(cur) diff --git a/app/db/sqlite/auth.py b/app/db/sqlite/auth.py deleted file mode 100644 index dede16b0..00000000 --- a/app/db/sqlite/auth.py +++ /dev/null @@ -1,144 +0,0 @@ -import json -from app.models.user import User -from app.utils.auth import hash_password -from app.db.sqlite.utils import SQLiteManager - - -class SQLiteAuthMethods: - """ - Methods for authenticating users. - """ - - @staticmethod - def insert_user(user: dict[str, str]): - """ - Insert a user into the database. - - :param user: A dict with the username, password and roles. - """ - sql = """INSERT INTO users( - username, - password, - roles - ) VALUES(:username, :password, :roles) - """ - - user_tuple = tuple(user.values()) - - with SQLiteManager(userdata_db=True) as cur: - cur = cur.execute(sql, user_tuple) - userid = cur.lastrowid - return userid - # if userid: - # # sleep - # user = SQLiteAuthMethods.get_user_by_id(userid).todict_simplified() - # cur.close() - # return user - - raise Exception(f"Failed to insert user: {user}") - - @staticmethod - def insert_default_user(): - """ - Inserts the default admin user. - """ - user = { - "username": "admin", - "password": hash_password("admin"), - "roles": json.dumps(["admin"]), - } - return SQLiteAuthMethods.insert_user(user) - - @staticmethod - def insert_guest_user(): - """ - Inserts the default guest user. - """ - user = { - "username": "guest", - "password": hash_password("guest"), - "roles": json.dumps(["guest"]), - } - - return SQLiteAuthMethods.insert_user(user) - - @staticmethod - def update_user(user: dict[str, str]): - """ - Update a user in the database. - - :param user: A dict with the user id and the fields to update. Ommited fields will not be updated. - """ - # get all user dict keys - keys = list(user.keys()) - sql = f"""UPDATE users SET - {', '.join([f"{key} = :{key}" for key in keys if key != 'id'])} - WHERE id = :id - """ - - with SQLiteManager(userdata_db=True) as cur: - cur.execute(sql, user) - cur.close() - - return SQLiteAuthMethods.get_user_by_id(user["id"]).todict() - - @staticmethod - def get_all_users(): - """ - Check if there are any users in the database. - """ - sql = "SELECT * FROM users" - - with SQLiteManager(userdata_db=True) as cur: - cur.execute(sql) - - data = cur.fetchall() - cur.close() - - return [User(*user) for user in data] - - @staticmethod - def get_user_by_username(username: str): - """ - Get a user by username. - """ - sql = "SELECT * FROM users WHERE username = ?" - - with SQLiteManager(userdata_db=True) as cur: - cur.execute(sql, (username,)) - - data = cur.fetchone() - cur.close() - - if data is not None: - return User(*data) - - return None - - @staticmethod - def get_user_by_id(userid: int): - """ - Get a user by id. - """ - sql = "SELECT * FROM users WHERE id = ?" - - with SQLiteManager(userdata_db=True) as cur: - cur.execute(sql, (userid,)) - - data = cur.fetchone() - cur.close() - - if data is not None: - return User(*data) - - return None - - @staticmethod - def delete_user_by_username(username: str): - """ - Delete a user by username. - """ - sql = "DELETE FROM users WHERE id = ?" - with SQLiteManager(userdata_db=True) as cur: - cur.execute(sql, (3,)) - cur.close() diff --git a/app/db/sqlite/favorite.py b/app/db/sqlite/favorite.py deleted file mode 100644 index dfff2872..00000000 --- a/app/db/sqlite/favorite.py +++ /dev/null @@ -1,121 +0,0 @@ -from datetime import datetime - -from flask_jwt_extended import current_user -from app.models import FavType -from .utils import SQLiteManager - - -class SQLiteFavoriteMethods: - """THis class contains methods for interacting with the favorites table.""" - - @classmethod - def check_is_favorite(cls, itemhash: str, fav_type: str): - """ - Checks if an item is favorited. - """ - userid = current_user["id"] - - sql = """SELECT * FROM favorites WHERE hash = ? AND type = ? AND userid = ?""" - with SQLiteManager(userdata_db=True) as cur: - cur.execute(sql, (itemhash, fav_type, userid)) - item = cur.fetchone() - cur.close() - return item is not None - - @classmethod - def insert_one_favorite(cls, fav_type: str, fav_hash: str): - """ - Inserts a single favorite into the database. - """ - # try to find the favorite in the database, if it exists, don't insert it - if cls.check_is_favorite(fav_hash, fav_type): - return - - sql = """INSERT INTO favorites(type, hash, timestamp, userid) VALUES(?,?,?,?)""" - current_timestamp = int(datetime.now().timestamp()) - with SQLiteManager(userdata_db=True) as cur: - userid = current_user["id"] - cur.execute(sql, (fav_type, fav_hash, current_timestamp, userid)) - cur.close() - - @classmethod - def get_all(cls) -> list[tuple]: - """ - Returns a list of all favorites. - """ - sql = """SELECT * FROM favorites WHERE userid = ?""" - with SQLiteManager(userdata_db=True) as cur: - userid = current_user["id"] - cur.execute(sql, (userid,)) - favs = cur.fetchall() - cur.close() - return [fav for fav in favs if fav[1] != ""] - - @classmethod - def get_favorites(cls, fav_type: str, userid: int = None) -> list[tuple]: - """ - Returns a list of favorite tracks. - - If userid is None, all favorites are returned. - """ - sql = """SELECT * FROM favorites WHERE type = ?""" - params = (fav_type,) - - if not userid: - sql += " AND userid = ?" - params = (fav_type, userid) - - with SQLiteManager(userdata_db=True) as cur: - cur.execute(sql, params) - all_favs = cur.fetchall() - cur.close() - return all_favs - - @classmethod - def get_fav_tracks(cls, userid: int = None) -> list[tuple]: - """ - Returns a list of favorite tracks. - """ - return cls.get_favorites(FavType.track, userid) - - @classmethod - def get_fav_albums(cls) -> list[tuple]: - """ - Returns a list of favorite albums. - """ - userid = current_user["id"] - return cls.get_favorites(FavType.album, userid) - - @classmethod - def get_fav_artists(cls) -> list[tuple]: - """ - Returns a list of favorite artists. - """ - userid = current_user["id"] - return cls.get_favorites(FavType.artist, userid) - - @classmethod - def delete_favorite(cls, fav_type: str, fav_hash: str): - """ - Deletes a favorite from the database. - """ - sql = """DELETE FROM favorites WHERE hash = ? AND type = ? AND userid = ?""" - - with SQLiteManager(userdata_db=True) as cur: - userid = current_user["id"] - cur.execute(sql, (fav_hash, fav_type, userid)) - cur.close() - - @classmethod - def get_track_count(cls) -> int: - """ - Returns the number of favorite tracks. - """ - sql = """SELECT COUNT(*) FROM favorites WHERE type = ? AND userid = ?""" - - with SQLiteManager(userdata_db=True) as cur: - userid = current_user["id"] - cur.execute(sql, (FavType.track, userid)) - count = cur.fetchone()[0] - cur.close() - return count diff --git a/app/db/sqlite/lastfm/__init__.py b/app/db/sqlite/lastfm/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/app/db/sqlite/lastfm/similar_artists.py b/app/db/sqlite/lastfm/similar_artists.py deleted file mode 100644 index 53a45b70..00000000 --- a/app/db/sqlite/lastfm/similar_artists.py +++ /dev/null @@ -1,62 +0,0 @@ -from app.models.lastfm import SimilarArtist - -from ..utils import SQLiteManager - - -class SQLiteLastFMSimilarArtists: - """ - This class contains methods for interacting with the lastfm_similar_artists table. - """ - - @classmethod - def insert_one(cls, artist: SimilarArtist): - """ - Inserts a single artist into the database. - """ - sql = """INSERT OR REPLACE INTO lastfm_similar_artists(artisthash, similar_artists) VALUES(?,?)""" - - with SQLiteManager(userdata_db=True) as cur: - cur.execute(sql, (artist.artisthash, artist.similar_artists)) - cur.close() - - @classmethod - def get_similar_artists_for(cls, artisthash: str): - """ - Returns a list of similar artists. - """ - sql = """SELECT * FROM lastfm_similar_artists WHERE artisthash = ?""" - with SQLiteManager(userdata_db=True) as cur: - cur.execute(sql, (artisthash,)) - similar_artists = cur.fetchone() - cur.close() - - if similar_artists is None: - return None - - return SimilarArtist(artisthash, similar_artists[2]) - - @classmethod - def get_all(cls): - """ - Returns a list of all similar artists. - """ - sql = """SELECT * FROM lastfm_similar_artists""" - with SQLiteManager(userdata_db=True) as cur: - cur.execute(sql) - similar_artists = cur.fetchall() - cur.close() - - for a in similar_artists: - yield SimilarArtist(a[1], a[2]) - - @classmethod - def exists(cls, artisthash: str): - """ - Checks if an artist exists in the database by counting the number of rows - """ - sql = """SELECT COUNT(*) FROM lastfm_similar_artists WHERE artisthash = ?""" - with SQLiteManager(userdata_db=True) as cur: - cur.execute(sql, (artisthash,)) - count = cur.fetchone()[0] - cur.close() - return count > 0 diff --git a/app/db/sqlite/logger/tracks.py b/app/db/sqlite/logger/tracks.py deleted file mode 100644 index c95c123b..00000000 --- a/app/db/sqlite/logger/tracks.py +++ /dev/null @@ -1,59 +0,0 @@ -from flask_jwt_extended import current_user -from app.db.sqlite.utils import SQLiteManager -from app.models.logger import TrackLog as TrackLog -from app.utils.auth import get_current_userid - - -class SQLiteTrackLogger: - @classmethod - def insert_track(cls, trackhash: str, duration: int, source: str, timestamp: int): - """ - Inserts a track play record into the database - """ - - userid = get_current_userid() - with SQLiteManager(userdata_db=True) as cur: - sql = """INSERT OR REPLACE INTO track_logger( - trackhash, - duration, - timestamp, - source, - userid - ) VALUES(?,?,?,?,?) - """ - - cur.execute( - sql, (trackhash, duration, timestamp, source, userid) - ) - lastrowid = cur.lastrowid - - return lastrowid - - @classmethod - def get_all(cls): - """ - Returns all track play records from the database - """ - - with SQLiteManager(userdata_db=True) as cur: - userid = get_current_userid() - sql = f"""SELECT * FROM track_logger WHERE userid = {userid} ORDER BY timestamp DESC""" - - cur.execute(sql) - rows = cur.fetchall() - - return rows - - @classmethod - def get_recently_played(cls, start: int = 0, limit: int = 100): - """ - Returns a list of recently played tracks - """ - - with SQLiteManager(userdata_db=True) as cur: - sql = f"""SELECT * FROM track_logger WHERE userid = {current_user['id']} ORDER BY timestamp DESC LIMIT ?,?""" - - cur.execute(sql, (start, limit)) - rows = cur.fetchall() - - return [TrackLog(*row) for row in rows] diff --git a/app/db/sqlite/playlists.py b/app/db/sqlite/playlists.py deleted file mode 100644 index b4cca33e..00000000 --- a/app/db/sqlite/playlists.py +++ /dev/null @@ -1,228 +0,0 @@ -import json -from collections import OrderedDict - -from flask_jwt_extended import current_user - -from app.db.sqlite.utils import SQLiteManager, tuple_to_playlist, tuples_to_playlists -from app.utils.dates import create_new_date - - -class SQLitePlaylistMethods: - """ - This class contains methods for interacting with the playlists table. - """ - - @staticmethod - def update_last_updated(playlist_id: int): - """Updates the last updated date of a playlist.""" - sql = """UPDATE playlists SET last_updated = ? WHERE id = ?""" - - with SQLiteManager(userdata_db=True) as cur: - cur.execute(sql, (create_new_date(), playlist_id)) - - @staticmethod - def insert_one_playlist(playlist: dict): - # banner_pos, - # has_gif, - sql = """INSERT INTO playlists( - image, - last_updated, - name, - settings, - trackhashes, - userid - ) VALUES(:image, :last_updated, :name, :settings, :trackhashes, :userid) - """ - - playlist["userid"] = current_user["id"] - playlist = OrderedDict(sorted(playlist.items())) - - with SQLiteManager(userdata_db=True) as cur: - cur.execute(sql, playlist) - pid = cur.lastrowid - cur.close() - - p_tuple = (pid, *playlist.values()) - return tuple_to_playlist(p_tuple) - - @staticmethod - def count_playlist_by_name(name: str): - sql = f"SELECT COUNT(*) FROM playlists WHERE name = ? and userid = {current_user['id']}" - - with SQLiteManager(userdata_db=True) as cur: - cur.execute(sql, (name,)) - - data = cur.fetchone() - cur.close() - - return int(data[0]) - - @staticmethod - def get_all_playlists(): - with SQLiteManager(userdata_db=True) as cur: - userid = 1 - - try: - userid = current_user["id"] - except RuntimeError: - # Catch this error raised during migration execution - pass - - cur.execute(f"SELECT * FROM playlists WHERE userid = {userid}") - playlists = cur.fetchall() - cur.close() - - if playlists is not None: - return tuples_to_playlists(playlists) - - return [] - - @staticmethod - def get_playlist_by_id(playlist_id: int): - sql = f"SELECT * FROM playlists WHERE id = ? and userid = {current_user['id']}" - - with SQLiteManager(userdata_db=True) as cur: - cur.execute(sql, (playlist_id,)) - - data = cur.fetchone() - cur.close() - - if data is not None: - return tuple_to_playlist(data) - - return None - - # FIXME: Extract the "add_track_to_playlist" method to use it for both the artisthash and trackhash lists. - - @classmethod - def add_item_to_json_list(cls, playlist_id: int, field: str, items: set[str]): - """ - Adds a string item to a json dumped list using a playlist id and field name. - Takes the playlist ID, a field name, an item to add to the field. - """ - userid = 1 - - try: - userid = current_user["id"] - except RuntimeError: - # Catch this error raised during migration execution - pass - - sql = f"SELECT {field} FROM playlists WHERE id = ? and userid = {userid}" - - with SQLiteManager(userdata_db=True) as cur: - cur.execute(sql, (playlist_id,)) - data = cur.fetchone() - - if data is not None: - db_items: list[str] = json.loads(data[0]) - - # Remove duplicates, without changing the order. - for item in items: - if item in db_items: - items.remove(item) - - db_items.extend(items) - - sql = f"UPDATE playlists SET {field} = ? WHERE id = ?" - cur.execute(sql, (json.dumps(db_items), playlist_id)) - return len(items) - - cls.update_last_updated(playlist_id) - - @classmethod - def add_tracks_to_playlist(cls, playlist_id: int, trackhashes: list[str]): - """ - Adds trackhashes to a playlist - """ - return cls.add_item_to_json_list(playlist_id, "trackhashes", trackhashes) - - @classmethod - def update_playlist(cls, playlist_id: int, playlist: dict): - sql = f"""UPDATE playlists SET - image = ?, - last_updated = ?, - name = ?, - settings = ? - WHERE id = ? and userid = {current_user['id']} - """ - - del playlist["id"] - del playlist["trackhashes"] - playlist["settings"] = json.dumps(playlist["settings"]) - - playlist = OrderedDict(sorted(playlist.items())) - params = (*playlist.values(), playlist_id) - - with SQLiteManager(userdata_db=True) as cur: - cur.execute(sql, params) - - cls.update_last_updated(playlist_id) - - @classmethod - def update_settings(cls, playlist_id: int, settings: dict): - sql = f"""UPDATE playlists SET settings = ? WHERE id = ? and userid = {current_user['id']}""" - - with SQLiteManager(userdata_db=True) as cur: - cur.execute(sql, (json.dumps(settings), playlist_id)) - - cls.update_last_updated(playlist_id) - - @staticmethod - def delete_playlist(pid: str): - sql = f"DELETE FROM playlists WHERE id = ? and userid = {current_user['id']}" - - with SQLiteManager(userdata_db=True) as cur: - cur.execute(sql, (pid,)) - - @staticmethod - def remove_banner(playlistid: int): - sql = f"""UPDATE playlists SET image = NULL WHERE id = ? and userid = {current_user['id']}""" - - with SQLiteManager(userdata_db=True) as cur: - cur.execute(sql, (playlistid,)) - - @classmethod - def remove_tracks_from_playlist(cls, playlistid: int, tracks: list[dict[str, int]]): - """ - Removes tracks from a playlist by trackhash and position. - """ - - sql = """UPDATE playlists SET trackhashes = ? WHERE id = ?""" - userid = 1 - - try: - userid = current_user["id"] - except RuntimeError: - # Catch this error raised during migration execution - pass - - with SQLiteManager(userdata_db=True) as cur: - cur.execute( - f"SELECT trackhashes FROM playlists WHERE id = ? and userid = {userid}", - (playlistid,), - ) - data = cur.fetchone() - - if data is None: - return - - trackhashes: list[str] = json.loads(data[0]) - to_remove = [] - - for track in tracks: - # { - # trackhash: str; - # index: int; - # } - index = trackhashes.index(track["trackhash"]) - - if index == track["index"]: - to_remove.append(track["trackhash"]) - - for trackhash in to_remove: - trackhashes.remove(trackhash) - - cur.execute(sql, (json.dumps(trackhashes), playlistid)) - - cls.update_last_updated(playlistid) diff --git a/app/db/sqlite/plugins/__init__.py b/app/db/sqlite/plugins/__init__.py deleted file mode 100644 index 0afc5d43..00000000 --- a/app/db/sqlite/plugins/__init__.py +++ /dev/null @@ -1,83 +0,0 @@ -import json - -from app.models.plugins import Plugin - -from ..utils import SQLiteManager - - -def plugin_tuple_to_obj(plugin_tuple: tuple) -> Plugin: - return Plugin( - name=plugin_tuple[1], - active=bool(plugin_tuple[3]), - settings=json.loads(plugin_tuple[4]), - ) - - -class PluginsMethods: - @classmethod - def insert_plugin(cls, plugin: Plugin): - """ - Inserts one plugin into the database - """ - - sql = """INSERT OR IGNORE INTO plugins( - name, - description, - active, - settings - ) VALUES(?,?,?,?) - """ - - with SQLiteManager(userdata_db=True) as cur: - cur.execute( - sql, - ( - plugin.name, - plugin.description, - int(plugin.active), - json.dumps(plugin.settings), - ), - ) - lastrowid = cur.lastrowid - - return lastrowid - - - @classmethod - def get_all_plugins(cls): - with SQLiteManager(userdata_db=True) as cur: - cur.execute("SELECT * FROM plugins") - plugins = cur.fetchall() - cur.close() - - if plugins is not None: - return [plugin_tuple_to_obj(plugin) for plugin in plugins] - - return [] - - @classmethod - def plugin_set_active(cls, name: str, active: int): - with SQLiteManager(userdata_db=True) as cur: - cur.execute("UPDATE plugins SET active=? WHERE name=?", (active, name)) - cur.close() - - @classmethod - def update_plugin_settings(cls, plugin_name: str, settings: dict): - with SQLiteManager(userdata_db=True) as cur: - cur.execute( - "UPDATE plugins SET settings=? WHERE name=?", - (json.dumps(settings), plugin_name), - ) - cur.close() - - @classmethod - def get_plugin_by_name(cls, name: str): - with SQLiteManager(userdata_db=True) as cur: - cur.execute("SELECT * FROM plugins WHERE name=?", (name,)) - plugin = cur.fetchone() - cur.close() - - if plugin is not None: - return plugin_tuple_to_obj(plugin) - - return None diff --git a/app/db/sqlite/queries.py b/app/db/sqlite/queries.py deleted file mode 100644 index 547b4ffe..00000000 --- a/app/db/sqlite/queries.py +++ /dev/null @@ -1,123 +0,0 @@ -""" -This file contains the SQL queries to create the database tables. -""" - -CREATE_USERDATA_TABLES = """ -CREATE TABLE IF NOT EXISTS playlists ( - id integer PRIMARY KEY, - image text, - last_updated text not null, - name text not null, - settings text, - trackhashes text, - userid integer not null, - constraint fk_users foreign key (userid) references users(id) on delete cascade -); - -CREATE TABLE IF NOT EXISTS settings ( - id integer PRIMARY KEY, - root_dirs text NOT NULL, - exclude_dirs text, - artist_separators text NOT NULL default '/,;', - extract_feat integer NOT NULL DEFAULT 1, - remove_prod integer NOT NULL DEFAULT 1, - clean_album_title integer NOT NULL DEFAULT 1, - remove_remaster integer NOT NULL DEFAULT 1, - merge_albums integer NOT NULL DEFAULT 0, - show_albums_as_singles integer NOT NULL DEFAULT 0 -); - -CREATE TABLE IF NOT EXISTS lastfm_similar_artists ( - id integer PRIMARY KEY, - artisthash text NOT NULL, - similar_artists text NOT NULL, - UNIQUE (artisthash) -); - -CREATE TABLE IF NOT EXISTS plugins ( - id integer PRIMARY KEY, - name text NOT NULL UNIQUE, - description text NOT NULL, - active integer NOT NULL DEFAULT 0, - settings text -); - -CREATE TABLE IF NOT EXISTS track_logger ( - id integer PRIMARY KEY, - trackhash text NOT NULL, - duration integer NOT NULL, - timestamp integer NOT NULL, - source text, - userid integer NOT NULL DEFAULT 1, - constraint fk_users foreign key (userid) references users(id) on delete cascade -); - -CREATE TABLE IF NOT EXISTS users ( - id integer PRIMARY KEY, - username text NOT NULL UNIQUE, - firstname text, - lastname text, - password text NOT NULL, - email text, - image text, - roles text NOT NULL DEFAULT '["user"]' -) -""" - -CREATE_APPDB_TABLES = """ -CREATE TABLE IF NOT EXISTS tracks ( - id integer PRIMARY KEY, - album text NOT NULL, - albumartist text NOT NULL, - albumhash text NOT NULL, - artist text NOT NULL, - bitrate integer NOT NULL, - copyright text, - date integer NOT NULL, - disc integer NOT NULL, - duration integer NOT NULL, - filepath text NOT NULL, - folder text NOT NULL, - genre text, - title text NOT NULL, - track integer NOT NULL, - trackhash text NOT NULL, - last_mod float NOT NULL, - UNIQUE (filepath) -); - -CREATE TABLE IF NOT EXISTS albums ( - id integer PRIMARY KEY, - albumhash text NOT NULL, - colors text NOT NULL, - UNIQUE (albumhash) -); - -CREATE TABLE IF NOT EXISTS artists ( - id integer PRIMARY KEY, - artisthash text NOT NULL, - colors text, - bio text, - UNIQUE (artisthash) -); - -CREATE TABLE IF NOT EXISTS folders ( - id integer PRIMARY KEY, - path text NOT NULL, - trackcount integer NOT NULL -); -""" - -# changed from migrations to dbmigrations in v1.3.0 -# to avoid conflicts with the previous migrations. - -CREATE_MIGRATIONS_TABLE = """ -CREATE TABLE IF NOT EXISTS dbmigrations ( - id integer PRIMARY KEY, - version integer NOT NULL DEFAULT 0 -); - -INSERT INTO dbmigrations (version) -SELECT -1 -WHERE NOT EXISTS (SELECT 1 FROM dbmigrations); -""" diff --git a/app/db/sqlite/settings.py b/app/db/sqlite/settings.py deleted file mode 100644 index 0121fa9f..00000000 --- a/app/db/sqlite/settings.py +++ /dev/null @@ -1,151 +0,0 @@ -from pprint import pprint -from typing import Any - -from app.config import UserConfig -from app.db.sqlite.utils import SQLiteManager -from app.utils.wintools import win_replace_slash - - -# class SettingsSQLMethods: -# """ -# Methods for interacting with the settings table. -# """ - -# @staticmethod -# def get_all_settings(): -# """ -# Gets all settings from the database. -# """ - -# sql = "SELECT * FROM settings WHERE id = 1" - -# with SQLiteManager(userdata_db=True) as cur: -# cur.execute(sql) -# settings = cur.fetchone() -# cur.close() - -# # if root_dirs not set -# if settings is None: -# return [] - -# # omit id, root_dirs, and exclude_dirs -# return settings[3:] - -# @staticmethod -# def get_root_dirs() -> list[str]: -# """ -# Gets custom root directories from the database. -# """ - -# sql = "SELECT root_dirs FROM settings" - -# with SQLiteManager(userdata_db=True) as cur: -# cur.execute(sql) -# dirs = cur.fetchall() -# cur.close() - -# dirs = [_dir[0] for _dir in dirs] -# return [win_replace_slash(d) for d in dirs] - -# @staticmethod -# def add_root_dirs(dirs: list[str]): -# """ -# Add custom root directories to the database. -# """ - -# sql = "INSERT INTO settings (root_dirs) VALUES (?)" -# existing_dirs = SettingsSQLMethods.get_root_dirs() - -# dirs = [_dir for _dir in dirs if _dir not in existing_dirs] - -# if len(dirs) == 0: -# return - -# with SQLiteManager(userdata_db=True) as cur: -# for _dir in dirs: -# cur.execute(sql, (_dir,)) - -# @staticmethod -# def remove_root_dirs(dirs: list[str]): -# """ -# Remove custom root directories from the database. -# """ - -# sql = "DELETE FROM settings WHERE root_dirs = ?" - -# with SQLiteManager(userdata_db=True) as cur: -# for _dir in dirs: -# cur.execute(sql, (_dir,)) - -# # Not currently used anywhere, to be used later -# @staticmethod -# def add_excluded_dirs(dirs: list[str]): -# """ -# Add custom exclude directories to the database. -# """ - -# sql = "INSERT INTO settings (exclude_dirs) VALUES (?)" - -# with SQLiteManager(userdata_db=True) as cur: -# cur.executemany(sql, dirs) - -# @staticmethod -# def remove_excluded_dirs(dirs: list[str]): -# """ -# Remove custom exclude directories from the database. -# """ - -# sql = "DELETE FROM settings WHERE exclude_dirs = ?" - -# with SQLiteManager(userdata_db=True) as cur: -# cur.executemany(sql, dirs) - -# @staticmethod -# def get_excluded_dirs() -> list[str]: -# """ -# Gets custom exclude directories from the database. -# """ - -# sql = "SELECT exclude_dirs FROM settings" - -# with SQLiteManager(userdata_db=True) as cur: -# cur.execute(sql) -# dirs = cur.fetchall() -# return [_dir[0] for _dir in dirs] - -# @staticmethod -# def get_settings() -> dict[str, Any]: -# pass - -# @staticmethod -# def set_setting(key: str, value: Any): -# sql = f"UPDATE settings SET {key} = :value WHERE id = 1" - -# if type(value) == bool: -# value = str(int(value)) - -# with SQLiteManager(userdata_db=True) as cur: -# cur.execute(sql, {"value": value}) - - -# def load_settings(): -# # s = SettingsSQLMethods.get_all_settings() -# config = UserConfig() - - # try: - # db_separators: str = s[0] - # db_separators = db_separators.replace(" ", "") - # separators = db_separators.split(",") - # separators = set(separators) - # except IndexError: - # separators = {";", "/"} - - # SessionVars.ARTIST_SEPARATORS = config.artistSeparators - - # # boolean settings - # SessionVars.EXTRACT_FEAT = bool(s[1]) - # SessionVars.REMOVE_PROD = bool(s[2]) - # SessionVars.CLEAN_ALBUM_TITLE = bool(s[3]) - # SessionVars.REMOVE_REMASTER_FROM_TRACK = bool(s[4]) - # SessionVars.MERGE_ALBUM_VERSIONS = bool(s[5]) - # SessionVars.SHOW_ALBUMS_AS_SINGLES = bool(s[6]) diff --git a/app/db/sqlite/tracks.py b/app/db/sqlite/tracks.py deleted file mode 100644 index 0324641b..00000000 --- a/app/db/sqlite/tracks.py +++ /dev/null @@ -1,135 +0,0 @@ -""" -Contains the SQLiteTrackMethods class which contains methods for -interacting with the tracks table. -""" - -from collections import OrderedDict -from sqlite3 import Cursor - -from app.db.sqlite.utils import tuple_to_track, tuples_to_tracks - -from .utils import SQLiteManager -from app.utils.unicode import handle_unicode - - -class SQLiteTrackMethods: - """ - This class contains all methods for interacting with the tracks table. - """ - - @classmethod - def insert_one_track(cls, track: dict, cur: Cursor): - """ - Inserts a single track into the database. - """ - sql = """INSERT OR REPLACE INTO tracks( - album, - albumartist, - albumhash, - artist, - bitrate, - copyright, - date, - disc, - duration, - filepath, - folder, - genre, - last_mod, - title, - track, - trackhash - ) VALUES(:album, :albumartist, :albumhash, :artist, :bitrate, :copyright, - :date, :disc, :duration, :filepath, :folder, :genre, :last_mod, :title, :track, :trackhash) - """ - - track = OrderedDict(sorted(track.items())) - - track["artist"] = track["artists"] - track["albumartist"] = track["albumartists"] - - del track["artists"] - del track["albumartists"] - - try: - cur.execute(sql, track) - except UnicodeEncodeError: - # for each of the values in the track, call handle_unicode on it - for key, value in track.items(): - track[key] = handle_unicode(value) - - cur.execute(sql, track) - - @classmethod - def insert_many_tracks(cls, tracks: list[dict]): - """ - Inserts a list of tracks into the database. - """ - - with SQLiteManager() as cur: - for track in tracks: - cls.insert_one_track(track, cur) - - @staticmethod - def get_all_tracks(): - """ - Get all tracks from the database and return a generator of Track objects - or an empty list. - """ - with SQLiteManager() as cur: - cur.execute("SELECT * FROM tracks") - rows = cur.fetchall() - - if rows is not None: - return tuples_to_tracks(rows) - - return [] - - @staticmethod - def get_track_by_trackhash(trackhash: str): - """ - Gets a track using its trackhash. Returns a Track object or None. - """ - with SQLiteManager() as cur: - cur.execute("SELECT * FROM tracks WHERE trackhash=?", (trackhash,)) - row = cur.fetchone() - - if row is not None: - return tuple_to_track(row) - - return None - - @staticmethod - def get_track_by_albumhash(albumhash: str): - """ - Gets a track using its albumhash. Returns a Track object or None. - """ - with SQLiteManager() as cur: - cur.execute("SELECT * FROM tracks WHERE albumhash=?", (albumhash,)) - row = cur.fetchone() - - if row is not None: - return tuple_to_track(row) - - return None - - @staticmethod - def remove_tracks_by_filepaths(filepaths: str | set[str]): - """ - Removes a track or tracks from the database using their filepaths. - """ - if isinstance(filepaths, str): - filepaths = {filepaths} - - with SQLiteManager() as cur: - for filepath in filepaths: - cur.execute("DELETE FROM tracks WHERE filepath=?", (filepath,)) - - @staticmethod - def remove_tracks_not_in_folders(folders: set[str]): - sql = "DELETE FROM tracks WHERE folder NOT IN ({})".format( - ",".join("?" * len(folders)) - ) - - with SQLiteManager() as cur: - cur.execute(sql, tuple(folders)) diff --git a/app/db/userdata.py b/app/db/userdata.py index 9f3bea2f..0c5f13bc 100644 --- a/app/db/userdata.py +++ b/app/db/userdata.py @@ -20,6 +20,7 @@ from app.db.utils import ( favorites_to_dataclass, playlist_to_dataclass, playlists_to_dataclasses, + plugin_to_dataclass, plugin_to_dataclasses, similar_artist_to_dataclass, similar_artists_to_dataclass, @@ -110,6 +111,23 @@ class PluginTable(Base): def get_all(cls): return plugin_to_dataclasses(cls.all()) + @classmethod + def activate(cls, name: str, value: bool): + return cls.execute( + update(cls).where(cls.name == name).values(active=value), commit=True + ) + + @classmethod + def get_by_name(cls, name: str): + result = cls.execute(select(cls).where(cls.name == name)) + return plugin_to_dataclass(result.fetchone()) + + @classmethod + def update_settings(cls, name: str, settings: dict[str, Any]): + return cls.execute( + update(cls).where(cls.name == name).values(settings=settings), commit=True + ) + class SimilarArtistTable(Base): __tablename__ = "notlastfm_similar_artists" diff --git a/app/lib/populate.py b/app/lib/populate.py index 0b12565c..703b9a76 100644 --- a/app/lib/populate.py +++ b/app/lib/populate.py @@ -6,7 +6,6 @@ from requests import ConnectionError as RequestConnectionError from requests import ReadTimeout from app import settings -from app.db.sqlite.tracks import SQLiteTrackMethods from app.lib.artistlib import CheckArtistImages from app.lib.colorlib import ProcessAlbumColors, ProcessArtistColors from app.lib.errors import PopulateCancelledError @@ -22,9 +21,6 @@ from app.utils.progressbar import tqdm from app.db.userdata import SimilarArtistTable -get_all_tracks = SQLiteTrackMethods.get_all_tracks -insert_many_tracks = SQLiteTrackMethods.insert_many_tracks -remove_tracks_by_filepaths = SQLiteTrackMethods.remove_tracks_by_filepaths POPULATE_KEY = "" diff --git a/app/lib/trackslib.py b/app/lib/trackslib.py index c8a7a74e..438116a6 100644 --- a/app/lib/trackslib.py +++ b/app/lib/trackslib.py @@ -6,10 +6,6 @@ import os from app.lib.pydub.pydub import AudioSegment from app.lib.pydub.pydub.silence import detect_leading_silence, detect_silence - -from app.db.sqlite.tracks import SQLiteTrackMethods as trackdb -from app.store.tracks import TrackStore -from app.utils.progressbar import tqdm from app.utils.threading import ThreadWithReturnValue diff --git a/app/lib/watchdogg.py b/app/lib/watchdogg.py index 8d7b9955..0b577596 100644 --- a/app/lib/watchdogg.py +++ b/app/lib/watchdogg.py @@ -12,11 +12,7 @@ from watchdog.observers import Observer from app import settings from app.config import UserConfig -from app.db.sqlite.albumcolors import SQLiteAlbumMethods as aldb - -# from app.db.sqlite.settings import SettingsSQLMethods as sdb from app.db.sqlite.tracks import SQLiteManager -from app.db.sqlite.tracks import SQLiteTrackMethods as db from app.lib.colorlib import process_color from app.lib.taglib import extract_thumb, get_tags from app.logger import log diff --git a/app/migrations/__init__.py b/app/migrations/__init__.py index 54b51bf5..caa338f0 100644 --- a/app/migrations/__init__.py +++ b/app/migrations/__init__.py @@ -5,13 +5,10 @@ Reads and applies the latest database migrations. """ import inspect -import sys from types import ModuleType # from app.db.sqlite.migrations import MigrationManager from app.db.metadata import MigrationTable -from app.logger import log -from app.migrations import v1_3_0, v1_4_9 from app.migrations.base import Migration @@ -41,7 +38,7 @@ def apply_migrations(): migrations past that index are applied and the new length is stored as the new migration index. """ - modules = [v1_3_0, v1_4_9] + modules = [] migrations = [get_all_migrations(m) for m in modules] # index = MigrationManager.get_index() diff --git a/app/migrations/v1_3_0/__init__.py b/app/migrations/v1_3_0/__init__.py deleted file mode 100644 index 9b20b6b0..00000000 --- a/app/migrations/v1_3_0/__init__.py +++ /dev/null @@ -1,306 +0,0 @@ -import json -import os -import shutil -import time -from collections import OrderedDict -from sqlite3 import OperationalError -from typing import Generator - -from app.db.sqlite.utils import SQLiteManager -from app.migrations.base import Migration -from app.settings import Paths -from app.utils.decorators import coroutine -from app.utils.hashing import create_hash - -# playlists table -# --------------- -# 0: id -# 1: banner_pos -# 2: has_gif -# 3: image -# 4: last_updated -# 5: name -# 6: trackhashes - - -class m1_RemoveSmallThumbnailFolder(Migration): - """ - Removes the small thumbnail folder. - - Because we are added a new folder "original" in the same directory, and the small thumbs folder is used to check if an album's thumbnail is already extracted. - - So we need to remove it, to force the app to extract thumbnails for all albums. - """ - - @staticmethod - def migrate(): - thumbs_sm_path = Paths.get_sm_thumb_path() - thumbs_lg_path = Paths.get_lg_thumb_path() - - for path in [thumbs_sm_path, thumbs_lg_path]: - if os.path.exists(path): - shutil.rmtree(path) - - for path in [thumbs_sm_path, thumbs_lg_path]: - os.makedirs(path, exist_ok=True) - - -class m2_RemovePlaylistArtistHashes(Migration): - """ - removes the artisthashes column from the playlists table. - """ - - @staticmethod - def migrate(): - # remove artisthashes column - sql = "ALTER TABLE playlists DROP COLUMN artisthashes" - - with SQLiteManager(userdata_db=True) as cur: - try: - cur.execute(sql) - except OperationalError: - pass - - cur.close() - - -class m3_AddSettingsToPlaylistTable(Migration): - """ - adds the settings column and removes the banner_pos and has_gif columns - to the playlists table. - """ - - @staticmethod - def migrate(): - select_playlists_sql = "SELECT * FROM playlists" - - with SQLiteManager(userdata_db=True) as cur: - create_playlist_table_sql = """CREATE TABLE IF NOT EXISTS playlists ( - id integer PRIMARY KEY, - image text, - last_updated text not null, - name text not null, - settings text, - trackhashes text - );""" - - insert_playlist_sql = """INSERT INTO playlists( - image, - last_updated, - name, - settings, - trackhashes - ) VALUES(:image, :last_updated, :name, :settings, :trackhashes) - """ - - cur.execute(select_playlists_sql) - - # load all playlists - playlists = cur.fetchall() - - # drop old playlists table - cur.execute("DROP TABLE playlists") - - # create new playlists table - cur.execute(create_playlist_table_sql) - - def transform_playlists(pipeline: Generator, playlists: tuple): - for playlist in playlists: - # create dict that matches the new schema - p = { - "id": playlist[0], - "name": playlist[5], - "image": playlist[3], - "trackhashes": playlist[6], - "last_updated": playlist[4], - "settings": json.dumps( - { - "has_gif": False, - "banner_pos": playlist[1], - "square_img": False, - "pinned": False, - } - ), - } - - pipeline.send(p) - - @coroutine - def insert_playlist(): - while True: - playlist = yield - p = OrderedDict(sorted(playlist.items())) - cur.execute(insert_playlist_sql, p) - - # insert playlists using a coroutine - # (my first coroutine) - pipeline = insert_playlist() - transform_playlists(pipeline, playlists) - pipeline.close() - - cur.close() - - -class m4_AddLastUpdatedToTrackTable(Migration): - """ - adds the last modified column to the tracks table. - """ - - @staticmethod - def migrate(): - # add last_mod column and default to current timestamp - timestamp = time.time() - sql = f"ALTER TABLE tracks ADD COLUMN last_mod text not null DEFAULT '{timestamp}'" - - with SQLiteManager() as cur: - try: - cur.execute(sql) - except OperationalError: - pass - - cur.close() - - -class m5_MovePlaylistsAndFavoritesTo10BitHashes(Migration): - """ - moves the playlists and favorites to 10 bit hashes. - """ - - @staticmethod - def migrate(): - def get_track_data_by_hash(trackhash: str, tracks: list[tuple]) -> tuple: - for track in tracks: - # trackhash is the 15th bit hash - if track[15] == trackhash: - # return artist, album, title - return track[4], track[1], track[13] - - def get_track_by_albumhash(albumhash: str, tracks: list[tuple]) -> tuple: - for track in tracks: - # albumhash is the 3rd bit hash - if track[3] == albumhash: - # return album, albumartist - return track[1], track[2] - - _base = "SELECT * FROM" - fetch_playlists_sql = f"{_base} playlists" - fetch_tracks_sql = f"{_base} tracks" - - update_playlist_hashes_sql = ( - "UPDATE playlists SET trackhashes = :trackhashes WHERE id = :id" - ) - fetch_favorites_sql = f"{_base} favorites" - update_fav_sql = "UPDATE favorites SET hash = :hash WHERE id = :id" - remove_fav_sql = "DELETE FROM favorites WHERE id = :id" - - db_tracks = [] - - # read tracks from db - with SQLiteManager() as cur: - cur.execute(fetch_tracks_sql) - db_tracks.extend(cur.fetchall()) - cur.close() - - # update playlists - with SQLiteManager(userdata_db=True) as cur: - cur.execute(fetch_playlists_sql) - playlists = cur.fetchall() - - # for each playlist - for p in playlists: - pid = p[0] - - # load trackhashes - trackhashes: list[str] = json.loads(p[5]) - - for index, t in enumerate(trackhashes): - (artist, album, title) = get_track_data_by_hash(t, db_tracks) - - # create new hash - new_hash = create_hash(artist, album, title, decode=True, limit=10) - trackhashes[index] = new_hash - - # convert to string - trackhashes = json.dumps(trackhashes) - - # save to db - cur.execute( - update_playlist_hashes_sql, {"trackhashes": trackhashes, "id": pid} - ) - - cur.close() - - # update favorites - with SQLiteManager(userdata_db=True) as cur: - cur.execute(fetch_favorites_sql) - favorites = cur.fetchall() - - # for each favorite - for f in favorites: - fid = f[0] - - fhash: str = f[1] - ftype: str = f[2] # "track" || "album" - - if ftype == "album": - (album, albumartist) = get_track_by_albumhash(fhash, db_tracks) - - # create new hash - new_hash = create_hash(album, albumartist, decode=True, limit=10) - - # save to db - cur.execute(update_fav_sql, {"hash": new_hash, "id": fid}) - continue - - if ftype == "track": - (artist, album, title) = get_track_data_by_hash(fhash, db_tracks) - - # create new hash - new_hash = create_hash(artist, album, title, decode=True, limit=10) - - # save to db - cur.execute(update_fav_sql, {"hash": new_hash, "id": fid}) - continue - - # remove favorites that are not track or album. ie. artists - cur.execute(remove_fav_sql, {"id": fid}) - - cur.close() - - -class m6_RemoveAllTracks(Migration): - """ - removes all tracks from the tracks table. - """ - - @staticmethod - def migrate(): - sql = "DELETE FROM tracks" - - with SQLiteManager() as cur: - cur.execute(sql) - cur.close() - - -class m7_UpdateAppSettingsTable(Migration): - @staticmethod - def migrate(): - drop_table_sql = "DROP TABLE settings" - create_table_sql = """ - CREATE TABLE IF NOT EXISTS settings ( - id integer PRIMARY KEY, - root_dirs text NOT NULL, - exclude_dirs text, - artist_separators text NOT NULL default '/,;', - extract_feat integer NOT NULL DEFAULT 1, - remove_prod integer NOT NULL DEFAULT 1, - clean_album_title integer NOT NULL DEFAULT 1, - remove_remaster integer NOT NULL DEFAULT 1, - merge_albums integer NOT NULL DEFAULT 0, - show_albums_as_singles integer NOT NULL DEFAULT 0 - ); - """ - - with SQLiteManager(userdata_db=True) as cur: - cur.execute(drop_table_sql) - cur.execute(create_table_sql) diff --git a/app/migrations/v1_4_9/__init__.py b/app/migrations/v1_4_9/__init__.py deleted file mode 100644 index be10c97b..00000000 --- a/app/migrations/v1_4_9/__init__.py +++ /dev/null @@ -1,405 +0,0 @@ -import os -import shutil -import sqlite3 -from time import time -from app.db.sqlite.utils import SQLiteManager -from app.migrations.base import Migration -from app.settings import Paths - -import hashlib -from unidecode import unidecode - -from app.db.sqlite.tracks import SQLiteTrackMethods as tdb -from app.db.sqlite.playlists import SQLitePlaylistMethods as pdb -from app.db.sqlite.logger.tracks import SQLiteTrackLogger as ldb -from app.utils.hashing import create_hash - - -def create_sha256_hash(*args: str, decode=False, limit=10) -> str: - """ - This function creates a case-insensitive, non-alphanumeric chars ignoring hash from the given arguments. - - Example use case: - - Creating computable IDs for duplicate artists. eg. Juice WRLD and Juice Wrld should have the same ID. - - :param args: The arguments to hash. - :param decode: Whether to decode the arguments before hashing. - :param limit: The number of characters to return. - - :return: The hash. - """ - - def remove_non_alnum(token: str) -> str: - token = token.lower().strip().replace(" ", "") - t = "".join(t for t in token if t.isalnum()) - - if t == "": - return token - - return t - - str_ = "".join(remove_non_alnum(t) for t in args) - - if decode: - str_ = unidecode(str_) - - str_ = str_.encode("utf-8") - str_ = hashlib.sha256(str_).hexdigest() - return str_[-limit:] - - -def create_sha1_hash(*args: str, decode=False, limit=10) -> str: - """ - This function creates a case-insensitive, non-alphanumeric chars ignoring hash from the given arguments. - - Example use case: - - Creating computable IDs for duplicate artists. eg. Juice WRLD and Juice Wrld should have the same ID. - - :param args: The arguments to hash. - :param decode: Whether to decode the arguments before hashing. - :param limit: The number of characters to return. - - :return: The hash. - """ - - def remove_non_alnum(token: str) -> str: - token = token.lower().strip().replace(" ", "") - t = "".join(t for t in token if t.isalnum()) - - if t == "": - return token - - return t - - str_ = "".join(remove_non_alnum(t) for t in args) - - if decode: - str_ = unidecode(str_) - - str_ = str_.encode("utf-8") - str_ = hashlib.sha1(str_).hexdigest() - - return ( - str_[: limit // 2] + str_[-limit // 2 :] - if limit % 2 == 0 - else str_[: limit // 2] + str_[-limit // 2 - 1 :] - ) - - -class _1AddTimestampToFavoritesTable(Migration): - """ - Adds a timestamp column to the favorites table. - """ - - @staticmethod - def migrate(): - # INFO: add timestamp column with automatic current timestamp - sql = f"ALTER TABLE favorites ADD COLUMN timestamp INTEGER NOT NULL DEFAULT 0" - - # INFO: execute the sql - with SQLiteManager(userdata_db=True) as cur: - table_exists = cur.execute( - "select count(*) from pragma_table_info('favorites') where name = 'timestamp'" - ) - - table_exists = table_exists.fetchone() - - if table_exists[0] == 1: - return - - # INFO: Add the timestamp column to the favorites table - timestamp = int(time()) - cur.execute(sql) - cur.execute(f"UPDATE favorites SET timestamp = {timestamp}") - - -class _2DeleteOriginalThumbnails(Migration): - """ - Original thumbnails are too large and are not needed. - """ - - # TODO: Implement this migration - - @staticmethod - def migrate(): - imgpath = Paths.get_thumbs_path() - og_imgpath = os.path.join(imgpath, "original") - - if os.path.exists(og_imgpath): - shutil.rmtree(og_imgpath) - - -class _3MoveScrobbleToUserId1(Migration): - """ - Updates all track logs from user id = 0 to user id = 1 - """ - - @staticmethod - def migrate(): - sql = """ - UPDATE track_logger SET userid = 1 WHERE userid = 0; - ALTER TABLE track_logger RENAME TO _track_logger; - CREATE TABLE IF NOT EXISTS track_logger ( - id integer PRIMARY KEY, - trackhash text NOT NULL, - duration integer NOT NULL, - timestamp integer NOT NULL, - source text, - userid integer NOT NULL DEFAULT 1, - constraint fk_users foreign key (userid) references users(id) on delete cascade - ); - - INSERT INTO track_logger SELECT * FROM _track_logger; - DROP TABLE _track_logger; - """ - # INFO: Move the scrobble table to the user id 1 - with SQLiteManager(userdata_db=True) as cur: - cur.executescript(sql) - cur.close() - - -class _4AddUserIdToFavoritesTable(Migration): - """ - Adds a userid column to the favorites table. - """ - - @staticmethod - def migrate(): - # check if userid column exists - exists_sql = ( - "select count(*) from pragma_table_info('favorites') where name = 'userid'" - ) - sql = """ - ALTER TABLE favorites ADD userid INTEGER NOT NULL DEFAULT 1; - ALTER TABLE favorites RENAME TO _favorites; - - CREATE TABLE IF NOT EXISTS favorites ( - id integer PRIMARY KEY, - hash text not null, - type text not null, - timestamp integer not null default 0, - userid integer not null, - constraint fk_users foreign key (userid) references users(id) on delete cascade - ); - - INSERT INTO favorites SELECT * FROM _favorites; - DROP TABLE _favorites; - """ - - with SQLiteManager(userdata_db=True) as cur: - data = cur.execute(exists_sql) - data = data.fetchone() - - if data[0] == 1: - return # INFO: column already exists - - cur.executescript(sql) - - -class _5AddUserIdToPlaylistsTable(Migration): - """ - Adds a userid column to the playlists table. - """ - - @staticmethod - def migrate(): - # check if userid column exists - exists_sql = ( - "select count(*) from pragma_table_info('playlists') where name = 'userid'" - ) - - # Add the userid column to the playlists table - # Rename the old table to _playlists - # Create a new playlists table with the userid column - # Then, copy the data from the old table to the new table - # Finally, drop the old table - sql = """ - ALTER TABLE playlists ADD userid INTEGER NOT NULL DEFAULT 1; - ALTER TABLE playlists RENAME TO _playlists; - CREATE TABLE IF NOT EXISTS playlists ( - id integer PRIMARY KEY, - image text, - last_updated text not null, - name text not null, - settings text, - trackhashes text, - userid integer not null, - constraint fk_users foreign key (userid) references users(id) on delete cascade - ); - - INSERT INTO playlists SELECT * FROM _playlists; - DROP TABLE _playlists; - """ - - with SQLiteManager(userdata_db=True) as cur: - # INFO: Check if the column already exists - data = cur.execute(exists_sql) - data = data.fetchone() - - # INFO: If the column already exists, return - if data[0] == 1: - return # INFO: column already exists - - # INFO: Execute the sql - cur.executescript(sql) - - -class _6MoveHashesToSha1(Migration): - """ - Moves the 10 bit item hashes from sha256 to sha1 which is - faster and more lenient on less powerful devices. - - Thanks to [@tcsenpai](https:github.com/tcsenpai) for the contribution. - """ - - # enabled: bool = False - - # pass - - # INFO: Apparentlly, every single table is affected by this migration. - # NOTE: Use generators to avoid memory issues. - - @classmethod - def port_track(cls, trackhash: str): - # get the track with the track hash - track = tdb.get_track_by_trackhash(trackhash) - - if track is None: - return - - title = track.og_title - if track.trackhash != trackhash: - # raise ValueError("Track hash mismatch") - print("Track hash mismatch") - title = track.title - else: - print("Porting track: ", track.title) - - # return the new hash - finalhash = create_sha1_hash( - ", ".join(a.name for a in track.artists), - track.og_album, - title, - ) - - if finalhash != create_hash( - ", ".join(a.name for a in track.artists), track.og_album, title - ): - raise ValueError("Hash mismatch") - - @classmethod - def port_album(cls, albumhash: str): - # get the first track with the album hash - track = tdb.get_track_by_albumhash(albumhash) - - if track is None: - return - - # return the new hash - return create_sha1_hash( - track.og_album, - ", ".join(a.name for a in track.albumartists), - ) - - @classmethod - def port_artist(cls, artisthash: str): - # find all tracks with the artist hash - tracks = [t for t in cls.tracks if artisthash in t.artist_hashes] - - if len(tracks) == 0: - return - - # find the artist name - artist = [ - a.name - for a in tracks[0].artists - if create_sha256_hash(a.name, decode=True) == artisthash - ][0] - - # return the new hash - return create_sha1_hash(artist, decode=True) - - @classmethod - def migrate_favorites(cls): - with SQLiteManager(userdata_db=True) as cur: - # read all favorites - data = cur.execute("SELECT * FROM favorites") - data = data.fetchall() - - for track in cls.tracks: - track.artist_hashes = "-".join( - [create_sha256_hash(a.name, decode=True) for a in track.artists] - ) - - for entry in data: - # hash is the 2nd column in the table - hash = entry[1] - - # entry type is the 3rd column in the table - if entry[2] == "track": - newhash = cls.port_track(hash) - - if newhash: - cur.execute( - f"UPDATE favorites SET hash = '{newhash}' WHERE hash = '{hash}' AND type = 'track'" - ) - - elif entry[2] == "album": - newhash = cls.port_album(hash) - - if newhash: - cur.execute( - f"UPDATE favorites SET hash = '{newhash}' WHERE hash = '{hash}' AND type = 'album'" - ) - - elif entry[2] == "artist": - newhash = cls.port_artist(hash) - - if newhash: - cur.execute( - f"UPDATE favorites SET hash = '{newhash}' WHERE hash = '{hash}' AND type = 'artist'" - ) - - @classmethod - def migrate_playlists(cls): - playlists = pdb.get_all_playlists() - - for playlist in playlists: - # remove previous hashes - to_remove = [ - {"trackhash": trackhash, "index": index} - for index, trackhash in enumerate(playlist.trackhashes) - ] - pdb.remove_tracks_from_playlist(playlist.id, to_remove) - - # add new hashes - newhashes = [ - cls.port_track(trackhash) for trackhash in playlist.trackhashes - ] - newhashes = [h for h in newhashes if h is not None] - pdb.add_tracks_to_playlist(playlist.id, newhashes) - - print("Ported playlist: ", playlist.name) - print("Total tracks: ", len(newhashes)) - - @classmethod - def migrate_scrobble(cls): - # read all logs - logs = ldb.get_all() - - with SQLiteManager(userdata_db=True) as cur: - # for each log, port the hash - for log in logs: - newhash = cls.port_track(log[1]) - - if newhash: - cur.execute( - f"UPDATE track_logger SET trackhash = '{newhash}' WHERE trackhash = '{log[1]}'" - ) - - @classmethod - def migrate(cls): - cls.tracks = list(tdb.get_all_tracks()) - cls.migrate_favorites() - # cls.migrate_playlists() - # cls.migrate_scrobble() diff --git a/app/plugins/lyrics.py b/app/plugins/lyrics.py index 413ad766..05c16005 100644 --- a/app/plugins/lyrics.py +++ b/app/plugins/lyrics.py @@ -7,7 +7,7 @@ from typing import List, Optional import requests from unidecode import unidecode -from app.db.sqlite.plugins import PluginsMethods +from app.db.userdata import PluginTable from app.plugins import Plugin, plugin_method from app.settings import Paths @@ -190,15 +190,13 @@ class LyricsProvider(LRCProvider): class Lyrics(Plugin): def __init__(self) -> None: - plugin = PluginsMethods.get_plugin_by_name("lyrics_finder") + plugin = PluginTable.get_by_name("lyrics_finder") if not plugin: return name = plugin.name - description = plugin.description - - super().__init__(name, description) + super().__init__(name, "Musixmatch lyrics finder") self.provider = LyricsProvider() diff --git a/app/utils/auth.py b/app/utils/auth.py index 983cd83f..2aaaf060 100644 --- a/app/utils/auth.py +++ b/app/utils/auth.py @@ -14,7 +14,6 @@ def hash_password(password: str) -> str: :return: The hashed password. """ - return hashlib.pbkdf2_hmac( "sha256", password.encode("utf-8"), UserConfig().serverId.encode("utf-8"), 100000 ).hex() diff --git a/tests/__init__.py b/tests/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/tests/sqlite/test_sqlite_actions.py b/tests/sqlite/test_sqlite_actions.py deleted file mode 100644 index 141d5a0d..00000000 --- a/tests/sqlite/test_sqlite_actions.py +++ /dev/null @@ -1,59 +0,0 @@ -import json -import sqlite3 -import os -from app.db.sqlite.artistcolors import SQLiteArtistMethods -from app.db.sqlite.queries import CREATE_APPDB_TABLES - -from app.db.sqlite.utils import SQLiteManager - -db_path = "test.db" - - -def test_sqlite_manager(): - with SQLiteManager(test_db_path=db_path) as cur: - for query in CREATE_APPDB_TABLES.split(";"): - cur.execute(query) - - cur.execute( - "INSERT INTO tracks (album, albumartist, albumhash, artist, bitrate, copyright, date, disc, duration, filepath, folder, genre, last_mod, title, track, trackhash) VALUES ('Dummy Album', 'Dummy Album Artist', 'dummyalbumhash', 'Dummy Artist', 320, 'Dummy Copyright', 1630454400, 1, 180, '/path/to/dummy/file.mp3', '/path/to/dummy/folder', 'Dummy Genre', 1630454400.5, 'Dummy Title', 1, 'dummytrackhash');" - ) - - cur.execute("SELECT * FROM tracks") - result = cur.fetchone() - assert result[7] == 1630454400 - - # Test using a connection - with SQLiteManager(conn=sqlite3.connect(db_path)) as cur: - cur.execute("SELECT * FROM tracks") - result = cur.fetchone() - assert result[7] == 1630454400 - - -def test_insert_one_artist(): - color1 = "rgb(0, 0, 0)" - color2 = "rgb(255, 255, 255)" - - with SQLiteManager(test_db_path=db_path) as cur: - SQLiteArtistMethods.insert_one_artist(cur, "artisthash1", [color1, color2]) - cur.execute("SELECT * FROM artists WHERE artisthash=?", ("artisthash1",)) - - result = cur.fetchone() - assert result[1:] == ("artisthash1", json.dumps([color1, color2]), None) - - -def test_get_all_artists(): - with SQLiteManager(test_db_path=db_path) as cur: - artists = SQLiteArtistMethods.get_all_artists(cur) - - # assert that that the generator is not empty and that for each tuple has 4 elements - - try: - while True: - artist = next(artists) - assert len(artist) == 4 - except StopIteration: - pass - - -def test_remove_test_db(): - os.remove(db_path) diff --git a/tests/test_utils.py b/tests/test_utils.py deleted file mode 100644 index 01c72d13..00000000 --- a/tests/test_utils.py +++ /dev/null @@ -1,34 +0,0 @@ -# from hypothesis import given -from app.utils.parsers import parse_feat_from_title - - -def test_extract_featured_artists_from_title(): - test_titles = [ - "Own it (Featuring Ed Sheeran & Stormzy)", - "Own it (Featuring Ed Sheeran and Stormzy)", - "Autograph (On my line)(Feat. Lil Peep)(Deluxe)", - "Why so sad? (with Juice Wrld, Lil Peep)", - "Why so sad? (with Juice Wrld/Lil Peep)", - "Simmer (with Burna Boy)", - "Simmer (without Burna Boy)", - ] - - results = [ - ["Ed Sheeran", "Stormzy"], - ["Ed Sheeran", "Stormzy"], - ["Lil Peep"], - ["Juice Wrld", "Lil Peep"], - ["Juice Wrld", "Lil Peep"], - ["Burna Boy"], - [], - ] - - for title, expected in zip(test_titles, results): - assert parse_feat_from_title(title)[0] == expected - - -# === HYPOTHESIS GHOSTWRITER TESTS === - -# @given(__dir=st.text(), full=st.booleans()) -# def test_fuzz_run_fast_scandir(__dir: str, full) -> None: -# app.utils.run_fast_scandir(_dir=__dir, full=full)