remove deprecated db mappings

+ fix: cli password reset
+ delete old migrations
This commit is contained in:
cwilvx
2024-08-04 10:19:11 +03:00
parent 04946831ce
commit c77d0927c7
31 changed files with 40 additions and 2111 deletions
+1
View File
@@ -56,3 +56,4 @@
- Duplicates on search
- Audio stops on ending
- Port account settings to config on the frontend
-2
View File
@@ -21,9 +21,7 @@ from app.utils.hashing import create_hash
from app.lib.albumslib import sort_by_track_no
from app.serializers.album import serialize_for_card_many
from app.serializers.track import serialize_tracks
from app.db.sqlite.favorite import SQLiteFavoriteMethods as favdb
check_is_fav = favdb.check_is_favorite
bp_tag = Tag(name="Album", description="Single album")
api = APIBlueprint("album", __name__, url_prefix="/album", abp_tags=[bp_tag])
+5 -10
View File
@@ -1,10 +1,8 @@
from flask import Blueprint, request
from flask_openapi3 import Tag
from flask_openapi3 import APIBlueprint
from pydantic import BaseModel, Field
from app.api.auth import admin_required
from app.db.sqlite.plugins import PluginsMethods
from app.db.userdata import PluginTable
bp_tag = Tag(name="Plugins", description="Manage plugins")
api = APIBlueprint("plugins", __name__, url_prefix="/plugins", abp_tags=[bp_tag])
@@ -15,8 +13,7 @@ def get_all_plugins():
"""
List all plugins
"""
plugins = PluginsMethods.get_all_plugins()
plugins = PluginTable.get_all()
return {"plugins": plugins}
@@ -37,9 +34,7 @@ def activate_deactivate_plugin(body: PluginActivateBody):
Activate/Deactivate plugin
"""
name = body.plugin
active = 1 if body.active else 0
PluginsMethods.plugin_set_active(name, active)
PluginTable.activate(name, body.active)
return {"message": "OK"}, 200
@@ -62,7 +57,7 @@ def update_plugin_settings(body: PluginSettingsBody):
if not plugin or not settings:
return {"error": "Missing plugin or settings"}, 400
PluginsMethods.update_plugin_settings(plugin_name=plugin, settings=settings)
plugin = PluginsMethods.get_plugin_by_name(plugin)
PluginTable.update_settings(plugin, settings)
plugin = PluginTable.get_by_name(plugin)
return {"status": "success", "settings": plugin.settings}
+6 -9
View File
@@ -1,18 +1,14 @@
from dataclasses import asdict
from typing import Any
from flask import request
from flask_openapi3 import Tag
from flask_openapi3 import APIBlueprint
from pydantic import BaseModel, Field
from app.api.auth import admin_required
from app.db.sqlite.plugins import PluginsMethods as pdb
from app.db.sqlite.tracks import SQLiteTrackMethods as trackdb
from app.db.userdata import PluginTable
from app.lib.index import index_everything
from app.lib.watchdogg import Watcher as WatchDog
from app.logger import log
from app.settings import Info, Paths, SessionVarKeys
from app.settings import Info, SessionVarKeys
from app.store.albums import AlbumStore
from app.store.artists import ArtistStore
from app.store.tracks import TrackStore
@@ -49,6 +45,7 @@ def reload_everything(instance_key: str):
except Exception as e:
log.error(e)
# CHECKPOINT: TEST SETTINGS API ENDPOINTS
# @background
@@ -217,10 +214,6 @@ def set_setting(body: SetSettingBody):
if key not in mapp:
return {"msg": "Invalid key!"}, 400
sdb.set_setting(key, value)
flag = mapp[key]
if key == "artist_separators":
value = str(value).split(",")
value = set(value)
@@ -269,7 +262,11 @@ def update_config(body: UpdateConfigBody):
Update the config file
"""
config = UserConfig()
if body.key == "artistSeparators":
body.value = body.value.split(",")
setattr(config, body.key, body.value)
print(getattr(config, body.key))
return {
"msg": "Config updated!",
+3 -8
View File
@@ -13,11 +13,11 @@ from app.config import UserConfig
from app.db.userdata import UserTable
from app.logger import log
from app.print_help import HELP_MESSAGE
from app.setup.sqlite import setup_sqlite
from app.utils.auth import hash_password
from app.utils.paths import getFlaskOpenApiPath
from app.utils.xdg_utils import get_xdg_config_dir
from app.utils.wintools import is_windows
from app.db.sqlite.auth import SQLiteAuthMethods as authdb
ALLARGS = settings.ALLARGS
ARGS = sys.argv[1:]
@@ -209,6 +209,7 @@ class ProcessArgs:
if ALLARGS.pswd in ARGS:
print("SWING MUSIC v2.0.0 ")
print("PASSWORD RECOVERY \n")
setup_sqlite()
username: str = ""
password: str = ""
@@ -221,7 +222,6 @@ class ProcessArgs:
sys.exit(0)
username = username.strip()
# user = authdb.get_user_by_username(username)
user = UserTable.get_by_username(username)
if not user:
@@ -235,11 +235,6 @@ class ProcessArgs:
print("\nOperation cancelled! Exiting ...")
sys.exit(0)
password = hash_password(password)
# user = authdb.update_user({"id": user.id, "password": password})
UserTable.update_one({
"id": user.id,
"password": password
})
UserTable.update_one({"id": user.id, "password": hash_password(password)})
sys.exit(0)
+2 -1
View File
@@ -36,7 +36,8 @@ class UserConfig:
# misc
enablePeriodicScans: bool = False
scanInterval: int = 60 * 10 # 10 minutes
scanInterval: int = 10
enableWatchdog: bool = False
# plugins
enablePlugins: bool = True
-18
View File
@@ -1,21 +1,3 @@
"""
This module contains the functions to interact with the SQLite database.
"""
import sqlite3
from sqlite3 import Connection as SqlConn
def create_connection(db_file: str) -> SqlConn:
"""
Creates a connection to the specified database.
"""
conn = sqlite3.connect(db_file)
return conn
def create_tables(conn: SqlConn, sql_query: str):
"""
Executes the specifiend SQL file to create database tables.
"""
conn.executescript(sql_query)
-66
View File
@@ -1,66 +0,0 @@
from sqlite3 import Cursor
from .utils import SQLiteManager, tuples_to_albums
class SQLiteAlbumMethods:
@classmethod
def insert_one_album(cls, cur: Cursor, albumhash: str, colors: str):
"""
Inserts one album into the database
"""
sql = """INSERT OR REPLACE INTO albums(
albumhash,
colors
) VALUES(?,?)
"""
cur.execute(sql, (albumhash, colors))
lastrowid = cur.lastrowid
return lastrowid
@classmethod
def get_all_albums(cls):
with SQLiteManager() as cur:
cur.execute("SELECT * FROM albums")
albums = cur.fetchall()
cur.close()
if albums is not None:
return albums
return []
@staticmethod
def get_albums_by_albumartist(albumartist: str):
with SQLiteManager() as cur:
cur.execute("SELECT * FROM albums WHERE albumartist=?", (albumartist,))
albums = cur.fetchall()
cur.close()
if albums is not None:
return tuples_to_albums(albums)
return []
@staticmethod
def exists(albumhash: str, cur: Cursor = None):
"""
Checks if an album exists in the database.
"""
sql = "SELECT COUNT(1) FROM albums WHERE albumhash = ?"
def _exists(cur: Cursor):
cur.execute(sql, (albumhash,))
count = cur.fetchone()[0]
return count != 0
if cur:
return _exists(cur)
with SQLiteManager() as cur:
return _exists(cur)
-64
View File
@@ -1,64 +0,0 @@
"""
Contains methods for reading and writing to the sqlite artists database.
"""
import json
from sqlite3 import Cursor
from .utils import SQLiteManager
class SQLiteArtistMethods:
@staticmethod
def insert_one_artist(cur: Cursor, artisthash: str, colors: list[str]):
"""
Inserts a single artist into the database.
"""
sql = """INSERT OR REPLACE INTO artists(
artisthash,
colors
) VALUES(?,?)
"""
colors = json.dumps(colors)
cur.execute(sql, (artisthash, colors))
@staticmethod
def get_all_artists(cur_: Cursor = None):
"""
Get all artists from the database and return a generator of Artist objects
"""
sql = """SELECT * FROM artists"""
if not cur_:
with SQLiteManager() as cur:
cur.execute(sql)
for artist in cur.fetchall():
yield artist
cur.close()
else:
cur_.execute(sql)
for artist in cur_.fetchall():
yield artist
@staticmethod
def exists(artisthash: str, cur: Cursor = None):
"""
Checks if an artist exists in the database.
"""
sql = "SELECT COUNT(1) FROM artists WHERE artisthash = ?"
def _exists(cur: Cursor):
cur.execute(sql, (artisthash,))
count = cur.fetchone()[0]
return count != 0
if cur:
return _exists(cur)
with SQLiteManager() as cur:
return _exists(cur)
-144
View File
@@ -1,144 +0,0 @@
import json
from app.models.user import User
from app.utils.auth import hash_password
from app.db.sqlite.utils import SQLiteManager
class SQLiteAuthMethods:
"""
Methods for authenticating users.
"""
@staticmethod
def insert_user(user: dict[str, str]):
"""
Insert a user into the database.
:param user: A dict with the username, password and roles.
"""
sql = """INSERT INTO users(
username,
password,
roles
) VALUES(:username, :password, :roles)
"""
user_tuple = tuple(user.values())
with SQLiteManager(userdata_db=True) as cur:
cur = cur.execute(sql, user_tuple)
userid = cur.lastrowid
return userid
# if userid:
# # sleep
# user = SQLiteAuthMethods.get_user_by_id(userid).todict_simplified()
# cur.close()
# return user
raise Exception(f"Failed to insert user: {user}")
@staticmethod
def insert_default_user():
"""
Inserts the default admin user.
"""
user = {
"username": "admin",
"password": hash_password("admin"),
"roles": json.dumps(["admin"]),
}
return SQLiteAuthMethods.insert_user(user)
@staticmethod
def insert_guest_user():
"""
Inserts the default guest user.
"""
user = {
"username": "guest",
"password": hash_password("guest"),
"roles": json.dumps(["guest"]),
}
return SQLiteAuthMethods.insert_user(user)
@staticmethod
def update_user(user: dict[str, str]):
"""
Update a user in the database.
:param user: A dict with the user id and the fields to update. Ommited fields will not be updated.
"""
# get all user dict keys
keys = list(user.keys())
sql = f"""UPDATE users SET
{', '.join([f"{key} = :{key}" for key in keys if key != 'id'])}
WHERE id = :id
"""
with SQLiteManager(userdata_db=True) as cur:
cur.execute(sql, user)
cur.close()
return SQLiteAuthMethods.get_user_by_id(user["id"]).todict()
@staticmethod
def get_all_users():
"""
Check if there are any users in the database.
"""
sql = "SELECT * FROM users"
with SQLiteManager(userdata_db=True) as cur:
cur.execute(sql)
data = cur.fetchall()
cur.close()
return [User(*user) for user in data]
@staticmethod
def get_user_by_username(username: str):
"""
Get a user by username.
"""
sql = "SELECT * FROM users WHERE username = ?"
with SQLiteManager(userdata_db=True) as cur:
cur.execute(sql, (username,))
data = cur.fetchone()
cur.close()
if data is not None:
return User(*data)
return None
@staticmethod
def get_user_by_id(userid: int):
"""
Get a user by id.
"""
sql = "SELECT * FROM users WHERE id = ?"
with SQLiteManager(userdata_db=True) as cur:
cur.execute(sql, (userid,))
data = cur.fetchone()
cur.close()
if data is not None:
return User(*data)
return None
@staticmethod
def delete_user_by_username(username: str):
"""
Delete a user by username.
"""
sql = "DELETE FROM users WHERE id = ?"
with SQLiteManager(userdata_db=True) as cur:
cur.execute(sql, (3,))
cur.close()
-121
View File
@@ -1,121 +0,0 @@
from datetime import datetime
from flask_jwt_extended import current_user
from app.models import FavType
from .utils import SQLiteManager
class SQLiteFavoriteMethods:
"""THis class contains methods for interacting with the favorites table."""
@classmethod
def check_is_favorite(cls, itemhash: str, fav_type: str):
"""
Checks if an item is favorited.
"""
userid = current_user["id"]
sql = """SELECT * FROM favorites WHERE hash = ? AND type = ? AND userid = ?"""
with SQLiteManager(userdata_db=True) as cur:
cur.execute(sql, (itemhash, fav_type, userid))
item = cur.fetchone()
cur.close()
return item is not None
@classmethod
def insert_one_favorite(cls, fav_type: str, fav_hash: str):
"""
Inserts a single favorite into the database.
"""
# try to find the favorite in the database, if it exists, don't insert it
if cls.check_is_favorite(fav_hash, fav_type):
return
sql = """INSERT INTO favorites(type, hash, timestamp, userid) VALUES(?,?,?,?)"""
current_timestamp = int(datetime.now().timestamp())
with SQLiteManager(userdata_db=True) as cur:
userid = current_user["id"]
cur.execute(sql, (fav_type, fav_hash, current_timestamp, userid))
cur.close()
@classmethod
def get_all(cls) -> list[tuple]:
"""
Returns a list of all favorites.
"""
sql = """SELECT * FROM favorites WHERE userid = ?"""
with SQLiteManager(userdata_db=True) as cur:
userid = current_user["id"]
cur.execute(sql, (userid,))
favs = cur.fetchall()
cur.close()
return [fav for fav in favs if fav[1] != ""]
@classmethod
def get_favorites(cls, fav_type: str, userid: int = None) -> list[tuple]:
"""
Returns a list of favorite tracks.
If userid is None, all favorites are returned.
"""
sql = """SELECT * FROM favorites WHERE type = ?"""
params = (fav_type,)
if not userid:
sql += " AND userid = ?"
params = (fav_type, userid)
with SQLiteManager(userdata_db=True) as cur:
cur.execute(sql, params)
all_favs = cur.fetchall()
cur.close()
return all_favs
@classmethod
def get_fav_tracks(cls, userid: int = None) -> list[tuple]:
"""
Returns a list of favorite tracks.
"""
return cls.get_favorites(FavType.track, userid)
@classmethod
def get_fav_albums(cls) -> list[tuple]:
"""
Returns a list of favorite albums.
"""
userid = current_user["id"]
return cls.get_favorites(FavType.album, userid)
@classmethod
def get_fav_artists(cls) -> list[tuple]:
"""
Returns a list of favorite artists.
"""
userid = current_user["id"]
return cls.get_favorites(FavType.artist, userid)
@classmethod
def delete_favorite(cls, fav_type: str, fav_hash: str):
"""
Deletes a favorite from the database.
"""
sql = """DELETE FROM favorites WHERE hash = ? AND type = ? AND userid = ?"""
with SQLiteManager(userdata_db=True) as cur:
userid = current_user["id"]
cur.execute(sql, (fav_hash, fav_type, userid))
cur.close()
@classmethod
def get_track_count(cls) -> int:
"""
Returns the number of favorite tracks.
"""
sql = """SELECT COUNT(*) FROM favorites WHERE type = ? AND userid = ?"""
with SQLiteManager(userdata_db=True) as cur:
userid = current_user["id"]
cur.execute(sql, (FavType.track, userid))
count = cur.fetchone()[0]
cur.close()
return count
View File
-62
View File
@@ -1,62 +0,0 @@
from app.models.lastfm import SimilarArtist
from ..utils import SQLiteManager
class SQLiteLastFMSimilarArtists:
"""
This class contains methods for interacting with the lastfm_similar_artists table.
"""
@classmethod
def insert_one(cls, artist: SimilarArtist):
"""
Inserts a single artist into the database.
"""
sql = """INSERT OR REPLACE INTO lastfm_similar_artists(artisthash, similar_artists) VALUES(?,?)"""
with SQLiteManager(userdata_db=True) as cur:
cur.execute(sql, (artist.artisthash, artist.similar_artists))
cur.close()
@classmethod
def get_similar_artists_for(cls, artisthash: str):
"""
Returns a list of similar artists.
"""
sql = """SELECT * FROM lastfm_similar_artists WHERE artisthash = ?"""
with SQLiteManager(userdata_db=True) as cur:
cur.execute(sql, (artisthash,))
similar_artists = cur.fetchone()
cur.close()
if similar_artists is None:
return None
return SimilarArtist(artisthash, similar_artists[2])
@classmethod
def get_all(cls):
"""
Returns a list of all similar artists.
"""
sql = """SELECT * FROM lastfm_similar_artists"""
with SQLiteManager(userdata_db=True) as cur:
cur.execute(sql)
similar_artists = cur.fetchall()
cur.close()
for a in similar_artists:
yield SimilarArtist(a[1], a[2])
@classmethod
def exists(cls, artisthash: str):
"""
Checks if an artist exists in the database by counting the number of rows
"""
sql = """SELECT COUNT(*) FROM lastfm_similar_artists WHERE artisthash = ?"""
with SQLiteManager(userdata_db=True) as cur:
cur.execute(sql, (artisthash,))
count = cur.fetchone()[0]
cur.close()
return count > 0
-59
View File
@@ -1,59 +0,0 @@
from flask_jwt_extended import current_user
from app.db.sqlite.utils import SQLiteManager
from app.models.logger import TrackLog as TrackLog
from app.utils.auth import get_current_userid
class SQLiteTrackLogger:
@classmethod
def insert_track(cls, trackhash: str, duration: int, source: str, timestamp: int):
"""
Inserts a track play record into the database
"""
userid = get_current_userid()
with SQLiteManager(userdata_db=True) as cur:
sql = """INSERT OR REPLACE INTO track_logger(
trackhash,
duration,
timestamp,
source,
userid
) VALUES(?,?,?,?,?)
"""
cur.execute(
sql, (trackhash, duration, timestamp, source, userid)
)
lastrowid = cur.lastrowid
return lastrowid
@classmethod
def get_all(cls):
"""
Returns all track play records from the database
"""
with SQLiteManager(userdata_db=True) as cur:
userid = get_current_userid()
sql = f"""SELECT * FROM track_logger WHERE userid = {userid} ORDER BY timestamp DESC"""
cur.execute(sql)
rows = cur.fetchall()
return rows
@classmethod
def get_recently_played(cls, start: int = 0, limit: int = 100):
"""
Returns a list of recently played tracks
"""
with SQLiteManager(userdata_db=True) as cur:
sql = f"""SELECT * FROM track_logger WHERE userid = {current_user['id']} ORDER BY timestamp DESC LIMIT ?,?"""
cur.execute(sql, (start, limit))
rows = cur.fetchall()
return [TrackLog(*row) for row in rows]
-228
View File
@@ -1,228 +0,0 @@
import json
from collections import OrderedDict
from flask_jwt_extended import current_user
from app.db.sqlite.utils import SQLiteManager, tuple_to_playlist, tuples_to_playlists
from app.utils.dates import create_new_date
class SQLitePlaylistMethods:
"""
This class contains methods for interacting with the playlists table.
"""
@staticmethod
def update_last_updated(playlist_id: int):
"""Updates the last updated date of a playlist."""
sql = """UPDATE playlists SET last_updated = ? WHERE id = ?"""
with SQLiteManager(userdata_db=True) as cur:
cur.execute(sql, (create_new_date(), playlist_id))
@staticmethod
def insert_one_playlist(playlist: dict):
# banner_pos,
# has_gif,
sql = """INSERT INTO playlists(
image,
last_updated,
name,
settings,
trackhashes,
userid
) VALUES(:image, :last_updated, :name, :settings, :trackhashes, :userid)
"""
playlist["userid"] = current_user["id"]
playlist = OrderedDict(sorted(playlist.items()))
with SQLiteManager(userdata_db=True) as cur:
cur.execute(sql, playlist)
pid = cur.lastrowid
cur.close()
p_tuple = (pid, *playlist.values())
return tuple_to_playlist(p_tuple)
@staticmethod
def count_playlist_by_name(name: str):
sql = f"SELECT COUNT(*) FROM playlists WHERE name = ? and userid = {current_user['id']}"
with SQLiteManager(userdata_db=True) as cur:
cur.execute(sql, (name,))
data = cur.fetchone()
cur.close()
return int(data[0])
@staticmethod
def get_all_playlists():
with SQLiteManager(userdata_db=True) as cur:
userid = 1
try:
userid = current_user["id"]
except RuntimeError:
# Catch this error raised during migration execution
pass
cur.execute(f"SELECT * FROM playlists WHERE userid = {userid}")
playlists = cur.fetchall()
cur.close()
if playlists is not None:
return tuples_to_playlists(playlists)
return []
@staticmethod
def get_playlist_by_id(playlist_id: int):
sql = f"SELECT * FROM playlists WHERE id = ? and userid = {current_user['id']}"
with SQLiteManager(userdata_db=True) as cur:
cur.execute(sql, (playlist_id,))
data = cur.fetchone()
cur.close()
if data is not None:
return tuple_to_playlist(data)
return None
# FIXME: Extract the "add_track_to_playlist" method to use it for both the artisthash and trackhash lists.
@classmethod
def add_item_to_json_list(cls, playlist_id: int, field: str, items: set[str]):
"""
Adds a string item to a json dumped list using a playlist id and field name.
Takes the playlist ID, a field name, an item to add to the field.
"""
userid = 1
try:
userid = current_user["id"]
except RuntimeError:
# Catch this error raised during migration execution
pass
sql = f"SELECT {field} FROM playlists WHERE id = ? and userid = {userid}"
with SQLiteManager(userdata_db=True) as cur:
cur.execute(sql, (playlist_id,))
data = cur.fetchone()
if data is not None:
db_items: list[str] = json.loads(data[0])
# Remove duplicates, without changing the order.
for item in items:
if item in db_items:
items.remove(item)
db_items.extend(items)
sql = f"UPDATE playlists SET {field} = ? WHERE id = ?"
cur.execute(sql, (json.dumps(db_items), playlist_id))
return len(items)
cls.update_last_updated(playlist_id)
@classmethod
def add_tracks_to_playlist(cls, playlist_id: int, trackhashes: list[str]):
"""
Adds trackhashes to a playlist
"""
return cls.add_item_to_json_list(playlist_id, "trackhashes", trackhashes)
@classmethod
def update_playlist(cls, playlist_id: int, playlist: dict):
sql = f"""UPDATE playlists SET
image = ?,
last_updated = ?,
name = ?,
settings = ?
WHERE id = ? and userid = {current_user['id']}
"""
del playlist["id"]
del playlist["trackhashes"]
playlist["settings"] = json.dumps(playlist["settings"])
playlist = OrderedDict(sorted(playlist.items()))
params = (*playlist.values(), playlist_id)
with SQLiteManager(userdata_db=True) as cur:
cur.execute(sql, params)
cls.update_last_updated(playlist_id)
@classmethod
def update_settings(cls, playlist_id: int, settings: dict):
sql = f"""UPDATE playlists SET settings = ? WHERE id = ? and userid = {current_user['id']}"""
with SQLiteManager(userdata_db=True) as cur:
cur.execute(sql, (json.dumps(settings), playlist_id))
cls.update_last_updated(playlist_id)
@staticmethod
def delete_playlist(pid: str):
sql = f"DELETE FROM playlists WHERE id = ? and userid = {current_user['id']}"
with SQLiteManager(userdata_db=True) as cur:
cur.execute(sql, (pid,))
@staticmethod
def remove_banner(playlistid: int):
sql = f"""UPDATE playlists SET image = NULL WHERE id = ? and userid = {current_user['id']}"""
with SQLiteManager(userdata_db=True) as cur:
cur.execute(sql, (playlistid,))
@classmethod
def remove_tracks_from_playlist(cls, playlistid: int, tracks: list[dict[str, int]]):
"""
Removes tracks from a playlist by trackhash and position.
"""
sql = """UPDATE playlists SET trackhashes = ? WHERE id = ?"""
userid = 1
try:
userid = current_user["id"]
except RuntimeError:
# Catch this error raised during migration execution
pass
with SQLiteManager(userdata_db=True) as cur:
cur.execute(
f"SELECT trackhashes FROM playlists WHERE id = ? and userid = {userid}",
(playlistid,),
)
data = cur.fetchone()
if data is None:
return
trackhashes: list[str] = json.loads(data[0])
to_remove = []
for track in tracks:
# {
# trackhash: str;
# index: int;
# }
index = trackhashes.index(track["trackhash"])
if index == track["index"]:
to_remove.append(track["trackhash"])
for trackhash in to_remove:
trackhashes.remove(trackhash)
cur.execute(sql, (json.dumps(trackhashes), playlistid))
cls.update_last_updated(playlistid)
-83
View File
@@ -1,83 +0,0 @@
import json
from app.models.plugins import Plugin
from ..utils import SQLiteManager
def plugin_tuple_to_obj(plugin_tuple: tuple) -> Plugin:
return Plugin(
name=plugin_tuple[1],
active=bool(plugin_tuple[3]),
settings=json.loads(plugin_tuple[4]),
)
class PluginsMethods:
@classmethod
def insert_plugin(cls, plugin: Plugin):
"""
Inserts one plugin into the database
"""
sql = """INSERT OR IGNORE INTO plugins(
name,
description,
active,
settings
) VALUES(?,?,?,?)
"""
with SQLiteManager(userdata_db=True) as cur:
cur.execute(
sql,
(
plugin.name,
plugin.description,
int(plugin.active),
json.dumps(plugin.settings),
),
)
lastrowid = cur.lastrowid
return lastrowid
@classmethod
def get_all_plugins(cls):
with SQLiteManager(userdata_db=True) as cur:
cur.execute("SELECT * FROM plugins")
plugins = cur.fetchall()
cur.close()
if plugins is not None:
return [plugin_tuple_to_obj(plugin) for plugin in plugins]
return []
@classmethod
def plugin_set_active(cls, name: str, active: int):
with SQLiteManager(userdata_db=True) as cur:
cur.execute("UPDATE plugins SET active=? WHERE name=?", (active, name))
cur.close()
@classmethod
def update_plugin_settings(cls, plugin_name: str, settings: dict):
with SQLiteManager(userdata_db=True) as cur:
cur.execute(
"UPDATE plugins SET settings=? WHERE name=?",
(json.dumps(settings), plugin_name),
)
cur.close()
@classmethod
def get_plugin_by_name(cls, name: str):
with SQLiteManager(userdata_db=True) as cur:
cur.execute("SELECT * FROM plugins WHERE name=?", (name,))
plugin = cur.fetchone()
cur.close()
if plugin is not None:
return plugin_tuple_to_obj(plugin)
return None
-123
View File
@@ -1,123 +0,0 @@
"""
This file contains the SQL queries to create the database tables.
"""
CREATE_USERDATA_TABLES = """
CREATE TABLE IF NOT EXISTS playlists (
id integer PRIMARY KEY,
image text,
last_updated text not null,
name text not null,
settings text,
trackhashes text,
userid integer not null,
constraint fk_users foreign key (userid) references users(id) on delete cascade
);
CREATE TABLE IF NOT EXISTS settings (
id integer PRIMARY KEY,
root_dirs text NOT NULL,
exclude_dirs text,
artist_separators text NOT NULL default '/,;',
extract_feat integer NOT NULL DEFAULT 1,
remove_prod integer NOT NULL DEFAULT 1,
clean_album_title integer NOT NULL DEFAULT 1,
remove_remaster integer NOT NULL DEFAULT 1,
merge_albums integer NOT NULL DEFAULT 0,
show_albums_as_singles integer NOT NULL DEFAULT 0
);
CREATE TABLE IF NOT EXISTS lastfm_similar_artists (
id integer PRIMARY KEY,
artisthash text NOT NULL,
similar_artists text NOT NULL,
UNIQUE (artisthash)
);
CREATE TABLE IF NOT EXISTS plugins (
id integer PRIMARY KEY,
name text NOT NULL UNIQUE,
description text NOT NULL,
active integer NOT NULL DEFAULT 0,
settings text
);
CREATE TABLE IF NOT EXISTS track_logger (
id integer PRIMARY KEY,
trackhash text NOT NULL,
duration integer NOT NULL,
timestamp integer NOT NULL,
source text,
userid integer NOT NULL DEFAULT 1,
constraint fk_users foreign key (userid) references users(id) on delete cascade
);
CREATE TABLE IF NOT EXISTS users (
id integer PRIMARY KEY,
username text NOT NULL UNIQUE,
firstname text,
lastname text,
password text NOT NULL,
email text,
image text,
roles text NOT NULL DEFAULT '["user"]'
)
"""
CREATE_APPDB_TABLES = """
CREATE TABLE IF NOT EXISTS tracks (
id integer PRIMARY KEY,
album text NOT NULL,
albumartist text NOT NULL,
albumhash text NOT NULL,
artist text NOT NULL,
bitrate integer NOT NULL,
copyright text,
date integer NOT NULL,
disc integer NOT NULL,
duration integer NOT NULL,
filepath text NOT NULL,
folder text NOT NULL,
genre text,
title text NOT NULL,
track integer NOT NULL,
trackhash text NOT NULL,
last_mod float NOT NULL,
UNIQUE (filepath)
);
CREATE TABLE IF NOT EXISTS albums (
id integer PRIMARY KEY,
albumhash text NOT NULL,
colors text NOT NULL,
UNIQUE (albumhash)
);
CREATE TABLE IF NOT EXISTS artists (
id integer PRIMARY KEY,
artisthash text NOT NULL,
colors text,
bio text,
UNIQUE (artisthash)
);
CREATE TABLE IF NOT EXISTS folders (
id integer PRIMARY KEY,
path text NOT NULL,
trackcount integer NOT NULL
);
"""
# changed from migrations to dbmigrations in v1.3.0
# to avoid conflicts with the previous migrations.
CREATE_MIGRATIONS_TABLE = """
CREATE TABLE IF NOT EXISTS dbmigrations (
id integer PRIMARY KEY,
version integer NOT NULL DEFAULT 0
);
INSERT INTO dbmigrations (version)
SELECT -1
WHERE NOT EXISTS (SELECT 1 FROM dbmigrations);
"""
-151
View File
@@ -1,151 +0,0 @@
from pprint import pprint
from typing import Any
from app.config import UserConfig
from app.db.sqlite.utils import SQLiteManager
from app.utils.wintools import win_replace_slash
# class SettingsSQLMethods:
# """
# Methods for interacting with the settings table.
# """
# @staticmethod
# def get_all_settings():
# """
# Gets all settings from the database.
# """
# sql = "SELECT * FROM settings WHERE id = 1"
# with SQLiteManager(userdata_db=True) as cur:
# cur.execute(sql)
# settings = cur.fetchone()
# cur.close()
# # if root_dirs not set
# if settings is None:
# return []
# # omit id, root_dirs, and exclude_dirs
# return settings[3:]
# @staticmethod
# def get_root_dirs() -> list[str]:
# """
# Gets custom root directories from the database.
# """
# sql = "SELECT root_dirs FROM settings"
# with SQLiteManager(userdata_db=True) as cur:
# cur.execute(sql)
# dirs = cur.fetchall()
# cur.close()
# dirs = [_dir[0] for _dir in dirs]
# return [win_replace_slash(d) for d in dirs]
# @staticmethod
# def add_root_dirs(dirs: list[str]):
# """
# Add custom root directories to the database.
# """
# sql = "INSERT INTO settings (root_dirs) VALUES (?)"
# existing_dirs = SettingsSQLMethods.get_root_dirs()
# dirs = [_dir for _dir in dirs if _dir not in existing_dirs]
# if len(dirs) == 0:
# return
# with SQLiteManager(userdata_db=True) as cur:
# for _dir in dirs:
# cur.execute(sql, (_dir,))
# @staticmethod
# def remove_root_dirs(dirs: list[str]):
# """
# Remove custom root directories from the database.
# """
# sql = "DELETE FROM settings WHERE root_dirs = ?"
# with SQLiteManager(userdata_db=True) as cur:
# for _dir in dirs:
# cur.execute(sql, (_dir,))
# # Not currently used anywhere, to be used later
# @staticmethod
# def add_excluded_dirs(dirs: list[str]):
# """
# Add custom exclude directories to the database.
# """
# sql = "INSERT INTO settings (exclude_dirs) VALUES (?)"
# with SQLiteManager(userdata_db=True) as cur:
# cur.executemany(sql, dirs)
# @staticmethod
# def remove_excluded_dirs(dirs: list[str]):
# """
# Remove custom exclude directories from the database.
# """
# sql = "DELETE FROM settings WHERE exclude_dirs = ?"
# with SQLiteManager(userdata_db=True) as cur:
# cur.executemany(sql, dirs)
# @staticmethod
# def get_excluded_dirs() -> list[str]:
# """
# Gets custom exclude directories from the database.
# """
# sql = "SELECT exclude_dirs FROM settings"
# with SQLiteManager(userdata_db=True) as cur:
# cur.execute(sql)
# dirs = cur.fetchall()
# return [_dir[0] for _dir in dirs]
# @staticmethod
# def get_settings() -> dict[str, Any]:
# pass
# @staticmethod
# def set_setting(key: str, value: Any):
# sql = f"UPDATE settings SET {key} = :value WHERE id = 1"
# if type(value) == bool:
# value = str(int(value))
# with SQLiteManager(userdata_db=True) as cur:
# cur.execute(sql, {"value": value})
# def load_settings():
# # s = SettingsSQLMethods.get_all_settings()
# config = UserConfig()
# try:
# db_separators: str = s[0]
# db_separators = db_separators.replace(" ", "")
# separators = db_separators.split(",")
# separators = set(separators)
# except IndexError:
# separators = {";", "/"}
# SessionVars.ARTIST_SEPARATORS = config.artistSeparators
# # boolean settings
# SessionVars.EXTRACT_FEAT = bool(s[1])
# SessionVars.REMOVE_PROD = bool(s[2])
# SessionVars.CLEAN_ALBUM_TITLE = bool(s[3])
# SessionVars.REMOVE_REMASTER_FROM_TRACK = bool(s[4])
# SessionVars.MERGE_ALBUM_VERSIONS = bool(s[5])
# SessionVars.SHOW_ALBUMS_AS_SINGLES = bool(s[6])
-135
View File
@@ -1,135 +0,0 @@
"""
Contains the SQLiteTrackMethods class which contains methods for
interacting with the tracks table.
"""
from collections import OrderedDict
from sqlite3 import Cursor
from app.db.sqlite.utils import tuple_to_track, tuples_to_tracks
from .utils import SQLiteManager
from app.utils.unicode import handle_unicode
class SQLiteTrackMethods:
"""
This class contains all methods for interacting with the tracks table.
"""
@classmethod
def insert_one_track(cls, track: dict, cur: Cursor):
"""
Inserts a single track into the database.
"""
sql = """INSERT OR REPLACE INTO tracks(
album,
albumartist,
albumhash,
artist,
bitrate,
copyright,
date,
disc,
duration,
filepath,
folder,
genre,
last_mod,
title,
track,
trackhash
) VALUES(:album, :albumartist, :albumhash, :artist, :bitrate, :copyright,
:date, :disc, :duration, :filepath, :folder, :genre, :last_mod, :title, :track, :trackhash)
"""
track = OrderedDict(sorted(track.items()))
track["artist"] = track["artists"]
track["albumartist"] = track["albumartists"]
del track["artists"]
del track["albumartists"]
try:
cur.execute(sql, track)
except UnicodeEncodeError:
# for each of the values in the track, call handle_unicode on it
for key, value in track.items():
track[key] = handle_unicode(value)
cur.execute(sql, track)
@classmethod
def insert_many_tracks(cls, tracks: list[dict]):
"""
Inserts a list of tracks into the database.
"""
with SQLiteManager() as cur:
for track in tracks:
cls.insert_one_track(track, cur)
@staticmethod
def get_all_tracks():
"""
Get all tracks from the database and return a generator of Track objects
or an empty list.
"""
with SQLiteManager() as cur:
cur.execute("SELECT * FROM tracks")
rows = cur.fetchall()
if rows is not None:
return tuples_to_tracks(rows)
return []
@staticmethod
def get_track_by_trackhash(trackhash: str):
"""
Gets a track using its trackhash. Returns a Track object or None.
"""
with SQLiteManager() as cur:
cur.execute("SELECT * FROM tracks WHERE trackhash=?", (trackhash,))
row = cur.fetchone()
if row is not None:
return tuple_to_track(row)
return None
@staticmethod
def get_track_by_albumhash(albumhash: str):
"""
Gets a track using its albumhash. Returns a Track object or None.
"""
with SQLiteManager() as cur:
cur.execute("SELECT * FROM tracks WHERE albumhash=?", (albumhash,))
row = cur.fetchone()
if row is not None:
return tuple_to_track(row)
return None
@staticmethod
def remove_tracks_by_filepaths(filepaths: str | set[str]):
"""
Removes a track or tracks from the database using their filepaths.
"""
if isinstance(filepaths, str):
filepaths = {filepaths}
with SQLiteManager() as cur:
for filepath in filepaths:
cur.execute("DELETE FROM tracks WHERE filepath=?", (filepath,))
@staticmethod
def remove_tracks_not_in_folders(folders: set[str]):
sql = "DELETE FROM tracks WHERE folder NOT IN ({})".format(
",".join("?" * len(folders))
)
with SQLiteManager() as cur:
cur.execute(sql, tuple(folders))
+18
View File
@@ -20,6 +20,7 @@ from app.db.utils import (
favorites_to_dataclass,
playlist_to_dataclass,
playlists_to_dataclasses,
plugin_to_dataclass,
plugin_to_dataclasses,
similar_artist_to_dataclass,
similar_artists_to_dataclass,
@@ -110,6 +111,23 @@ class PluginTable(Base):
def get_all(cls):
return plugin_to_dataclasses(cls.all())
@classmethod
def activate(cls, name: str, value: bool):
return cls.execute(
update(cls).where(cls.name == name).values(active=value), commit=True
)
@classmethod
def get_by_name(cls, name: str):
result = cls.execute(select(cls).where(cls.name == name))
return plugin_to_dataclass(result.fetchone())
@classmethod
def update_settings(cls, name: str, settings: dict[str, Any]):
return cls.execute(
update(cls).where(cls.name == name).values(settings=settings), commit=True
)
class SimilarArtistTable(Base):
__tablename__ = "notlastfm_similar_artists"
-4
View File
@@ -6,7 +6,6 @@ from requests import ConnectionError as RequestConnectionError
from requests import ReadTimeout
from app import settings
from app.db.sqlite.tracks import SQLiteTrackMethods
from app.lib.artistlib import CheckArtistImages
from app.lib.colorlib import ProcessAlbumColors, ProcessArtistColors
from app.lib.errors import PopulateCancelledError
@@ -22,9 +21,6 @@ from app.utils.progressbar import tqdm
from app.db.userdata import SimilarArtistTable
get_all_tracks = SQLiteTrackMethods.get_all_tracks
insert_many_tracks = SQLiteTrackMethods.insert_many_tracks
remove_tracks_by_filepaths = SQLiteTrackMethods.remove_tracks_by_filepaths
POPULATE_KEY = ""
-4
View File
@@ -6,10 +6,6 @@ import os
from app.lib.pydub.pydub import AudioSegment
from app.lib.pydub.pydub.silence import detect_leading_silence, detect_silence
from app.db.sqlite.tracks import SQLiteTrackMethods as trackdb
from app.store.tracks import TrackStore
from app.utils.progressbar import tqdm
from app.utils.threading import ThreadWithReturnValue
-4
View File
@@ -12,11 +12,7 @@ from watchdog.observers import Observer
from app import settings
from app.config import UserConfig
from app.db.sqlite.albumcolors import SQLiteAlbumMethods as aldb
# from app.db.sqlite.settings import SettingsSQLMethods as sdb
from app.db.sqlite.tracks import SQLiteManager
from app.db.sqlite.tracks import SQLiteTrackMethods as db
from app.lib.colorlib import process_color
from app.lib.taglib import extract_thumb, get_tags
from app.logger import log
+1 -4
View File
@@ -5,13 +5,10 @@ Reads and applies the latest database migrations.
"""
import inspect
import sys
from types import ModuleType
# from app.db.sqlite.migrations import MigrationManager
from app.db.metadata import MigrationTable
from app.logger import log
from app.migrations import v1_3_0, v1_4_9
from app.migrations.base import Migration
@@ -41,7 +38,7 @@ def apply_migrations():
migrations past that index are applied and the new length
is stored as the new migration index.
"""
modules = [v1_3_0, v1_4_9]
modules = []
migrations = [get_all_migrations(m) for m in modules]
# index = MigrationManager.get_index()
-306
View File
@@ -1,306 +0,0 @@
import json
import os
import shutil
import time
from collections import OrderedDict
from sqlite3 import OperationalError
from typing import Generator
from app.db.sqlite.utils import SQLiteManager
from app.migrations.base import Migration
from app.settings import Paths
from app.utils.decorators import coroutine
from app.utils.hashing import create_hash
# playlists table
# ---------------
# 0: id
# 1: banner_pos
# 2: has_gif
# 3: image
# 4: last_updated
# 5: name
# 6: trackhashes
class m1_RemoveSmallThumbnailFolder(Migration):
"""
Removes the small thumbnail folder.
Because we are added a new folder "original" in the same directory, and the small thumbs folder is used to check if an album's thumbnail is already extracted.
So we need to remove it, to force the app to extract thumbnails for all albums.
"""
@staticmethod
def migrate():
thumbs_sm_path = Paths.get_sm_thumb_path()
thumbs_lg_path = Paths.get_lg_thumb_path()
for path in [thumbs_sm_path, thumbs_lg_path]:
if os.path.exists(path):
shutil.rmtree(path)
for path in [thumbs_sm_path, thumbs_lg_path]:
os.makedirs(path, exist_ok=True)
class m2_RemovePlaylistArtistHashes(Migration):
"""
removes the artisthashes column from the playlists table.
"""
@staticmethod
def migrate():
# remove artisthashes column
sql = "ALTER TABLE playlists DROP COLUMN artisthashes"
with SQLiteManager(userdata_db=True) as cur:
try:
cur.execute(sql)
except OperationalError:
pass
cur.close()
class m3_AddSettingsToPlaylistTable(Migration):
"""
adds the settings column and removes the banner_pos and has_gif columns
to the playlists table.
"""
@staticmethod
def migrate():
select_playlists_sql = "SELECT * FROM playlists"
with SQLiteManager(userdata_db=True) as cur:
create_playlist_table_sql = """CREATE TABLE IF NOT EXISTS playlists (
id integer PRIMARY KEY,
image text,
last_updated text not null,
name text not null,
settings text,
trackhashes text
);"""
insert_playlist_sql = """INSERT INTO playlists(
image,
last_updated,
name,
settings,
trackhashes
) VALUES(:image, :last_updated, :name, :settings, :trackhashes)
"""
cur.execute(select_playlists_sql)
# load all playlists
playlists = cur.fetchall()
# drop old playlists table
cur.execute("DROP TABLE playlists")
# create new playlists table
cur.execute(create_playlist_table_sql)
def transform_playlists(pipeline: Generator, playlists: tuple):
for playlist in playlists:
# create dict that matches the new schema
p = {
"id": playlist[0],
"name": playlist[5],
"image": playlist[3],
"trackhashes": playlist[6],
"last_updated": playlist[4],
"settings": json.dumps(
{
"has_gif": False,
"banner_pos": playlist[1],
"square_img": False,
"pinned": False,
}
),
}
pipeline.send(p)
@coroutine
def insert_playlist():
while True:
playlist = yield
p = OrderedDict(sorted(playlist.items()))
cur.execute(insert_playlist_sql, p)
# insert playlists using a coroutine
# (my first coroutine)
pipeline = insert_playlist()
transform_playlists(pipeline, playlists)
pipeline.close()
cur.close()
class m4_AddLastUpdatedToTrackTable(Migration):
"""
adds the last modified column to the tracks table.
"""
@staticmethod
def migrate():
# add last_mod column and default to current timestamp
timestamp = time.time()
sql = f"ALTER TABLE tracks ADD COLUMN last_mod text not null DEFAULT '{timestamp}'"
with SQLiteManager() as cur:
try:
cur.execute(sql)
except OperationalError:
pass
cur.close()
class m5_MovePlaylistsAndFavoritesTo10BitHashes(Migration):
"""
moves the playlists and favorites to 10 bit hashes.
"""
@staticmethod
def migrate():
def get_track_data_by_hash(trackhash: str, tracks: list[tuple]) -> tuple:
for track in tracks:
# trackhash is the 15th bit hash
if track[15] == trackhash:
# return artist, album, title
return track[4], track[1], track[13]
def get_track_by_albumhash(albumhash: str, tracks: list[tuple]) -> tuple:
for track in tracks:
# albumhash is the 3rd bit hash
if track[3] == albumhash:
# return album, albumartist
return track[1], track[2]
_base = "SELECT * FROM"
fetch_playlists_sql = f"{_base} playlists"
fetch_tracks_sql = f"{_base} tracks"
update_playlist_hashes_sql = (
"UPDATE playlists SET trackhashes = :trackhashes WHERE id = :id"
)
fetch_favorites_sql = f"{_base} favorites"
update_fav_sql = "UPDATE favorites SET hash = :hash WHERE id = :id"
remove_fav_sql = "DELETE FROM favorites WHERE id = :id"
db_tracks = []
# read tracks from db
with SQLiteManager() as cur:
cur.execute(fetch_tracks_sql)
db_tracks.extend(cur.fetchall())
cur.close()
# update playlists
with SQLiteManager(userdata_db=True) as cur:
cur.execute(fetch_playlists_sql)
playlists = cur.fetchall()
# for each playlist
for p in playlists:
pid = p[0]
# load trackhashes
trackhashes: list[str] = json.loads(p[5])
for index, t in enumerate(trackhashes):
(artist, album, title) = get_track_data_by_hash(t, db_tracks)
# create new hash
new_hash = create_hash(artist, album, title, decode=True, limit=10)
trackhashes[index] = new_hash
# convert to string
trackhashes = json.dumps(trackhashes)
# save to db
cur.execute(
update_playlist_hashes_sql, {"trackhashes": trackhashes, "id": pid}
)
cur.close()
# update favorites
with SQLiteManager(userdata_db=True) as cur:
cur.execute(fetch_favorites_sql)
favorites = cur.fetchall()
# for each favorite
for f in favorites:
fid = f[0]
fhash: str = f[1]
ftype: str = f[2] # "track" || "album"
if ftype == "album":
(album, albumartist) = get_track_by_albumhash(fhash, db_tracks)
# create new hash
new_hash = create_hash(album, albumartist, decode=True, limit=10)
# save to db
cur.execute(update_fav_sql, {"hash": new_hash, "id": fid})
continue
if ftype == "track":
(artist, album, title) = get_track_data_by_hash(fhash, db_tracks)
# create new hash
new_hash = create_hash(artist, album, title, decode=True, limit=10)
# save to db
cur.execute(update_fav_sql, {"hash": new_hash, "id": fid})
continue
# remove favorites that are not track or album. ie. artists
cur.execute(remove_fav_sql, {"id": fid})
cur.close()
class m6_RemoveAllTracks(Migration):
"""
removes all tracks from the tracks table.
"""
@staticmethod
def migrate():
sql = "DELETE FROM tracks"
with SQLiteManager() as cur:
cur.execute(sql)
cur.close()
class m7_UpdateAppSettingsTable(Migration):
@staticmethod
def migrate():
drop_table_sql = "DROP TABLE settings"
create_table_sql = """
CREATE TABLE IF NOT EXISTS settings (
id integer PRIMARY KEY,
root_dirs text NOT NULL,
exclude_dirs text,
artist_separators text NOT NULL default '/,;',
extract_feat integer NOT NULL DEFAULT 1,
remove_prod integer NOT NULL DEFAULT 1,
clean_album_title integer NOT NULL DEFAULT 1,
remove_remaster integer NOT NULL DEFAULT 1,
merge_albums integer NOT NULL DEFAULT 0,
show_albums_as_singles integer NOT NULL DEFAULT 0
);
"""
with SQLiteManager(userdata_db=True) as cur:
cur.execute(drop_table_sql)
cur.execute(create_table_sql)
-405
View File
@@ -1,405 +0,0 @@
import os
import shutil
import sqlite3
from time import time
from app.db.sqlite.utils import SQLiteManager
from app.migrations.base import Migration
from app.settings import Paths
import hashlib
from unidecode import unidecode
from app.db.sqlite.tracks import SQLiteTrackMethods as tdb
from app.db.sqlite.playlists import SQLitePlaylistMethods as pdb
from app.db.sqlite.logger.tracks import SQLiteTrackLogger as ldb
from app.utils.hashing import create_hash
def create_sha256_hash(*args: str, decode=False, limit=10) -> str:
"""
This function creates a case-insensitive, non-alphanumeric chars ignoring hash from the given arguments.
Example use case:
- Creating computable IDs for duplicate artists. eg. Juice WRLD and Juice Wrld should have the same ID.
:param args: The arguments to hash.
:param decode: Whether to decode the arguments before hashing.
:param limit: The number of characters to return.
:return: The hash.
"""
def remove_non_alnum(token: str) -> str:
token = token.lower().strip().replace(" ", "")
t = "".join(t for t in token if t.isalnum())
if t == "":
return token
return t
str_ = "".join(remove_non_alnum(t) for t in args)
if decode:
str_ = unidecode(str_)
str_ = str_.encode("utf-8")
str_ = hashlib.sha256(str_).hexdigest()
return str_[-limit:]
def create_sha1_hash(*args: str, decode=False, limit=10) -> str:
"""
This function creates a case-insensitive, non-alphanumeric chars ignoring hash from the given arguments.
Example use case:
- Creating computable IDs for duplicate artists. eg. Juice WRLD and Juice Wrld should have the same ID.
:param args: The arguments to hash.
:param decode: Whether to decode the arguments before hashing.
:param limit: The number of characters to return.
:return: The hash.
"""
def remove_non_alnum(token: str) -> str:
token = token.lower().strip().replace(" ", "")
t = "".join(t for t in token if t.isalnum())
if t == "":
return token
return t
str_ = "".join(remove_non_alnum(t) for t in args)
if decode:
str_ = unidecode(str_)
str_ = str_.encode("utf-8")
str_ = hashlib.sha1(str_).hexdigest()
return (
str_[: limit // 2] + str_[-limit // 2 :]
if limit % 2 == 0
else str_[: limit // 2] + str_[-limit // 2 - 1 :]
)
class _1AddTimestampToFavoritesTable(Migration):
"""
Adds a timestamp column to the favorites table.
"""
@staticmethod
def migrate():
# INFO: add timestamp column with automatic current timestamp
sql = f"ALTER TABLE favorites ADD COLUMN timestamp INTEGER NOT NULL DEFAULT 0"
# INFO: execute the sql
with SQLiteManager(userdata_db=True) as cur:
table_exists = cur.execute(
"select count(*) from pragma_table_info('favorites') where name = 'timestamp'"
)
table_exists = table_exists.fetchone()
if table_exists[0] == 1:
return
# INFO: Add the timestamp column to the favorites table
timestamp = int(time())
cur.execute(sql)
cur.execute(f"UPDATE favorites SET timestamp = {timestamp}")
class _2DeleteOriginalThumbnails(Migration):
"""
Original thumbnails are too large and are not needed.
"""
# TODO: Implement this migration
@staticmethod
def migrate():
imgpath = Paths.get_thumbs_path()
og_imgpath = os.path.join(imgpath, "original")
if os.path.exists(og_imgpath):
shutil.rmtree(og_imgpath)
class _3MoveScrobbleToUserId1(Migration):
"""
Updates all track logs from user id = 0 to user id = 1
"""
@staticmethod
def migrate():
sql = """
UPDATE track_logger SET userid = 1 WHERE userid = 0;
ALTER TABLE track_logger RENAME TO _track_logger;
CREATE TABLE IF NOT EXISTS track_logger (
id integer PRIMARY KEY,
trackhash text NOT NULL,
duration integer NOT NULL,
timestamp integer NOT NULL,
source text,
userid integer NOT NULL DEFAULT 1,
constraint fk_users foreign key (userid) references users(id) on delete cascade
);
INSERT INTO track_logger SELECT * FROM _track_logger;
DROP TABLE _track_logger;
"""
# INFO: Move the scrobble table to the user id 1
with SQLiteManager(userdata_db=True) as cur:
cur.executescript(sql)
cur.close()
class _4AddUserIdToFavoritesTable(Migration):
"""
Adds a userid column to the favorites table.
"""
@staticmethod
def migrate():
# check if userid column exists
exists_sql = (
"select count(*) from pragma_table_info('favorites') where name = 'userid'"
)
sql = """
ALTER TABLE favorites ADD userid INTEGER NOT NULL DEFAULT 1;
ALTER TABLE favorites RENAME TO _favorites;
CREATE TABLE IF NOT EXISTS favorites (
id integer PRIMARY KEY,
hash text not null,
type text not null,
timestamp integer not null default 0,
userid integer not null,
constraint fk_users foreign key (userid) references users(id) on delete cascade
);
INSERT INTO favorites SELECT * FROM _favorites;
DROP TABLE _favorites;
"""
with SQLiteManager(userdata_db=True) as cur:
data = cur.execute(exists_sql)
data = data.fetchone()
if data[0] == 1:
return # INFO: column already exists
cur.executescript(sql)
class _5AddUserIdToPlaylistsTable(Migration):
"""
Adds a userid column to the playlists table.
"""
@staticmethod
def migrate():
# check if userid column exists
exists_sql = (
"select count(*) from pragma_table_info('playlists') where name = 'userid'"
)
# Add the userid column to the playlists table
# Rename the old table to _playlists
# Create a new playlists table with the userid column
# Then, copy the data from the old table to the new table
# Finally, drop the old table
sql = """
ALTER TABLE playlists ADD userid INTEGER NOT NULL DEFAULT 1;
ALTER TABLE playlists RENAME TO _playlists;
CREATE TABLE IF NOT EXISTS playlists (
id integer PRIMARY KEY,
image text,
last_updated text not null,
name text not null,
settings text,
trackhashes text,
userid integer not null,
constraint fk_users foreign key (userid) references users(id) on delete cascade
);
INSERT INTO playlists SELECT * FROM _playlists;
DROP TABLE _playlists;
"""
with SQLiteManager(userdata_db=True) as cur:
# INFO: Check if the column already exists
data = cur.execute(exists_sql)
data = data.fetchone()
# INFO: If the column already exists, return
if data[0] == 1:
return # INFO: column already exists
# INFO: Execute the sql
cur.executescript(sql)
class _6MoveHashesToSha1(Migration):
"""
Moves the 10 bit item hashes from sha256 to sha1 which is
faster and more lenient on less powerful devices.
Thanks to [@tcsenpai](https:github.com/tcsenpai) for the contribution.
"""
# enabled: bool = False
# pass
# INFO: Apparentlly, every single table is affected by this migration.
# NOTE: Use generators to avoid memory issues.
@classmethod
def port_track(cls, trackhash: str):
# get the track with the track hash
track = tdb.get_track_by_trackhash(trackhash)
if track is None:
return
title = track.og_title
if track.trackhash != trackhash:
# raise ValueError("Track hash mismatch")
print("Track hash mismatch")
title = track.title
else:
print("Porting track: ", track.title)
# return the new hash
finalhash = create_sha1_hash(
", ".join(a.name for a in track.artists),
track.og_album,
title,
)
if finalhash != create_hash(
", ".join(a.name for a in track.artists), track.og_album, title
):
raise ValueError("Hash mismatch")
@classmethod
def port_album(cls, albumhash: str):
# get the first track with the album hash
track = tdb.get_track_by_albumhash(albumhash)
if track is None:
return
# return the new hash
return create_sha1_hash(
track.og_album,
", ".join(a.name for a in track.albumartists),
)
@classmethod
def port_artist(cls, artisthash: str):
# find all tracks with the artist hash
tracks = [t for t in cls.tracks if artisthash in t.artist_hashes]
if len(tracks) == 0:
return
# find the artist name
artist = [
a.name
for a in tracks[0].artists
if create_sha256_hash(a.name, decode=True) == artisthash
][0]
# return the new hash
return create_sha1_hash(artist, decode=True)
@classmethod
def migrate_favorites(cls):
with SQLiteManager(userdata_db=True) as cur:
# read all favorites
data = cur.execute("SELECT * FROM favorites")
data = data.fetchall()
for track in cls.tracks:
track.artist_hashes = "-".join(
[create_sha256_hash(a.name, decode=True) for a in track.artists]
)
for entry in data:
# hash is the 2nd column in the table
hash = entry[1]
# entry type is the 3rd column in the table
if entry[2] == "track":
newhash = cls.port_track(hash)
if newhash:
cur.execute(
f"UPDATE favorites SET hash = '{newhash}' WHERE hash = '{hash}' AND type = 'track'"
)
elif entry[2] == "album":
newhash = cls.port_album(hash)
if newhash:
cur.execute(
f"UPDATE favorites SET hash = '{newhash}' WHERE hash = '{hash}' AND type = 'album'"
)
elif entry[2] == "artist":
newhash = cls.port_artist(hash)
if newhash:
cur.execute(
f"UPDATE favorites SET hash = '{newhash}' WHERE hash = '{hash}' AND type = 'artist'"
)
@classmethod
def migrate_playlists(cls):
playlists = pdb.get_all_playlists()
for playlist in playlists:
# remove previous hashes
to_remove = [
{"trackhash": trackhash, "index": index}
for index, trackhash in enumerate(playlist.trackhashes)
]
pdb.remove_tracks_from_playlist(playlist.id, to_remove)
# add new hashes
newhashes = [
cls.port_track(trackhash) for trackhash in playlist.trackhashes
]
newhashes = [h for h in newhashes if h is not None]
pdb.add_tracks_to_playlist(playlist.id, newhashes)
print("Ported playlist: ", playlist.name)
print("Total tracks: ", len(newhashes))
@classmethod
def migrate_scrobble(cls):
# read all logs
logs = ldb.get_all()
with SQLiteManager(userdata_db=True) as cur:
# for each log, port the hash
for log in logs:
newhash = cls.port_track(log[1])
if newhash:
cur.execute(
f"UPDATE track_logger SET trackhash = '{newhash}' WHERE trackhash = '{log[1]}'"
)
@classmethod
def migrate(cls):
cls.tracks = list(tdb.get_all_tracks())
cls.migrate_favorites()
# cls.migrate_playlists()
# cls.migrate_scrobble()
+3 -5
View File
@@ -7,7 +7,7 @@ from typing import List, Optional
import requests
from unidecode import unidecode
from app.db.sqlite.plugins import PluginsMethods
from app.db.userdata import PluginTable
from app.plugins import Plugin, plugin_method
from app.settings import Paths
@@ -190,15 +190,13 @@ class LyricsProvider(LRCProvider):
class Lyrics(Plugin):
def __init__(self) -> None:
plugin = PluginsMethods.get_plugin_by_name("lyrics_finder")
plugin = PluginTable.get_by_name("lyrics_finder")
if not plugin:
return
name = plugin.name
description = plugin.description
super().__init__(name, description)
super().__init__(name, "Musixmatch lyrics finder")
self.provider = LyricsProvider()
-1
View File
@@ -14,7 +14,6 @@ def hash_password(password: str) -> str:
:return: The hashed password.
"""
return hashlib.pbkdf2_hmac(
"sha256", password.encode("utf-8"), UserConfig().serverId.encode("utf-8"), 100000
).hex()
View File
-59
View File
@@ -1,59 +0,0 @@
import json
import sqlite3
import os
from app.db.sqlite.artistcolors import SQLiteArtistMethods
from app.db.sqlite.queries import CREATE_APPDB_TABLES
from app.db.sqlite.utils import SQLiteManager
db_path = "test.db"
def test_sqlite_manager():
with SQLiteManager(test_db_path=db_path) as cur:
for query in CREATE_APPDB_TABLES.split(";"):
cur.execute(query)
cur.execute(
"INSERT INTO tracks (album, albumartist, albumhash, artist, bitrate, copyright, date, disc, duration, filepath, folder, genre, last_mod, title, track, trackhash) VALUES ('Dummy Album', 'Dummy Album Artist', 'dummyalbumhash', 'Dummy Artist', 320, 'Dummy Copyright', 1630454400, 1, 180, '/path/to/dummy/file.mp3', '/path/to/dummy/folder', 'Dummy Genre', 1630454400.5, 'Dummy Title', 1, 'dummytrackhash');"
)
cur.execute("SELECT * FROM tracks")
result = cur.fetchone()
assert result[7] == 1630454400
# Test using a connection
with SQLiteManager(conn=sqlite3.connect(db_path)) as cur:
cur.execute("SELECT * FROM tracks")
result = cur.fetchone()
assert result[7] == 1630454400
def test_insert_one_artist():
color1 = "rgb(0, 0, 0)"
color2 = "rgb(255, 255, 255)"
with SQLiteManager(test_db_path=db_path) as cur:
SQLiteArtistMethods.insert_one_artist(cur, "artisthash1", [color1, color2])
cur.execute("SELECT * FROM artists WHERE artisthash=?", ("artisthash1",))
result = cur.fetchone()
assert result[1:] == ("artisthash1", json.dumps([color1, color2]), None)
def test_get_all_artists():
with SQLiteManager(test_db_path=db_path) as cur:
artists = SQLiteArtistMethods.get_all_artists(cur)
# assert that that the generator is not empty and that for each tuple has 4 elements
try:
while True:
artist = next(artists)
assert len(artist) == 4
except StopIteration:
pass
def test_remove_test_db():
os.remove(db_path)
-34
View File
@@ -1,34 +0,0 @@
# from hypothesis import given
from app.utils.parsers import parse_feat_from_title
def test_extract_featured_artists_from_title():
test_titles = [
"Own it (Featuring Ed Sheeran & Stormzy)",
"Own it (Featuring Ed Sheeran and Stormzy)",
"Autograph (On my line)(Feat. Lil Peep)(Deluxe)",
"Why so sad? (with Juice Wrld, Lil Peep)",
"Why so sad? (with Juice Wrld/Lil Peep)",
"Simmer (with Burna Boy)",
"Simmer (without Burna Boy)",
]
results = [
["Ed Sheeran", "Stormzy"],
["Ed Sheeran", "Stormzy"],
["Lil Peep"],
["Juice Wrld", "Lil Peep"],
["Juice Wrld", "Lil Peep"],
["Burna Boy"],
[],
]
for title, expected in zip(test_titles, results):
assert parse_feat_from_title(title)[0] == expected
# === HYPOTHESIS GHOSTWRITER TESTS ===
# @given(__dir=st.text(), full=st.booleans())
# def test_fuzz_run_fast_scandir(__dir: str, full) -> None:
# app.utils.run_fast_scandir(_dir=__dir, full=full)