Merge branch 'custom-root-dirs'

This commit is contained in:
geoffrey45
2023-02-15 17:38:22 +03:00
49 changed files with 2107 additions and 1175 deletions
+1 -2
View File
@@ -16,10 +16,9 @@ __pycache__
.hypothesis
sqllib.py
encoderx.py
tests
.pytest_cache
# pyinstaller files
dist
build
client
client
-44
View File
@@ -1,44 +0,0 @@
# -*- mode: python ; coding: utf-8 -*-
block_cipher = None
a = Analysis(
['manage.py'],
pathex=[],
binaries=[],
datas=[('assets', 'assets'), ('client', 'client'), ('pyinstaller.config.ini', '.')],
hiddenimports=[],
hookspath=[],
hooksconfig={},
runtime_hooks=[],
excludes=[],
win_no_prefer_redirects=False,
win_private_assemblies=False,
cipher=block_cipher,
noarchive=False,
)
pyz = PYZ(a.pure, a.zipped_data, cipher=block_cipher)
exe = EXE(
pyz,
a.scripts,
a.binaries,
a.zipfiles,
a.datas,
[],
name='swing',
debug=False,
bootloader_ignore_signals=False,
strip=False,
upx=True,
upx_exclude=[],
runtime_tmpdir=None,
console=True,
disable_windowed_traceback=False,
argv_emulation=False,
target_arch=None,
codesign_identity=None,
entitlements_file=None,
)
+11 -10
View File
@@ -5,8 +5,8 @@ This module combines all API blueprints into a single Flask app instance.
from flask import Flask
from flask_cors import CORS
from app.api import album, artist, favorites, folder, playlist, search, track
from app.imgserver import imgbp as imgserver
from app.api import (album, artist, favorites, folder, imgserver, playlist,
search, settings, track)
def create_api():
@@ -18,13 +18,14 @@ def create_api():
with app.app_context():
app.register_blueprint(album.albumbp)
app.register_blueprint(artist.artistbp)
app.register_blueprint(track.trackbp)
app.register_blueprint(search.searchbp)
app.register_blueprint(folder.folderbp)
app.register_blueprint(playlist.playlistbp)
app.register_blueprint(favorites.favbp)
app.register_blueprint(imgserver)
app.register_blueprint(album.api)
app.register_blueprint(artist.api)
app.register_blueprint(track.api)
app.register_blueprint(search.api)
app.register_blueprint(folder.api)
app.register_blueprint(playlist.api)
app.register_blueprint(favorites.api)
app.register_blueprint(imgserver.api)
app.register_blueprint(settings.api)
return app
+8 -17
View File
@@ -12,15 +12,14 @@ from app.db.sqlite.favorite import SQLiteFavoriteMethods as favdb
from app.db.store import Store
from app.models import FavType, Track
get_album_by_id = adb.get_album_by_id
get_albums_by_albumartist = adb.get_albums_by_albumartist
check_is_fav = favdb.check_is_favorite
albumbp = Blueprint("album", __name__, url_prefix="")
api = Blueprint("album", __name__, url_prefix="")
@albumbp.route("/album", methods=["POST"])
@api.route("/album", methods=["POST"])
def get_album():
"""Returns all the tracks in the given album."""
@@ -62,23 +61,16 @@ def get_album():
tracks = utils.remove_duplicates(tracks)
album.count = len(tracks)
for track in tracks:
if track.date != "Unknown":
album.date = track.date
break
album.get_date_from_tracks(tracks)
try:
album.duration = sum((t.duration for t in tracks))
except AttributeError:
album.duration = 0
if (
album.count == 1
and tracks[0].title == album.title
# and tracks[0].track == 1
# and tracks[0].disc == 1
):
album.check_is_single(tracks)
if album.is_single:
album.is_single = True
else:
album.check_type()
@@ -88,7 +80,7 @@ def get_album():
return {"tracks": tracks, "info": album}
@albumbp.route("/album/<albumhash>/tracks", methods=["GET"])
@api.route("/album/<albumhash>/tracks", methods=["GET"])
def get_album_tracks(albumhash: str):
"""
Returns all the tracks in the given album.
@@ -105,7 +97,7 @@ def get_album_tracks(albumhash: str):
return {"tracks": tracks}
@albumbp.route("/album/from-artist", methods=["POST"])
@api.route("/album/from-artist", methods=["POST"])
def get_artist_albums():
data = request.get_json()
@@ -130,7 +122,6 @@ def get_artist_albums():
return {"data": albums}
# @album_bp.route("/album/bio", methods=["POST"])
# def get_album_bio():
# """Returns the album bio for the given album."""
+15 -11
View File
@@ -5,12 +5,12 @@ from collections import deque
from flask import Blueprint, request
from app.db.sqlite.favorite import SQLiteFavoriteMethods as favdb
from app.db.store import Store
from app.models import Album, FavType, Track
from app.utils import remove_duplicates
from app.db.sqlite.favorite import SQLiteFavoriteMethods as favdb
artistbp = Blueprint("artist", __name__, url_prefix="/")
api = Blueprint("artist", __name__, url_prefix="/")
class CacheEntry:
@@ -19,7 +19,7 @@ class CacheEntry:
"""
def __init__(
self, artisthash: str, albumhashes: set[str], tracks: list[Track]
self, artisthash: str, albumhashes: set[str], tracks: list[Track]
) -> None:
self.albums: list[Album] = []
self.tracks: list[Track] = []
@@ -39,7 +39,7 @@ class ArtistsCache:
Holds artist page cache.
"""
artists: deque[CacheEntry] = deque(maxlen=6)
artists: deque[CacheEntry] = deque(maxlen=1)
@classmethod
def get_albums_by_artisthash(cls, artisthash: str):
@@ -48,9 +48,9 @@ class ArtistsCache:
"""
for (index, albums) in enumerate(cls.artists):
if albums.artisthash == artisthash:
return (albums.albums, index)
return albums.albums, index
return ([], -1)
return [], -1
@classmethod
def albums_cached(cls, artisthash: str) -> bool:
@@ -131,6 +131,7 @@ class ArtistsCache:
album_tracks = Store.get_tracks_by_albumhash(album.albumhash)
album_tracks = remove_duplicates(album_tracks)
album.get_date_from_tracks(album_tracks)
album.check_is_single(album_tracks)
entry.type_checked = True
@@ -156,7 +157,7 @@ def add_albums_to_cache(artisthash: str):
# =======================================================
@artistbp.route("/artist/<artisthash>", methods=["GET"])
@api.route("/artist/<artisthash>", methods=["GET"])
def get_artist(artisthash: str):
"""
Get artist data.
@@ -203,7 +204,7 @@ def get_artist(artisthash: str):
return {"artist": artist, "tracks": tracks[:limit]}
@artistbp.route("/artist/<artisthash>/albums", methods=["GET"])
@api.route("/artist/<artisthash>/albums", methods=["GET"])
def get_artist_albums(artisthash: str):
limit = request.args.get("limit")
@@ -214,7 +215,6 @@ def get_artist_albums(artisthash: str):
limit = int(limit)
all_albums = []
is_cached = ArtistsCache.albums_cached(artisthash)
if not is_cached:
@@ -242,6 +242,10 @@ def get_artist_albums(artisthash: str):
albums = list(albums)
albums = remove_EPs_and_singles(albums)
compilations = [a for a in albums if a.is_compilation]
for c in compilations:
albums.remove(c)
appearances = filter(lambda a: artisthash not in a.albumartisthash, all_albums)
appearances = list(appearances)
@@ -258,10 +262,11 @@ def get_artist_albums(artisthash: str):
"singles": singles[:limit],
"eps": eps[:limit],
"appearances": appearances[:limit],
"compilations": compilations[:limit]
}
@artistbp.route("/artist/<artisthash>/tracks", methods=["GET"])
@api.route("/artist/<artisthash>/tracks", methods=["GET"])
def get_artist_tracks(artisthash: str):
"""
Returns all artists by a given artist.
@@ -275,7 +280,6 @@ def get_artist_tracks(artisthash: str):
# return {"albums": albums[:limit]}
# @artist_bp.route("/artist/<artist>")
# @cache.cached()
# def get_artist_data(artist: str):
+22 -18
View File
@@ -1,17 +1,18 @@
from flask import Blueprint, request
from app.db.sqlite.favorite import SQLiteFavoriteMethods as favdb
from app.db.store import Store
from app.models import FavType
from app.utils import UseBisection
favbp = Blueprint("favorite", __name__, url_prefix="/")
api = Blueprint("favorite", __name__, url_prefix="/")
def remove_none(items: list):
return [i for i in items if i is not None]
@favbp.route("/favorite/add", methods=["POST"])
@api.route("/favorite/add", methods=["POST"])
def add_favorite():
"""
Adds a favorite to the database.
@@ -32,7 +33,7 @@ def add_favorite():
return {"msg": "Added to favorites"}
@favbp.route("/favorite/remove", methods=["POST"])
@api.route("/favorite/remove", methods=["POST"])
def remove_favorite():
"""
Removes a favorite from the database.
@@ -53,7 +54,7 @@ def remove_favorite():
return {"msg": "Removed from favorites"}
@favbp.route("/albums/favorite")
@api.route("/albums/favorite")
def get_favorite_albums():
limit = request.args.get("limit")
@@ -77,7 +78,7 @@ def get_favorite_albums():
return {"albums": fav_albums[:limit]}
@favbp.route("/tracks/favorite")
@api.route("/tracks/favorite")
def get_favorite_tracks():
limit = request.args.get("limit")
@@ -100,7 +101,7 @@ def get_favorite_tracks():
return {"tracks": tracks[:limit]}
@favbp.route("/artists/favorite")
@api.route("/artists/favorite")
def get_favorite_artists():
limit = request.args.get("limit")
@@ -124,7 +125,7 @@ def get_favorite_artists():
return {"artists": artists[:limit]}
@favbp.route("/favorites")
@api.route("/favorites")
def get_all_favorites():
"""
Returns all the favorites in the database.
@@ -149,6 +150,8 @@ def get_all_favorites():
favs = favdb.get_all()
favs.reverse()
favs = [fav for fav in favs if fav[1] != ""]
tracks = []
albums = []
artists = []
@@ -161,21 +164,22 @@ def get_all_favorites():
):
break
if fav[2] == FavType.track:
tracks.append(fav[1])
elif fav[2] == FavType.album:
albums.append(fav[1])
elif fav[2] == FavType.artist:
artists.append(fav[1])
if not len(tracks) >= track_limit:
if fav[2] == FavType.track:
tracks.append(fav[1])
if not len(albums) >= album_limit:
if fav[2] == FavType.album:
albums.append(fav[1])
if not len(artists) >= artist_limit:
if fav[2] == FavType.artist:
artists.append(fav[1])
src_tracks = sorted(Store.tracks, key=lambda x: x.trackhash)
src_albums = sorted(Store.albums, key=lambda x: x.albumhash)
src_artists = sorted(Store.artists, key=lambda x: x.artisthash)
tracks = tracks[:track_limit]
albums = albums[:album_limit]
artists = artists[:artist_limit]
tracks = UseBisection(src_tracks, "trackhash", tracks)()
albums = UseBisection(src_albums, "albumhash", albums)()
artists = UseBisection(src_artists, "artisthash", artists)()
@@ -191,7 +195,7 @@ def get_all_favorites():
}
@favbp.route("/favorites/check")
@api.route("/favorites/check")
def check_favorite():
"""
Checks if a favorite exists in the database.
+101 -6
View File
@@ -1,28 +1,66 @@
"""
Contains all the folder routes.
"""
import os
import psutil
from pathlib import Path
from flask import Blueprint, request
from app import settings
from app.lib.folderslib import GetFilesAndDirs
from app.db.sqlite.settings import SettingsSQLMethods as db
from app.models import Folder
from app.utils import create_folder_hash, is_windows, win_replace_slash
folderbp = Blueprint("folder", __name__, url_prefix="/")
api = Blueprint("folder", __name__, url_prefix="/")
@folderbp.route("/folder", methods=["POST"])
@api.route("/folder", methods=["POST"])
def get_folder_tree():
"""
Returns a list of all the folders and tracks in the given folder.
"""
data = request.get_json()
req_dir = "$home"
if data is not None:
req_dir: str = data["folder"]
else:
req_dir = settings.HOME_DIR
try:
req_dir: str = data["folder"]
except KeyError:
req_dir = "$home"
root_dirs = db.get_root_dirs()
try:
if req_dir == "$home" and root_dirs[0] == "$home":
req_dir = settings.USER_HOME_DIR
except IndexError:
pass
if req_dir == "$home":
req_dir = settings.HOME_DIR
folders = [Path(f) for f in root_dirs]
return {
"folders": [
Folder(
name=f.name if f.name != "" else str(f).replace("\\", "/"),
path=win_replace_slash(str(f)),
has_tracks=True,
is_sym=f.is_symlink(),
path_hash=create_folder_hash(*f.parts[1:]),
)
for f in folders
],
"tracks": [],
}
if is_windows():
# Trailing slash needed when drive letters are passed,
# Remember, the trailing slash is removed in the client.
req_dir = req_dir + "/"
else:
req_dir = "/" + req_dir + "/" if not req_dir.startswith("/") else req_dir + "/"
tracks, folders = GetFilesAndDirs(req_dir)()
@@ -30,3 +68,60 @@ def get_folder_tree():
"tracks": tracks,
"folders": sorted(folders, key=lambda i: i.name),
}
def get_all_drives(is_win: bool = False):
"""
Returns a list of all the drives on a Windows machine.
"""
drives = psutil.disk_partitions(all=False)
drives = [d.mountpoint for d in drives]
if is_win:
drives = [win_replace_slash(d) for d in drives]
else:
remove = ["/boot", "/boot/efi", "/tmp"]
drives = [d for d in drives if d not in remove]
return drives
@api.route("/folder/dir-browser", methods=["POST"])
def list_folders():
"""
Returns a list of all the folders in the given folder.
"""
data = request.get_json()
is_win = is_windows()
try:
req_dir: str = data["folder"]
except KeyError:
req_dir = "$root"
if req_dir == "$root":
# req_dir = settings.USER_HOME_DIR
# if is_win:
return {
"folders": [{"name": d, "path": d} for d in get_all_drives(is_win=is_win)]
}
if is_win:
req_dir = req_dir + "/"
else:
req_dir = "/" + req_dir + "/"
req_dir = str(Path(req_dir).resolve())
try:
entries = os.scandir(req_dir)
except PermissionError:
return {"folders": []}
dirs = [e.name for e in entries if e.is_dir() and not e.name.startswith(".")]
dirs = [
{"name": d, "path": win_replace_slash(os.path.join(req_dir, d))} for d in dirs
]
return {
"folders": sorted(dirs, key=lambda i: i["name"]),
}
@@ -1,14 +1,13 @@
import os
from pathlib import Path
from flask import Blueprint, request, send_from_directory
from flask import Blueprint, send_from_directory
imgbp = Blueprint("imgserver", __name__, url_prefix="/img")
from app.settings import APP_DIR
api = Blueprint("imgserver", __name__, url_prefix="/img")
SUPPORTED_IMAGES = (".jpg", ".png", ".webp", ".jpeg")
HOME = os.path.expanduser("~")
APP_DIR = Path(HOME) / ".swing"
APP_DIR = Path(APP_DIR)
IMG_PATH = APP_DIR / "images"
ASSETS_PATH = APP_DIR / "assets"
@@ -23,7 +22,7 @@ ARTIST_SM_PATH = ARTIST_PATH / "small"
PLAYLIST_PATH = IMG_PATH / "playlists"
@imgbp.route("/")
@api.route("/")
def hello():
return "<h1>Image Server</h1>"
@@ -37,7 +36,7 @@ def send_fallback_img(filename: str = "default.webp"):
return send_from_directory(ASSETS_PATH, filename)
@imgbp.route("/t/<imgpath>")
@api.route("/t/<imgpath>")
def send_lg_thumbnail(imgpath: str):
fpath = LG_THUMB_PATH / imgpath
@@ -47,7 +46,7 @@ def send_lg_thumbnail(imgpath: str):
return send_fallback_img()
@imgbp.route("/t/s/<imgpath>")
@api.route("/t/s/<imgpath>")
def send_sm_thumbnail(imgpath: str):
fpath = SM_THUMB_PATH / imgpath
@@ -57,7 +56,7 @@ def send_sm_thumbnail(imgpath: str):
return send_fallback_img()
@imgbp.route("/a/<imgpath>")
@api.route("/a/<imgpath>")
def send_lg_artist_image(imgpath: str):
fpath = ARTIST_LG_PATH / imgpath
@@ -67,7 +66,7 @@ def send_lg_artist_image(imgpath: str):
return send_fallback_img("artist.webp")
@imgbp.route("/a/s/<imgpath>")
@api.route("/a/s/<imgpath>")
def send_sm_artist_image(imgpath: str):
fpath = ARTIST_SM_PATH / imgpath
@@ -77,7 +76,7 @@ def send_sm_artist_image(imgpath: str):
return send_fallback_img("artist.webp")
@imgbp.route("/p/<imgpath>")
@api.route("/p/<imgpath>")
def send_playlist_image(imgpath: str):
fpath = PLAYLIST_PATH / imgpath
+9 -10
View File
@@ -13,7 +13,7 @@ from app.db.store import Store
from app.lib import playlistlib
from app.utils import create_new_date, remove_duplicates
playlistbp = Blueprint("playlist", __name__, url_prefix="/")
api = Blueprint("playlist", __name__, url_prefix="/")
PL = SQLitePlaylistMethods
@@ -30,7 +30,7 @@ delete_playlist = PL.delete_playlist
# get_tracks_by_trackhashes = SQLiteTrackMethods.get_tracks_by_trackhashes
@playlistbp.route("/playlists", methods=["GET"])
@api.route("/playlists", methods=["GET"])
def send_all_playlists():
"""
Gets all the playlists.
@@ -46,7 +46,7 @@ def send_all_playlists():
return {"data": playlists}
@playlistbp.route("/playlist/new", methods=["POST"])
@api.route("/playlist/new", methods=["POST"])
def create_playlist():
"""
Creates a new playlist. Accepts POST method with a JSON body.
@@ -79,7 +79,7 @@ def create_playlist():
return {"playlist": playlist}, 201
@playlistbp.route("/playlist/<playlist_id>/add", methods=["POST"])
@api.route("/playlist/<playlist_id>/add", methods=["POST"])
def add_track_to_playlist(playlist_id: str):
"""
Takes a playlist ID and a track hash, and adds the track to the playlist
@@ -97,12 +97,12 @@ def add_track_to_playlist(playlist_id: str):
return {"error": "Track already exists in playlist"}, 409
add_artist_to_playlist(int(playlist_id), trackhash)
PL.update_last_updated(playlist_id)
PL.update_last_updated(int(playlist_id))
return {"msg": "Done"}, 200
@playlistbp.route("/playlist/<playlistid>")
@api.route("/playlist/<playlistid>")
def get_playlist(playlistid: str):
"""
Gets a playlist by id, and if it exists, it gets all the tracks in the playlist and returns them.
@@ -123,7 +123,7 @@ def get_playlist(playlistid: str):
return {"info": playlist, "tracks": tracks}
@playlistbp.route("/playlist/<playlistid>/update", methods=["PUT"])
@api.route("/playlist/<playlistid>/update", methods=["PUT"])
def update_playlist_info(playlistid: str):
if playlistid is None:
return {"error": "Playlist ID not provided"}, 400
@@ -166,7 +166,6 @@ def update_playlist_info(playlistid: str):
return {"error": "Failed: Invalid image"}, 400
p_tuple = (*playlist.values(),)
print("banner pos:", playlist["banner_pos"])
update_playlist(int(playlistid), playlist)
@@ -188,7 +187,7 @@ def update_playlist_info(playlistid: str):
# return {"data": artists}
@playlistbp.route("/playlist/delete", methods=["POST"])
@api.route("/playlist/delete", methods=["POST"])
def remove_playlist():
"""
Deletes a playlist by ID.
@@ -209,7 +208,7 @@ def remove_playlist():
return {"msg": "Done"}, 200
@playlistbp.route("/playlist/<pid>/set-image-pos", methods=["POST"])
@api.route("/playlist/<pid>/set-image-pos", methods=["POST"])
def update_image_position(pid: int):
data = request.get_json()
message = {"msg": "No data provided"}
+9 -9
View File
@@ -8,7 +8,7 @@ from app import models, utils
from app.db.store import Store
from app.lib import searchlib
searchbp = Blueprint("search", __name__, url_prefix="/")
api = Blueprint("search", __name__, url_prefix="/")
SEARCH_COUNT = 12
@@ -95,7 +95,7 @@ class DoSearch:
# self.search_playlists()
@searchbp.route("/search/tracks", methods=["GET"])
@api.route("/search/tracks", methods=["GET"])
def search_tracks():
"""
Searches for tracks that match the search query.
@@ -113,7 +113,7 @@ def search_tracks():
}
@searchbp.route("/search/albums", methods=["GET"])
@api.route("/search/albums", methods=["GET"])
def search_albums():
"""
Searches for albums.
@@ -131,7 +131,7 @@ def search_albums():
}
@searchbp.route("/search/artists", methods=["GET"])
@api.route("/search/artists", methods=["GET"])
def search_artists():
"""
Searches for artists.
@@ -167,7 +167,7 @@ def search_artists():
# }
@searchbp.route("/search/top", methods=["GET"])
@api.route("/search/top", methods=["GET"])
def get_top_results():
"""
Returns the top results for the search query.
@@ -188,7 +188,7 @@ def get_top_results():
}
@searchbp.route("/search/loadmore")
@api.route("/search/loadmore")
def search_load_more():
"""
Returns more songs, albums or artists from a search query.
@@ -199,20 +199,20 @@ def search_load_more():
if s_type == "tracks":
t = SearchResults.tracks
return {
"tracks": t[index : index + SEARCH_COUNT],
"tracks": t[index: index + SEARCH_COUNT],
"more": len(t) > index + SEARCH_COUNT,
}
elif s_type == "albums":
a = SearchResults.albums
return {
"albums": a[index : index + SEARCH_COUNT],
"albums": a[index: index + SEARCH_COUNT],
"more": len(a) > index + SEARCH_COUNT,
}
elif s_type == "artists":
a = SearchResults.artists
return {
"artists": a[index : index + SEARCH_COUNT],
"artists": a[index: index + SEARCH_COUNT],
"more": len(a) > index + SEARCH_COUNT,
}
+126
View File
@@ -0,0 +1,126 @@
from flask import Blueprint, request
from app import settings
from app.logger import log
from app.lib import populate
from app.db.store import Store
from app.utils import background, get_random_str
from app.lib.watchdogg import Watcher as WatchDog
from app.db.sqlite.settings import SettingsSQLMethods as sdb
api = Blueprint("settings", __name__, url_prefix="/")
def get_child_dirs(parent: str, children: list[str]):
"""Returns child directories in a list, given a parent directory"""
return [_dir for _dir in children if _dir.startswith(parent) and _dir != parent]
def reload_everything():
"""
Reloads all stores using the current database items
"""
Store.load_all_tracks()
Store.process_folders()
Store.load_albums()
Store.load_artists()
@background
def rebuild_store(db_dirs: list[str]):
"""
Restarts the watchdog and rebuilds the music library.
"""
log.info("Rebuilding library...")
Store.remove_tracks_by_dir_except(db_dirs)
reload_everything()
key = get_random_str()
try:
populate.Populate(key=key)
except populate.PopulateCancelledError:
reload_everything()
return
WatchDog().restart()
log.info("Rebuilding library... ✅")
def finalize(new_: list[str], removed_: list[str], db_dirs_: list[str]):
"""
Params:
new_: will be added to the database
removed_: will be removed from the database
db_dirs_: will be used to remove tracks that
are outside these directories from the database and store.
"""
sdb.remove_root_dirs(removed_)
sdb.add_root_dirs(new_)
rebuild_store(db_dirs_)
@api.route("/settings/add-root-dirs", methods=["POST"])
def add_root_dirs():
"""
Add custom root directories to the database.
"""
msg = {"msg": "Failed! No directories were given."}
data = request.get_json()
if data is None:
return msg, 400
try:
new_dirs: list[str] = data["new_dirs"]
removed_dirs: list[str] = data["removed"]
except KeyError:
return msg, 400
db_dirs = sdb.get_root_dirs()
_h = "$home"
db_home = any([d == _h for d in db_dirs]) # if $home is in db
incoming_home = any([d == _h for d in new_dirs]) # if $home is in incoming
# handle $home case
if db_home and incoming_home:
return {"msg": "Not changed!"}
if db_home or incoming_home:
sdb.remove_root_dirs(db_dirs)
if incoming_home:
finalize([_h], [], [settings.USER_HOME_DIR])
return {"root_dirs": [_h]}
# ---
for _dir in new_dirs:
children = get_child_dirs(_dir, db_dirs)
removed_dirs.extend(children)
for _dir in removed_dirs:
try:
db_dirs.remove(_dir)
except ValueError:
pass
db_dirs.extend(new_dirs)
db_dirs = [dir_ for dir_ in db_dirs if dir_ != _h]
finalize(new_dirs, removed_dirs, db_dirs)
return {"root_dirs": db_dirs}
@api.route("/settings/get-root-dirs", methods=["GET"])
def get_root_dirs():
"""
Get custom root directories from the database.
"""
dirs = sdb.get_root_dirs()
return {"dirs": dirs}
+3 -2
View File
@@ -2,12 +2,13 @@
Contains all the track routes.
"""
from flask import Blueprint, send_file
from app.db.store import Store
trackbp = Blueprint("track", __name__, url_prefix="/")
api = Blueprint("track", __name__, url_prefix="/")
@trackbp.route("/file/<trackhash>")
@api.route("/file/<trackhash>")
def send_track_file(trackhash: str):
"""
Returns an audio file that matches the passed id to the client.
+15 -11
View File
@@ -5,11 +5,26 @@ from .utils import SQLiteManager
class SQLiteFavoriteMethods:
"""THis class contains methods for interacting with the favorites table."""
@classmethod
def check_is_favorite(cls, itemhash: str, fav_type: str):
"""
Checks if an item is favorited.
"""
sql = """SELECT * FROM favorites WHERE hash = ? AND type = ?"""
with SQLiteManager(userdata_db=True) as cur:
cur.execute(sql, (itemhash, fav_type))
items = cur.fetchall()
return len(items) > 0
@classmethod
def insert_one_favorite(cls, fav_type: str, fav_hash: str):
"""
Inserts a single favorite into the database.
"""
# try to find the favorite in the database, if it exists, don't insert it
if cls.check_is_favorite(fav_hash, fav_type):
return
sql = """INSERT INTO favorites(type, hash) VALUES(?,?)"""
with SQLiteManager(userdata_db=True) as cur:
cur.execute(sql, (fav_type, fav_hash))
@@ -64,14 +79,3 @@ class SQLiteFavoriteMethods:
with SQLiteManager(userdata_db=True) as cur:
cur.execute(sql, (fav_hash, fav_type))
@classmethod
def check_is_favorite(cls, itemhash: str, fav_type: str):
"""
Checks if an item is favorited.
"""
sql = """SELECT * FROM favorites WHERE hash = ? AND type = ?"""
with SQLiteManager(userdata_db=True) as cur:
cur.execute(sql, (itemhash, fav_type))
items = cur.fetchall()
return len(items) > 0
+64
View File
@@ -0,0 +1,64 @@
"""
Reads and saves the latest database migrations version.
"""
from app.db.sqlite.utils import SQLiteManager
class MigrationManager:
all_get_sql = "SELECT * FROM migrations"
pre_init_set_sql = "UPDATE migrations SET pre_init_version = ? WHERE id = 1"
post_init_set_sql = "UPDATE migrations SET post_init_version = ? WHERE id = 1"
@classmethod
def get_preinit_version(cls) -> int:
"""
Returns the latest userdata pre-init database version.
"""
with SQLiteManager() as cur:
cur.execute(cls.all_get_sql)
return int(cur.fetchone()[1])
@classmethod
def get_maindb_postinit_version(cls) -> int:
"""
Returns the latest maindb post-init database version.
"""
with SQLiteManager() as cur:
cur.execute(cls.all_get_sql)
return int(cur.fetchone()[2])
@classmethod
def get_userdatadb_postinit_version(cls) -> int:
"""
Returns the latest userdata post-init database version.
"""
with SQLiteManager(userdata_db=True) as cur:
cur.execute(cls.all_get_sql)
return cur.fetchone()[2]
# 👇 Setters 👇
@classmethod
def set_preinit_version(cls, version: int):
"""
Sets the userdata pre-init database version.
"""
with SQLiteManager() as cur:
cur.execute(cls.pre_init_set_sql, (version,))
@classmethod
def set_maindb_postinit_version(cls, version: int):
"""
Sets the maindb post-init database version.
"""
with SQLiteManager() as cur:
cur.execute(cls.post_init_set_sql, (version,))
@classmethod
def set_userdatadb_postinit_version(cls, version: int):
"""
Sets the userdata post-init database version.
"""
with SQLiteManager(userdata_db=True) as cur:
cur.execute(cls.post_init_set_sql, (version,))
+3 -17
View File
@@ -90,23 +90,9 @@ class SQLitePlaylistMethods:
@staticmethod
def add_item_to_json_list(playlist_id: int, field: str, items: list[str]):
"""
Adds a string item to a json dumped list using a playlist id and field name. Takes the playlist ID, a field name, an item to add to the field, and an error to raise if the item is already in the field.
Parameters
----------
playlist_id : int
The ID of the playlist to add the item to.
field : str
The field in the database that you want to add the item to.
item : str
The item to add to the list.
error : Exception
The error to raise if the item is already in the list.
Returns
-------
A list of strings.
Adds a string item to a json dumped list using a playlist id and field name.
Takes the playlist ID, a field name,
an item to add to the field, and an error to raise if the item is already in the field.
"""
sql = f"SELECT {field} FROM playlists WHERE id = ?"
+18
View File
@@ -20,6 +20,12 @@ CREATE TABLE IF NOT EXISTS favorites (
hash text not null,
type text not null
);
CREATE TABLE IF NOT EXISTS settings (
id integer PRIMARY KEY,
root_dirs text NOT NULL,
exclude_dirs text
)
"""
CREATE_APPDB_TABLES = """
@@ -63,3 +69,15 @@ CREATE TABLE IF NOT EXISTS folders (
trackcount integer NOT NULL
);
"""
CREATE_MIGRATIONS_TABLE = """
CREATE TABLE IF NOT EXISTS migrations (
id integer PRIMARY KEY,
pre_init_version integer NOT NULL DEFAULT 0,
post_init_version integer NOT NULL DEFAULT 0
);
INSERT INTO migrations (pre_init_version, post_init_version)
SELECT 0, 0
WHERE NOT EXISTS (SELECT 1 FROM migrations);
"""
+88
View File
@@ -0,0 +1,88 @@
from app.db.sqlite.utils import SQLiteManager
from app.utils import win_replace_slash
class SettingsSQLMethods:
"""
Methods for interacting with the settings table.
"""
@staticmethod
def get_root_dirs() -> list[str]:
"""
Gets custom root directories from the database.
"""
sql = "SELECT root_dirs FROM settings"
with SQLiteManager(userdata_db=True) as cur:
cur.execute(sql)
dirs = cur.fetchall()
dirs = [dir[0] for dir in dirs]
return [win_replace_slash(d) for d in dirs]
@staticmethod
def add_root_dirs(dirs: list[str]):
"""
Add custom root directories to the database.
"""
sql = "INSERT INTO settings (root_dirs) VALUES (?)"
existing_dirs = SettingsSQLMethods.get_root_dirs()
dirs = [dir for dir in dirs if dir not in existing_dirs]
if len(dirs) == 0:
return
with SQLiteManager(userdata_db=True) as cur:
for _dir in dirs:
cur.execute(sql, (_dir,))
@staticmethod
def remove_root_dirs(dirs: list[str]):
"""
Remove custom root directories from the database.
"""
sql = "DELETE FROM settings WHERE root_dirs = ?"
with SQLiteManager(userdata_db=True) as cur:
for _dir in dirs:
cur.execute(sql, (_dir,))
@staticmethod
def add_excluded_dirs(dirs: list[str]):
"""
Add custom exclude directories to the database.
"""
sql = "INSERT INTO settings (exclude_dirs) VALUES (?)"
with SQLiteManager(userdata_db=True) as cur:
cur.executemany(sql, dirs)
@staticmethod
def remove_excluded_dirs(dirs: list[str]):
"""
Remove custom exclude directories from the database.
"""
sql = "DELETE FROM settings WHERE exclude_dirs = ?"
with SQLiteManager(userdata_db=True) as cur:
cur.executemany(sql, dirs)
@staticmethod
def get_excluded_dirs() -> list[str]:
"""
Gets custom exclude directories from the database.
"""
sql = "SELECT exclude_dirs FROM settings"
with SQLiteManager(userdata_db=True) as cur:
cur.execute(sql)
dirs = cur.fetchall()
return [dir[0] for dir in dirs]
+7 -11
View File
@@ -61,6 +61,8 @@ class SQLiteTrackMethods:
),
)
# TODO: rewrite the above code using an ordered dict and destructuring
@classmethod
def insert_many_tracks(cls, tracks: list[dict]):
"""
@@ -128,15 +130,9 @@ class SQLiteTrackMethods:
cur.execute("DELETE FROM tracks WHERE filepath=?", (filepath,))
@staticmethod
def track_exists(filepath: str):
"""
Checks if a track exists in the database using its filepath.
"""
def remove_tracks_by_folders(folders: set[str]):
sql = "DELETE FROM tracks WHERE folder = ?"
with SQLiteManager() as cur:
cur.execute("SELECT * FROM tracks WHERE filepath=?", (filepath,))
row = cur.fetchone()
if row is not None:
return True
return False
for folder in folders:
cur.execute(sql, (folder,))
+19 -4
View File
@@ -4,6 +4,7 @@ Helper functions for use with the SQLite database.
import sqlite3
from sqlite3 import Connection, Cursor
import time
from app.models import Album, Playlist, Track
from app.settings import APP_DB_PATH, USERDATA_DB_PATH
@@ -82,12 +83,26 @@ class SQLiteManager:
if self.userdata_db:
db_path = USERDATA_DB_PATH
self.conn = sqlite3.connect(db_path)
self.conn = sqlite3.connect(
db_path,
timeout=15,
)
return self.conn.cursor()
def __exit__(self, exc_type, exc_value, exc_traceback):
if self.conn:
self.conn.commit()
trial_count = 0
if self.CLOSE_CONN:
self.conn.close()
while trial_count < 10:
try:
self.conn.commit()
if self.CLOSE_CONN:
self.conn.close()
return
except sqlite3.OperationalError:
trial_count += 1
time.sleep(3)
self.conn.close()
+32 -7
View File
@@ -17,6 +17,7 @@ from app.utils import (
create_folder_hash,
get_all_artists,
remove_duplicates,
win_replace_slash,
)
@@ -40,7 +41,7 @@ class Store:
cls.tracks = list(tdb.get_all_tracks())
fav_hashes = favdb.get_fav_tracks()
fav_hashes = [t[1] for t in fav_hashes]
fav_hashes = " ".join([t[1] for t in fav_hashes])
for track in tqdm(cls.tracks, desc="Loading tracks"):
if track.trackhash in fav_hashes:
@@ -88,6 +89,17 @@ class Store:
cls.tracks.remove(track)
break
@classmethod
def remove_tracks_by_dir_except(cls, dirs: list[str]):
"""Removes all tracks not in the root directories."""
to_remove = set()
for track in cls.tracks:
if not track.folder.startswith(tuple(dirs)):
to_remove.add(track.folder)
tdb.remove_tracks_by_folders(to_remove)
@classmethod
def count_tracks_by_hash(cls, trackhash: str) -> int:
"""
@@ -163,7 +175,7 @@ class Store:
return Folder(
name=folder.name,
path=str(folder),
path=win_replace_slash(str(folder)),
is_sym=folder.is_symlink(),
has_tracks=True,
path_hash=create_folder_hash(*folder.parts[1:]),
@@ -197,6 +209,8 @@ class Store:
"""
Creates a list of folders from the tracks in the store.
"""
cls.folders.clear()
all_folders = [track.folder for track in cls.tracks]
all_folders = set(all_folders)
@@ -205,9 +219,18 @@ class Store:
]
all_folders = [Path(f) for f in all_folders]
all_folders = [f for f in all_folders if f.exists()]
# all_folders = [f for f in all_folders if f.exists()]
for path in tqdm(all_folders, desc="Processing folders"):
valid_folders = []
for folder in all_folders:
try:
if folder.exists():
valid_folders.append(folder)
except PermissionError:
pass
for path in tqdm(valid_folders, desc="Processing folders"):
folder = cls.create_folder(str(path))
cls.folders.append(folder)
@@ -277,6 +300,8 @@ class Store:
Loads all albums from the database into the store.
"""
cls.albums = []
albumhashes = set(t.albumhash for t in cls.tracks)
for albumhash in tqdm(albumhashes, desc="Loading albums"):
@@ -291,9 +316,9 @@ class Store:
albumhash = album[1]
colors = json.loads(album[2])
for al in cls.albums:
if al.albumhash == albumhash:
al.set_colors(colors)
for _al in cls.albums:
if _al.albumhash == albumhash:
_al.set_colors(colors)
break
@classmethod
+6 -7
View File
@@ -7,8 +7,8 @@ from requests import ReadTimeout
from app import utils
from app.lib.artistlib import CheckArtistImages
from app.lib.colorlib import ProcessAlbumColors, ProcessArtistColors
from app.lib.populate import Populate, ProcessTrackThumbnails
from app.lib.colorlib import ProcessArtistColors
from app.lib.populate import Populate, PopulateCancelledError
from app.lib.trackslib import validate_tracks
from app.logger import log
@@ -23,11 +23,10 @@ def run_periodic_checks():
validate_tracks()
while True:
Populate()
ProcessTrackThumbnails()
ProcessAlbumColors()
ProcessArtistColors()
try:
Populate(key=utils.get_random_str())
except PopulateCancelledError:
pass
if utils.Ping()():
try:
+1 -1
View File
@@ -82,7 +82,7 @@ class CheckArtistImages:
"""
Checks if an artist image exists and downloads it if not.
:param artistname: The artist name
:param artist: The artist name
"""
img_path = Path(settings.ARTIST_IMG_SM_PATH) / f"{artist.artisthash}.webp"
+19 -16
View File
@@ -38,19 +38,19 @@ class ProcessAlbumColors:
"""
def __init__(self) -> None:
albums = [a for a in Store.albums if len(a.colors) == 0]
with SQLiteManager() as cur:
for album in tqdm(Store.albums, desc="Processing album colors"):
if len(album.colors) == 0:
colors = self.process_color(album)
for album in tqdm(albums, desc="Processing missing album colors"):
colors = self.process_color(album)
if colors is None:
continue
if colors is None:
continue
album.set_colors(colors)
album.set_colors(colors)
color_str = json.dumps(colors)
db.insert_one_album(cur, album.albumhash, color_str)
color_str = json.dumps(colors)
db.insert_one_album(cur, album.albumhash, color_str)
@staticmethod
def process_color(album: Album):
@@ -69,14 +69,10 @@ class ProcessArtistColors:
"""
def __init__(self) -> None:
all_artists = Store.artists
all_artists = [a for a in Store.artists if len(a.colors) == 0]
if all_artists is None:
return
for artist in tqdm(all_artists, desc="Processing artist colors"):
if len(artist.colors) == 0:
self.process_color(artist)
for artist in tqdm(all_artists, desc="Processing missing artist colors"):
self.process_color(artist)
@staticmethod
def process_color(artist: Artist):
@@ -91,4 +87,11 @@ class ProcessArtistColors:
adb.insert_one_artist(artisthash=artist.artisthash, colors=colors)
Store.map_artist_color((0, artist.artisthash, json.dumps(colors)))
# TODO: Load album and artist colors into the store.
# TODO: If item color is in db, get it, assign it to the item and continue.
# - Format all colors in the format: rgb(123, 123, 123)
# - Each digit should be 3 digits long.
# - Format all db colors into a master string of the format "-itemhash:colorhash-"
# - Find the item hash using index() and get the color using the index + number, where number
# is the length of the rgb string + 1
# - Assign the color to the item and continue.
# - If the color is not in the db, extract it and add it to the db.
+21 -9
View File
@@ -1,10 +1,11 @@
import os
import pathlib
from concurrent.futures import ThreadPoolExecutor
from app.db.store import Store
from app.models import Folder, Track
from app.settings import SUPPORTED_FILES
from app.logger import log
from app.utils import win_replace_slash
class GetFilesAndDirs:
@@ -19,7 +20,7 @@ class GetFilesAndDirs:
try:
entries = os.scandir(self.path)
except FileNotFoundError:
return ([], [])
return [], []
dirs, files = [], []
@@ -27,14 +28,25 @@ class GetFilesAndDirs:
ext = os.path.splitext(entry.name)[1].lower()
if entry.is_dir() and not entry.name.startswith("."):
dirs.append(entry.path)
dirs.append(win_replace_slash(entry.path))
elif entry.is_file() and ext in SUPPORTED_FILES:
files.append(entry.path)
files.append(win_replace_slash(entry.path))
# sort files by modified time
files.sort(
key=lambda f: os.path.getmtime(f) # pylint: disable=unnecessary-lambda
)
files_ = []
for file in files:
try:
files_.append(
{
"path": file,
"time": os.path.getmtime(file),
}
)
except OSError as e:
log.error(e)
files_.sort(key=lambda f: f["time"])
files = [f["path"] for f in files_]
tracks = Store.get_tracks_by_filepaths(files)
@@ -44,4 +56,4 @@ class GetFilesAndDirs:
folders = filter(lambda f: f.has_tracks, folders)
return (tracks, folders) # type: ignore
return tracks, folders # type: ignore
+48 -5
View File
@@ -3,7 +3,10 @@ from tqdm import tqdm
from app import settings
from app.db.sqlite.tracks import SQLiteTrackMethods
from app.db.sqlite.settings import SettingsSQLMethods as sdb
from app.db.sqlite.favorite import SQLiteFavoriteMethods as favdb
from app.db.store import Store
from app.lib.colorlib import ProcessAlbumColors, ProcessArtistColors
from app.lib.taglib import extract_thumb, get_tags
from app.logger import log
@@ -13,6 +16,12 @@ from app.utils import run_fast_scandir
get_all_tracks = SQLiteTrackMethods.get_all_tracks
insert_many_tracks = SQLiteTrackMethods.insert_many_tracks
POPULATE_KEY = ""
class PopulateCancelledError(Exception):
pass
class Populate:
"""
@@ -22,12 +31,34 @@ class Populate:
also checks if the album art exists in the image path, if not tries to extract it.
"""
def __init__(self) -> None:
def __init__(self, key: str) -> None:
global POPULATE_KEY
POPULATE_KEY = key
tracks = get_all_tracks()
tracks = list(tracks)
files = run_fast_scandir(settings.HOME_DIR, full=True)[1]
dirs_to_scan = sdb.get_root_dirs()
if len(dirs_to_scan) == 0:
log.warning(
(
"The root directory is not configured. "
+ "Open the app in your webbrowser to configure."
)
)
return
try:
if dirs_to_scan[0] == "$home":
dirs_to_scan = [settings.USER_HOME_DIR]
except IndexError:
pass
files = []
for _dir in dirs_to_scan:
files.extend(run_fast_scandir(_dir, full=True)[1])
untagged = self.filter_untagged(tracks, files)
@@ -35,7 +66,12 @@ class Populate:
log.info("All clear, no unread files.")
return
self.tag_untagged(untagged)
self.tag_untagged(untagged, key)
ProcessTrackThumbnails()
ProcessAlbumColors()
ProcessArtistColors()
@staticmethod
def filter_untagged(tracks: list[Track], files: list[str]):
@@ -43,17 +79,24 @@ class Populate:
return set(files) - set(tagged_files)
@staticmethod
def tag_untagged(untagged: set[str]):
def tag_untagged(untagged: set[str], key: str):
log.info("Found %s new tracks", len(untagged))
tagged_tracks: list[dict] = []
tagged_count = 0
fav_tracks = favdb.get_fav_tracks()
fav_tracks = "-".join([t[1] for t in fav_tracks])
for file in tqdm(untagged, desc="Reading files"):
if POPULATE_KEY != key:
raise PopulateCancelledError("Populate key changed")
tags = get_tags(file)
if tags is not None:
tagged_tracks.append(tags)
track = Track(**tags)
track.is_favorite = track.trackhash in fav_tracks
Store.add_track(track)
Store.add_folder(track.folder)
@@ -97,4 +140,4 @@ class ProcessTrackThumbnails:
)
)
results = [r for r in results]
list(results)
+1 -1
View File
@@ -43,7 +43,7 @@ class SearchTracks:
Gets all songs with a given title.
"""
tracks = [track.title for track in self.tracks]
tracks = [track.og_title for track in self.tracks]
results = process.extract(
self.query,
tracks,
+28 -9
View File
@@ -1,13 +1,17 @@
import os
import datetime
import os
from io import BytesIO
from tinytag import TinyTag
from PIL import Image, UnidentifiedImageError
from tinytag import TinyTag
from app import settings
from app.utils import create_hash
from app.utils import (
create_hash,
parse_artist_from_filename,
parse_title_from_filename,
win_replace_slash,
)
def parse_album_art(filepath: str):
@@ -81,7 +85,7 @@ def get_tags(filepath: str):
try:
tags = TinyTag.get(filepath)
except: # pylint: disable=bare-except
except: # noqa: E722
return None
no_albumartist: bool = (tags.albumartist == "") or (tags.albumartist is None)
@@ -97,9 +101,24 @@ def get_tags(filepath: str):
for tag in to_filename:
p = getattr(tags, tag)
if p == "" or p is None:
setattr(tags, tag, filename)
maybe = parse_title_from_filename(filename)
setattr(tags, tag, maybe)
to_check = ["album", "artist", "year", "albumartist"]
parse = ["artist", "albumartist"]
for tag in parse:
p = getattr(tags, tag)
if p == "" or p is None:
maybe = parse_artist_from_filename(filename)
if maybe:
setattr(tags, tag, ", ".join(maybe))
else:
setattr(tags, tag, "Unknown")
# TODO: Move parsing title, album and artist to startup. (Maybe!)
to_check = ["album", "year", "albumartist"]
for prop in to_check:
p = getattr(tags, prop)
if (p is None) or (p == ""):
@@ -127,10 +146,10 @@ def get_tags(filepath: str):
tags.albumhash = create_hash(tags.album, tags.albumartist)
tags.trackhash = create_hash(tags.artist, tags.album, tags.title)
tags.image = f"{tags.albumhash}.webp"
tags.folder = os.path.dirname(filepath)
tags.folder = win_replace_slash(os.path.dirname(filepath))
tags.date = extract_date(tags.year)
tags.filepath = filepath
tags.filepath = win_replace_slash(filepath)
tags.filetype = filetype
tags = tags.__dict__
+136 -24
View File
@@ -2,17 +2,22 @@
This library contains the classes and functions related to the watchdog file watcher.
"""
import os
import sqlite3
import time
from watchdog.events import PatternMatchingEventHandler
from watchdog.observers import Observer
from app.db.sqlite.tracks import SQLiteManager
from app.db.sqlite.tracks import SQLiteTrackMethods as db
from app.logger import log
from app.db.store import Store
from app.lib.taglib import get_tags
from app.logger import log
from app.models import Artist, Track
from app import settings
from app.db.sqlite.tracks import SQLiteManager
from app.db.sqlite.tracks import SQLiteTrackMethods as db
from app.db.sqlite.settings import SettingsSQLMethods as sdb
class Watcher:
@@ -20,39 +25,100 @@ class Watcher:
Contains the methods for initializing and starting watchdog.
"""
home_dir = os.path.expanduser("~")
dirs = [home_dir]
observers: list[Observer] = []
def __init__(self):
self.observer = Observer()
def run(self):
event_handler = Handler()
"""
Starts watchers for each dir in root_dirs
"""
for dir_ in self.dirs:
trials = 0
while trials < 10:
try:
dirs = sdb.get_root_dirs()
dirs = [rf"{d}" for d in dirs]
dir_map = [
{"original": d, "realpath": os.path.realpath(d)} for d in dirs
]
break
except sqlite3.OperationalError:
trials += 1
time.sleep(1)
else:
log.error(
"WatchDogError: Failed to start Watchdog. Waiting for database timed out!"
)
return
if len(dirs) == 0:
log.warning(
"WatchDogInfo: No root directories configured. Watchdog not started."
)
return
dir_map = [d for d in dir_map if d["realpath"] != d["original"]]
# if len(dirs) > 0 and dirs[0] == "$home":
# dirs = [settings.USER_HOME_DIR]
if any([d == "$home" for d in dirs]):
dirs = [settings.USER_HOME_DIR]
event_handler = Handler(root_dirs=dirs, dir_map=dir_map)
for _dir in dirs:
exists = os.path.exists(_dir)
if not exists:
log.error("WatchdogError: Directory not found: %s", _dir)
for _dir in dirs:
self.observer.schedule(
event_handler, os.path.realpath(dir_), recursive=True
event_handler, os.path.realpath(_dir), recursive=True
)
self.observers.append(self.observer)
try:
self.observer.start()
except OSError:
log.error("Could not start watchdog.")
log.info("Started watchdog")
except (FileNotFoundError, PermissionError):
log.error(
"WatchdogError: Failed to start watchdog, root directories could not be resolved."
)
return
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
for obsv in self.observers:
obsv.unschedule_all()
obsv.stop()
self.stop_all()
for obsv in self.observers:
obsv.join()
def stop_all(self):
"""
Unschedules and stops all existing watchers.
"""
log.info("Stopping all watchdog observers")
for obsv in self.observers:
obsv.unschedule_all()
obsv.stop()
def restart(self):
"""
Stops all existing watchers, refetches root_dirs from the db
and restarts the watchers.
"""
log.info("🔃 Restarting watchdog")
self.stop_all()
self.run()
def add_track(filepath: str) -> None:
"""
@@ -118,28 +184,46 @@ def remove_track(filepath: str) -> None:
class Handler(PatternMatchingEventHandler):
files_to_process = []
files_to_process_windows = []
root_dirs = []
dir_map = []
def __init__(self, root_dirs: list[str], dir_map: dict[str:str]):
self.root_dirs = root_dirs
self.dir_map = dir_map
patterns = [f"*{f}" for f in settings.SUPPORTED_FILES]
def __init__(self):
log.info("✅ started watchdog")
PatternMatchingEventHandler.__init__(
self,
patterns=["*.flac", "*.mp3"],
patterns=patterns,
ignore_directories=True,
case_sensitive=False,
)
def get_abs_path(self, path: str):
"""
Convert a realpath to a path relative to the matching root directory.
"""
for d in self.dir_map:
if d["realpath"] in path:
return path.replace(d["realpath"], d["original"])
return path
def on_created(self, event):
"""
Fired when a supported file is created.
"""
self.files_to_process.append(event.src_path)
self.files_to_process_windows.append(event.src_path)
def on_deleted(self, event):
"""
Fired when a delete event occurs on a supported file.
"""
remove_track(event.src_path)
path = self.get_abs_path(event.src_path)
remove_track(path)
def on_moved(self, event):
"""
@@ -148,25 +232,53 @@ class Handler(PatternMatchingEventHandler):
trash = "share/Trash"
if trash in event.dest_path:
remove_track(event.src_path)
path = self.get_abs_path(event.src_path)
remove_track(path)
elif trash in event.src_path:
add_track(event.dest_path)
path = self.get_abs_path(event.dest_path)
add_track(path)
elif trash not in event.dest_path and trash not in event.src_path:
add_track(event.dest_path)
remove_track(event.src_path)
dest_path = self.get_abs_path(event.dest_path)
src_path = self.get_abs_path(event.src_path)
add_track(dest_path)
remove_track(src_path)
def on_closed(self, event):
"""
Fired when a created file is closed.
NOT FIRED IN WINDOWS
"""
try:
self.files_to_process.remove(event.src_path)
if os.path.getsize(event.src_path) > 0:
add_track(event.src_path)
path = self.get_abs_path(event.src_path)
add_track(path)
except ValueError:
pass
def on_modified(self, event):
# this event handler is triggered twice on windows
# for copy events. We need to test how this behaves in
# Linux.
# watcher = Watcher()
if event.src_path not in self.files_to_process_windows:
return
file_size = -1
while file_size != os.path.getsize(event.src_path):
file_size = os.path.getsize(event.src_path)
time.sleep(0.1)
try:
os.rename(event.src_path, event.src_path)
path = self.get_abs_path(event.src_path)
remove_track(path)
add_track(path)
self.files_to_process_windows.remove(event.src_path)
except OSError:
# File is locked, skipping
pass
+3 -4
View File
@@ -10,15 +10,15 @@ class CustomFormatter(logging.Formatter):
Custom log formatter
"""
grey = "\x1b[38;20m"
grey = "\033[92m"
yellow = "\x1b[33;20m"
red = "\x1b[31;20m"
red = "\033[41m"
bold_red = "\x1b[31;1m"
reset = "\x1b[0m"
# format = (
# "%(asctime)s - %(name)s - %(levelname)s - %(message)s (%(filename)s:%(lineno)d)"
# )
format_ = "[%(asctime)s]@%(name)s%(message)s"
format_ = "%(message)s"
FORMATS = {
logging.DEBUG: grey + format_ + reset,
@@ -45,5 +45,4 @@ handler.setLevel(logging.DEBUG)
handler.setFormatter(CustomFormatter())
log.addHandler(handler)
# copied from: https://stackoverflow.com/a/56944256:
+44
View File
@@ -0,0 +1,44 @@
"""
Migrations module.
Reads and applies the latest database migrations.
PLEASE NOTE: OLDER MIGRATIONS CAN NEVER BE DELETED.
ONLY MODIFY OLD MIGRATIONS FOR BUG FIXES OR ENHANCEMENTS ONLY
[TRY NOT TO MODIFY BEHAVIOR, UNLESS YOU KNOW WHAT YOU'RE DOING].
"""
from app.db.sqlite.migrations import MigrationManager
from app.logger import log
from .main import main_db_migrations
from .userdata import userdata_db_migrations
def apply_migrations():
"""
Applies the latest database migrations.
"""
userdb_version = MigrationManager.get_userdatadb_postinit_version()
maindb_version = MigrationManager.get_maindb_postinit_version()
for migration in main_db_migrations:
if migration.version > maindb_version:
log.info("Running new MAIN-DB post-init migration: %s", migration.name)
migration.migrate()
for migration in userdata_db_migrations:
if migration.version > userdb_version:
log.info("Running new USERDATA-DB post-init migration: %s", migration.name)
migration.migrate()
def set_postinit_migration_versions():
"""
Sets the post-init migration versions.
"""
# TODO: Don't forget to remove the zeros below when you add a valid migration 👇.
MigrationManager.set_maindb_postinit_version(0)
MigrationManager.set_userdatadb_postinit_version(0)
+38
View File
@@ -0,0 +1,38 @@
"""
Pre-init migrations are executed before the database is created.
Useful when you need to move files or folders before the database is created.
PLEASE NOTE: OLDER MIGRATIONS CAN NEVER BE DELETED.
ONLY MODIFY OLD MIGRATIONS FOR BUG FIXES OR ENHANCEMENTS ONLY.
[TRY NOT TO MODIFY BEHAVIOR, UNLESS YOU KNOW WHAT YOU'RE DOING].
"""
from sqlite3 import OperationalError
from app.db.sqlite.migrations import MigrationManager
from app.logger import log
from .move_to_xdg_folder import MoveToXdgFolder
all_preinits = [MoveToXdgFolder]
def run_preinit_migrations():
"""
Runs all pre-init migrations.
"""
try:
userdb_version = MigrationManager.get_preinit_version()
except (OperationalError):
userdb_version = 0
for migration in all_preinits:
if migration.version > userdb_version:
log.warn("Running new pre-init migration: %s", migration.name)
migration.migrate()
def set_preinit_migration_versions():
"""
Sets the migration versions.
"""
MigrationManager.set_preinit_version(all_preinits[-1].version)
@@ -0,0 +1,49 @@
"""
This migration handles moving the config folder to the XDG standard location.
It also handles moving the userdata and the downloaded artist images to the new location.
"""
import os
import shutil
from app.settings import APP_DIR, USER_HOME_DIR
from app.logger import log
class MoveToXdgFolder:
version = 1
name = "MoveToXdgFolder"
@staticmethod
def migrate():
old_config_dir = os.path.join(USER_HOME_DIR, ".swing")
new_config_dir = APP_DIR
if not os.path.exists(old_config_dir):
log.info("No old config folder found. Skipping migration.")
return
log.info("Found old config folder: %s", old_config_dir)
old_imgs_dir = os.path.join(old_config_dir, "images")
# move images to new location
if os.path.exists(old_imgs_dir):
shutil.copytree(
old_imgs_dir,
os.path.join(new_config_dir, "images"),
copy_function=shutil.copy2,
dirs_exist_ok=True,
)
log.warn("Moved artist images to: %s", new_config_dir)
# move userdata.db to new location
userdata_db = os.path.join(old_config_dir, "userdata.db")
if os.path.exists(userdata_db):
shutil.copy2(userdata_db, new_config_dir)
log.warn("Moved userdata.db to: %s", new_config_dir)
log.warn("Migration complete. ✅")
# swing.db is not moved because the new code fixes bugs which require
# the whole database to be recreated anyway. (ie. the bug which caused duplicate album and artist color entries)
+10
View File
@@ -0,0 +1,10 @@
"""
Migrations for the main database.
PLEASE NOTE: OLDER MIGRATIONS CAN NEVER BE DELETED.
ONLY MODIFY OLD MIGRATIONS FOR BUG FIXES OR ENHANCEMENTS ONLY
[TRY NOT TO MODIFY BEHAVIOR, UNLESS YOU KNOW WHAT YOU'RE DOING].
"""
main_db_migrations = []
+10
View File
@@ -0,0 +1,10 @@
"""
Migrations for the userdata database.
PLEASE NOTE: OLDER MIGRATIONS CAN NEVER BE DELETED.
ONLY MODIFY OLD MIGRATIONS FOR BUG FIXES OR ENHANCEMENTS ONLY
[TRY NOT TO MODIFY BEHAVIOR, UNLESS YOU KNOW WHAT YOU'RE DOING].
"""
userdata_db_migrations = []
+62 -14
View File
@@ -5,7 +5,7 @@ import dataclasses
import json
from dataclasses import dataclass
from app import utils
from app import utils, settings
@dataclass(slots=True)
@@ -55,23 +55,37 @@ class Track:
image: str = ""
artist_hashes: list[str] = dataclasses.field(default_factory=list)
is_favorite: bool = False
og_title: str = ""
def __post_init__(self):
self.og_title = self.title
if self.artist is not None:
artist_str = str(self.artist).split(", ")
self.artist_hashes = [utils.create_hash(a, decode=True) for a in artist_str]
artists = utils.split_artists(self.artist)
self.artist = [Artist(a) for a in artist_str]
if settings.EXTRACT_FEAT:
featured, new_title = utils.parse_feat_from_title(self.title)
original_lower = "-".join([a.lower() for a in artists])
artists.extend([a for a in featured if a.lower() not in original_lower])
albumartists = str(self.albumartist).split(", ")
self.title = new_title
if self.og_title == self.album:
self.album = new_title
self.artist_hashes = [utils.create_hash(a, decode=True) for a in artists]
self.artist = [Artist(a) for a in artists]
albumartists = utils.split_artists(self.albumartist)
self.albumartist = [Artist(a) for a in albumartists]
self.filetype = self.filepath.rsplit(".", maxsplit=1)[-1]
self.image = self.albumhash + ".webp"
if self.genre is not None:
self.genre = str(self.genre).replace("/", ", ")
self.genre = str(self.genre).lower().split(", ")
self.genre = str(self.genre).replace("/", ",").replace(";", ",")
self.genre = str(self.genre).lower().split(",")
self.genre = [g.strip() for g in self.genre]
@dataclass
@@ -96,6 +110,7 @@ class Album:
is_single: bool = False
is_EP: bool = False
is_favorite: bool = False
is_live: bool = False
genres: list[str] = dataclasses.field(default_factory=list)
def __post_init__(self):
@@ -113,11 +128,15 @@ class Album:
if self.is_soundtrack:
return
self.is_live = self.check_is_live_album()
if self.is_live:
return
self.is_compilation = self.check_is_compilation()
if self.is_compilation:
return
self.is_EP = self.check_is_EP()
self.is_EP = self.check_is_ep()
def check_is_soundtrack(self) -> bool:
"""
@@ -137,9 +156,30 @@ class Album:
artists = [a.name for a in self.albumartists] # type: ignore
artists = "".join(artists).lower()
return "various artists" in artists
if "various artists" in artists:
return True
def check_is_EP(self) -> bool:
substrings = ["the essential", "best of", "greatest hits", "#1 hits", "number ones", "super hits",
"ultimate collection"]
for substring in substrings:
if substring in self.title.lower():
return True
return False
def check_is_live_album(self):
"""
Checks if the album is a live album.
"""
keywords = ["live from", "live at", "live in"]
for keyword in keywords:
if keyword in self.title.lower():
return True
return False
def check_is_ep(self) -> bool:
"""
Checks if the album is an EP.
"""
@@ -150,13 +190,21 @@ class Album:
Checks if the album is a single.
"""
if (
len(tracks) == 1
and tracks[0].title == self.title
and tracks[0].track == 1
and tracks[0].disc == 1
len(tracks) == 1
and tracks[0].title == self.title
# and tracks[0].track == 1
# and tracks[0].disc == 1
# Todo: Are the above commented checks necessary?
):
self.is_single = True
def get_date_from_tracks(self, tracks: list[Track]):
for track in tracks:
if track.date != "Unknown":
self.date = track.date
break
@dataclass
class Playlist:
+41 -32
View File
@@ -1,16 +1,42 @@
"""
Contains default configs
"""
import multiprocessing
import os
APP_VERSION = "Swing v0.0.1.alpha"
# ------- HELPER METHODS --------
def get_xdg_config_dir():
"""
Returns the XDG_CONFIG_HOME environment variable if it exists, otherwise
returns the default config directory. If none of those exist, returns the
user's home directory.
"""
xdg_config_home = os.environ.get("XDG_CONFIG_HOME")
if xdg_config_home:
return xdg_config_home
try:
alt_dir = os.path.join(os.environ.get("HOME"), ".config")
if os.path.exists(alt_dir):
return alt_dir
except TypeError:
return os.path.expanduser("~")
# ------- HELPER METHODS --------
APP_VERSION = "v.1.1.0.beta"
# paths
CONFIG_FOLDER = ".swing"
HOME_DIR = os.path.expanduser("~")
XDG_CONFIG_DIR = get_xdg_config_dir()
USER_HOME_DIR = os.path.expanduser("~")
APP_DIR = os.path.join(HOME_DIR, CONFIG_FOLDER)
CONFIG_FOLDER = "swingmusic" if XDG_CONFIG_DIR != USER_HOME_DIR else ".swingmusic"
APP_DIR = os.path.join(XDG_CONFIG_DIR, CONFIG_FOLDER)
IMG_PATH = os.path.join(APP_DIR, "images")
ARTIST_IMG_PATH = os.path.join(IMG_PATH, "artists")
@@ -20,7 +46,7 @@ ARTIST_IMG_LG_PATH = os.path.join(ARTIST_IMG_PATH, "large")
THUMBS_PATH = os.path.join(IMG_PATH, "thumbnails")
SM_THUMB_PATH = os.path.join(THUMBS_PATH, "small")
LG_THUMBS_PATH = os.path.join(THUMBS_PATH, "large")
MUSIC_DIR = os.path.join(USER_HOME_DIR, "Music")
# TEST_DIR = "/home/cwilvx/Downloads/Telegram Desktop"
# TEST_DIR = "/mnt/dfc48e0f-103b-426e-9bf9-f25d3743bc96/Music/Chill/Wolftyla Radio"
@@ -34,11 +60,6 @@ IMG_PLAYLIST_URI = IMG_BASE_URI + "playlists/"
# defaults
DEFAULT_ARTIST_IMG = IMG_ARTIST_URI + "0.webp"
LAST_FM_API_KEY = "762db7a44a9e6fb5585661f5f2bdf23a"
CPU_COUNT = multiprocessing.cpu_count()
THUMB_SIZE = 400
SM_THUMB_SIZE = 64
SM_ARTIST_IMG_SIZE = 64
@@ -46,34 +67,15 @@ SM_ARTIST_IMG_SIZE = 64
The size of extracted images in pixels
"""
LOGGER_ENABLE: bool = True
FILES = ["flac", "mp3", "wav", "m4a"]
FILES = ["flac", "mp3", "wav", "m4a", "ogg", "wma", "opus", "alac", "aiff"]
SUPPORTED_FILES = tuple(f".{file}" for file in FILES)
SUPPORTED_IMAGES = (".jpg", ".png", ".webp", ".jpeg")
SUPPORTED_DIR_IMAGES = [
"folder",
"cover",
"album",
"front",
]
# ===== DB =========
USE_MONGO = False
# ===== SQLite =====
APP_DB_NAME = "swing.db"
USER_DATA_DB_NAME = "userdata.db"
APP_DB_PATH = os.path.join(APP_DIR, APP_DB_NAME)
USERDATA_DB_PATH = os.path.join(APP_DIR, USER_DATA_DB_NAME)
# ===== Store =====
USE_STORE = True
HELP_MESSAGE = """
Usage: swingmusic [options]
@@ -81,10 +83,17 @@ Options:
--build: Build the application
--host: Set the host
--port: Set the port
--no-feat: Do not extract featured artists from the song title
--help, -h: Show this help message
--version, -v: Show the version
"""
EXTRACT_FEAT = True
"""
Whether to extract the featured artists from the song title.
Changed using the `--no-feat` flag
"""
class TCOLOR:
"""
@@ -95,7 +104,7 @@ class TCOLOR:
OKBLUE = "\033[94m"
OKCYAN = "\033[96m"
OKGREEN = "\033[92m"
WARNING = "\033[93m"
YELLOW = "\033[93m"
FAIL = "\033[91m"
ENDC = "\033[0m"
BOLD = "\033[1m"
+15 -7
View File
@@ -3,21 +3,25 @@ Contains the functions to prepare the server for use.
"""
import os
import shutil
import time
from configparser import ConfigParser
from app import settings
from app.db.sqlite import create_connection, create_tables, queries
from app.db.store import Store
from app.migrations import apply_migrations, set_postinit_migration_versions
from app.migrations._preinit import (
run_preinit_migrations,
set_preinit_migration_versions,
)
from app.settings import APP_DB_PATH, USERDATA_DB_PATH
from app.utils import get_home_res_path
config = ConfigParser()
config_path = get_home_res_path("pyinstaller.config.ini")
config.read(config_path)
try:
IS_BUILD = config["DEFAULT"]["BUILD"] == "True"
except KeyError:
@@ -64,10 +68,6 @@ def create_config_dir() -> None:
"""
Creates the config directory if it doesn't exist.
"""
home_dir = os.path.expanduser("~")
config_folder = os.path.join(home_dir, settings.CONFIG_FOLDER)
thumb_path = os.path.join("images", "thumbnails")
small_thumb_path = os.path.join(thumb_path, "small")
large_thumb_path = os.path.join(thumb_path, "large")
@@ -91,7 +91,7 @@ def create_config_dir() -> None:
]
for _dir in dirs:
path = os.path.join(config_folder, _dir)
path = os.path.join(settings.APP_DIR, _dir)
exists = os.path.exists(path)
if not exists:
@@ -107,6 +107,7 @@ def setup_sqlite():
"""
# if os.path.exists(DB_PATH):
# os.remove(DB_PATH)
run_preinit_migrations()
app_db_conn = create_connection(APP_DB_PATH)
playlist_db_conn = create_connection(USERDATA_DB_PATH)
@@ -114,9 +115,16 @@ def setup_sqlite():
create_tables(app_db_conn, queries.CREATE_APPDB_TABLES)
create_tables(playlist_db_conn, queries.CREATE_USERDATA_TABLES)
create_tables(app_db_conn, queries.CREATE_MIGRATIONS_TABLE)
create_tables(playlist_db_conn, queries.CREATE_MIGRATIONS_TABLE)
app_db_conn.close()
playlist_db_conn.close()
apply_migrations()
set_preinit_migration_versions()
set_postinit_migration_versions()
Store.load_all_tracks()
Store.process_folders()
Store.load_albums()
+139 -22
View File
@@ -1,13 +1,19 @@
"""
This module contains mini functions for the server.
"""
import os
import hashlib
from pathlib import Path
import os
import platform
import random
import re
import socket as Socket
import string
import threading
from datetime import datetime
from unidecode import unidecode
from pathlib import Path
import requests
from unidecode import unidecode
from app import models
from app.settings import SUPPORTED_FILES
@@ -27,27 +33,34 @@ def background(func):
return background_func
def run_fast_scandir(__dir: str, full=False) -> tuple[list[str], list[str]]:
def run_fast_scandir(_dir: str, full=False) -> tuple[list[str], list[str]]:
"""
Scans a directory for files with a specific extension. Returns a list of files and folders in the directory.
Scans a directory for files with a specific extension.
Returns a list of files and folders in the directory.
"""
if _dir == "":
return [], []
subfolders = []
files = []
for f in os.scandir(__dir):
if f.is_dir() and not f.name.startswith("."):
subfolders.append(f.path)
if f.is_file():
ext = os.path.splitext(f.name)[1].lower()
if ext in SUPPORTED_FILES:
files.append(f.path)
try:
for _file in os.scandir(_dir):
if _file.is_dir() and not _file.name.startswith("."):
subfolders.append(_file.path)
if _file.is_file():
ext = os.path.splitext(_file.name)[1].lower()
if ext in SUPPORTED_FILES:
files.append(win_replace_slash(_file.path))
if full or len(files) == 0:
for _dir in list(subfolders):
sf, f = run_fast_scandir(_dir, full=True)
subfolders.extend(sf)
files.extend(f)
if full or len(files) == 0:
for _dir in list(subfolders):
sub_dirs, _file = run_fast_scandir(_dir, full=True)
subfolders.extend(sub_dirs)
files.extend(_file)
except (OSError, PermissionError, FileNotFoundError, ValueError):
return [], []
return subfolders, files
@@ -169,18 +182,16 @@ def get_artists_from_tracks(tracks: list[models.Track]) -> set[str]:
def get_albumartists(albums: list[models.Album]) -> set[str]:
artists = set()
# master_artist_list = [a.albumartists for a in albums]
for album in albums:
albumartists = [a.name for a in album.albumartists] # type: ignore
artists.update(albumartists)
# return [models.Artist(a) for a in artists]
return artists
def get_all_artists(
tracks: list[models.Track], albums: list[models.Album]
tracks: list[models.Track], albums: list[models.Album]
) -> list[models.Artist]:
artists_from_tracks = get_artists_from_tracks(tracks)
artist_from_albums = get_albumartists(albums)
@@ -221,6 +232,112 @@ def bisection_search_string(strings: list[str], target: str) -> str | None:
def get_home_res_path(filename: str):
"""
Returns a path to resources in the home directory of this project. Used to resolve resources in builds.
Returns a path to resources in the home directory of this project.
Used to resolve resources in builds.
"""
return (CWD / ".." / filename).resolve()
try:
return (CWD / ".." / filename).resolve()
except ValueError:
return None
def get_ip():
"""
Returns the IP address of this device.
"""
soc = Socket.socket(Socket.AF_INET, Socket.SOCK_DGRAM)
soc.connect(("8.8.8.8", 80))
ip_address = str(soc.getsockname()[0])
soc.close()
return ip_address
def is_windows():
"""
Returns True if the OS is Windows.
"""
return platform.system() == "Windows"
def parse_feat_from_title(title: str) -> tuple[list[str], str]:
"""
Extracts featured artists from a song title using regex.
"""
regex = r"\((?:feat|ft|featuring|with)\.?\s+(.+?)\)"
# regex for square brackets 👇
sqr_regex = r"\[(?:feat|ft|featuring|with)\.?\s+(.+?)\]"
match = re.search(regex, title, re.IGNORECASE)
if not match:
match = re.search(sqr_regex, title, re.IGNORECASE)
regex = sqr_regex
if not match:
return [], title
artists = match.group(1)
artists = split_artists(artists, with_and=True)
# remove "feat" group from title
new_title = re.sub(regex, "", title, flags=re.IGNORECASE)
return artists, new_title
def get_random_str(length=5):
"""
Generates a random string of length `length`.
"""
return "".join(random.choices(string.ascii_letters + string.digits, k=length))
def win_replace_slash(path: str):
if is_windows():
return path.replace("\\", "/").replace("//", "/")
return path
def split_artists(src: str, with_and: bool = False):
exp = r"\s*(?:and|&|,|;)\s*" if with_and else r"\s*[,;]\s*"
artists = re.split(exp, src)
return [a.strip() for a in artists]
def parse_artist_from_filename(title: str):
"""
Extracts artist names from a song title using regex.
"""
regex = r"^(.+?)\s*[-–—]\s*(?:.+?)$"
match = re.search(regex, title, re.IGNORECASE)
if not match:
return []
artists = match.group(1)
artists = split_artists(artists)
return artists
def parse_title_from_filename(title: str):
"""
Extracts track title from a song title using regex.
"""
regex = r"^(?:.+?)\s*[-–—]\s*(.+?)$"
match = re.search(regex, title, re.IGNORECASE)
if not match:
return title
res = match.group(1)
# remove text in brackets starting with "official" case insensitive
res = re.sub(r"\s*\([^)]*official[^)]*\)", "", res, flags=re.IGNORECASE)
return res.strip()
# for title in sample_titles:
# print(parse_artist_from_filename(title))
# print(parse_title_from_filename(title))
+47 -14
View File
@@ -8,12 +8,13 @@ from configparser import ConfigParser
import PyInstaller.__main__ as bundler
from app import settings
from app.api import create_api
from app.functions import run_periodic_checks
from app.lib.watchdogg import Watcher as WatchDog
from app.settings import APP_VERSION, HELP_MESSAGE, TCOLOR
from app.setup import run_setup
from app.utils import background, get_home_res_path
from app.utils import background, get_home_res_path, get_ip, is_windows
werkzeug = logging.getLogger("werkzeug")
werkzeug.setLevel(logging.ERROR)
@@ -42,7 +43,7 @@ def serve_client_files(path):
@app.route("/")
def serve_client():
"""
Serves the index.html file at client/index.html.
Serves the index.html file at `client/index.html`.
"""
return app.send_static_file("index.html")
@@ -58,6 +59,7 @@ class ArgsEnum:
build = "--build"
port = "--port"
host = "--host"
no_feat = "--no-feat"
help = ["--help", "-h"]
version = ["--version", "-v"]
@@ -67,6 +69,7 @@ class HandleArgs:
self.handle_build()
self.handle_host()
self.handle_port()
self.handle_no_feat()
self.handle_help()
self.handle_version()
@@ -80,6 +83,8 @@ class HandleArgs:
config["DEFAULT"]["BUILD"] = "True"
config.write(file)
_s = ";" if is_windows() else ":"
bundler.run(
[
"manage.py",
@@ -87,9 +92,9 @@ class HandleArgs:
"--name",
"swingmusic",
"--clean",
"--add-data=assets:assets",
"--add-data=client:client",
"--add-data=pyinstaller.config.ini:.",
f"--add-data=assets{_s}assets",
f"--add-data=client{_s}client",
f"--add-data=pyinstaller.config.ini{_s}.",
"-y",
]
)
@@ -129,6 +134,11 @@ class HandleArgs:
Variables.FLASK_HOST = host # type: ignore
@staticmethod
def handle_no_feat():
if ArgsEnum.no_feat in ARGS:
settings.EXTRACT_FEAT = False
@staticmethod
def handle_help():
if any((a in ARGS for a in ArgsEnum.help)):
@@ -153,31 +163,54 @@ def start_watchdog():
WatchDog().run()
def log_info():
lines = " -------------------------------------"
def log_startup_info():
lines = "------------------------------"
# clears terminal 👇
os.system("cls" if os.name == "nt" else "echo -e \\\\033c")
print(lines)
print(f" {TCOLOR.HEADER}{APP_VERSION} {TCOLOR.ENDC}")
print(f"{TCOLOR.HEADER}SwingMusic {APP_VERSION} {TCOLOR.ENDC}")
adresses = [Variables.FLASK_HOST]
if Variables.FLASK_HOST == "0.0.0.0":
adresses = ["localhost", get_ip()]
print("Started app on:")
for address in adresses:
# noinspection HttpUrlsUsage
print(
f"{TCOLOR.OKGREEN}http://{address}:{Variables.FLASK_PORT}{TCOLOR.ENDC}"
)
print(lines)
print("\n")
if not settings.EXTRACT_FEAT:
print(
f"{TCOLOR.OKBLUE}Extracting featured artists from track titles: {TCOLOR.FAIL}DISABLED!{TCOLOR.ENDC}"
)
print(
f" Started app on: {TCOLOR.OKGREEN}http://{Variables.FLASK_HOST}:{Variables.FLASK_PORT}{TCOLOR.ENDC}"
f"{TCOLOR.OKBLUE}App data folder: {settings.APP_DIR}{TCOLOR.OKGREEN}{TCOLOR.ENDC}"
)
print(lines)
print("\n")
if __name__ == "__main__":
HandleArgs()
log_info()
log_startup_info()
run_bg_checks()
start_watchdog()
app.run(
debug=True,
debug=False,
threaded=True,
host=Variables.FLASK_HOST,
port=Variables.FLASK_PORT,
use_reloader=False,
)
# TODO: Find out how to print in color: red for errors, etc.
# TODO: Find a way to verify the host string
# TODO: Organize code in this file: move args to new file, etc.
# TODO: Organize code in this file: move args to new file, etc.
-44
View File
@@ -1,44 +0,0 @@
# -*- mode: python ; coding: utf-8 -*-
block_cipher = None
a = Analysis(
['manage.py'],
pathex=[],
binaries=[],
datas=[],
hiddenimports=[],
hookspath=[],
hooksconfig={},
runtime_hooks=[],
excludes=[],
win_no_prefer_redirects=False,
win_private_assemblies=False,
cipher=block_cipher,
noarchive=False,
)
pyz = PYZ(a.pure, a.zipped_data, cipher=block_cipher)
exe = EXE(
pyz,
a.scripts,
a.binaries,
a.zipfiles,
a.datas,
[],
name='manage',
debug=False,
bootloader_ignore_signals=False,
strip=False,
upx=True,
upx_exclude=[],
runtime_tmpdir=None,
console=True,
disable_windowed_traceback=False,
argv_emulation=False,
target_arch=None,
codesign_identity=None,
entitlements_file=None,
)
Generated
+781 -625
View File
File diff suppressed because it is too large Load Diff
+12 -7
View File
@@ -5,25 +5,30 @@ description = ""
authors = ["geoffrey45 <geoffreymungai45@gmail.com>"]
[tool.poetry.dependencies]
python = ">=3.10"
python = ">=3.10,<3.12"
Flask = "^2.0.2"
Flask-Cors = "^3.0.10"
requests = "^2.27.1"
watchdog = "^2.2.0"
watchdog = "^2.2.1"
gunicorn = "^20.1.0"
Pillow = "^9.0.1"
"colorgram.py" = "^1.2.0"
tqdm = "^4.64.0"
rapidfuzz = "^2.13.7"
tinytag = "^1.8.1"
hypothesis = "^6.56.3"
pytest = "^7.1.3"
pylint = "^2.15.5"
Unidecode = "^1.3.6"
pyinstaller = "^5.7.0"
psutil = "^5.9.4"
[tool.poetry.dev-dependencies]
black = {version = "^22.6.0", allow-prereleases = true}
pylint = "^2.15.5"
pytest = "^7.1.3"
hypothesis = "^6.56.3"
pyinstaller = "^5.7.0"
[tool.poetry.dev-dependencies.black]
version = "^22.6.0"
allow-prereleases = true
[build-system]
requires = ["poetry-core>=1.0.0"]
-30
View File
@@ -1,30 +0,0 @@
black @ file:///home/cwilvx/.cache/pypoetry/artifacts/6a/ca/67/2501f462728be2eb38d33f074ba5e8c08d49867e154b321b3f0b41db86/black-22.6.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl
cachelib @ file:///home/cwilvx/.cache/pypoetry/artifacts/cd/93/0e/3cc9b898ce11816a06c6fdc2c82b4f32443ad17db9ac94b2b74380ebdf/cachelib-0.7.0-py3-none-any.whl
certifi @ file:///home/cwilvx/.cache/pypoetry/artifacts/8c/ef/5f/67cf35ca016dcd84174e483e20fc3cfcd8bf39b6852e96236e852f31ba/certifi-2022.5.18.1-py3-none-any.whl
charset-normalizer @ file:///home/cwilvx/.cache/pypoetry/artifacts/d5/27/31/db4fb74906e3a7f55f720e0079ac1850dd86e30651cdfa5e1f04c53cfa/charset_normalizer-2.0.12-py3-none-any.whl
click @ file:///home/cwilvx/.cache/pypoetry/artifacts/63/f3/4c/2270b95f4d37b9ea73cd401abe68b6e9ede30380533cd4e7118a8e3aa3/click-8.1.3-py3-none-any.whl
colorgram.py @ file:///home/cwilvx/.cache/pypoetry/artifacts/b9/4f/19/0bfe8f89dd3c5df77fd3399df1820ed195abdb2f850e3d64336a672d1b/colorgram.py-1.2.0-py2.py3-none-any.whl
Flask @ file:///home/cwilvx/.cache/pypoetry/artifacts/61/b9/1a/04191a9edc7415cae23e0e84b682bd895d55cc79f68018278adbca71c8/Flask-2.1.2-py3-none-any.whl
Flask-Caching @ file:///home/cwilvx/.cache/pypoetry/artifacts/e9/38/2f/8faf7982cf117a9058f8e8c2140c686f929bf8911986c0ab697cae8448/Flask_Caching-1.11.1-py3-none-any.whl
Flask-Cors @ file:///home/cwilvx/.cache/pypoetry/artifacts/b7/c4/f4/3606582505f2ade21c9f72607db37c2bd347d83951df4749019c3d39f8/Flask_Cors-3.0.10-py2.py3-none-any.whl
gunicorn @ file:///home/cwilvx/.cache/pypoetry/artifacts/9f/68/9f/f1166be9473b4fe2cc59c98fac616db1f94b18662b9055d1ac940374e3/gunicorn-20.1.0-py3-none-any.whl
idna @ file:///home/cwilvx/.cache/pypoetry/artifacts/90/36/8c/81eabf6ac88608721ab27f439c9a6b9a8e6a21cc58c59ebb1a42720199/idna-3.3-py3-none-any.whl
itsdangerous @ file:///home/cwilvx/.cache/pypoetry/artifacts/2e/15/8d/e1a5243416994d875e03f548c0c5af64a08970297056408d4e67e6bc28/itsdangerous-2.1.2-py3-none-any.whl
jarowinkler @ file:///home/cwilvx/.cache/pypoetry/artifacts/37/93/e8/2c0fb4589d71bd0c06ac156569ba434fb917ce65e3fc77353dc1960e7a/jarowinkler-1.0.2-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl
Jinja2 @ file:///home/cwilvx/.cache/pypoetry/artifacts/49/36/ae/943f6cd852641f7249acddef711eb97d0c9ed91d7f435c798b6d7041ca/Jinja2-3.1.2-py3-none-any.whl
MarkupSafe @ file:///home/cwilvx/.cache/pypoetry/artifacts/dd/cc/d7/91f68383c04a15a87f0a2b31599de891c89b1d15e309273f759daf132c/MarkupSafe-2.1.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl
mutagen @ file:///home/cwilvx/.cache/pypoetry/artifacts/b4/fa/ad/d30a69658cc841ca38b77185eed0d97982259ce1cf1da32af87b376d4e/mutagen-1.45.1-py3-none-any.whl
mypy-extensions @ file:///home/cwilvx/.cache/pypoetry/artifacts/2f/c6/09/3e1afdcb75322c65b786e63cd7e879b6be3db36dea78ca376db5483ae4/mypy_extensions-0.4.3-py2.py3-none-any.whl
pathspec @ file:///home/cwilvx/.cache/pypoetry/artifacts/48/a0/9f/f5128d9e11d591bca7a942dd80ec44f9b2de8294775c68e0b99beeef93/pathspec-0.9.0-py2.py3-none-any.whl
Pillow @ file:///home/cwilvx/.cache/pypoetry/artifacts/95/cd/1a/99053885d95d74defc6d40d0bd7518f83fd74b133dfd762dfb523db565/Pillow-9.2.0-cp310-cp310-manylinux_2_28_x86_64.whl
platformdirs @ file:///home/cwilvx/.cache/pypoetry/artifacts/ea/8e/52/e5ac2f14474cef8f0fd44b4aa7d6968bfa89442d1b88ab567c446eae70/platformdirs-2.5.2-py3-none-any.whl
progress @ file:///home/cwilvx/.cache/pypoetry/artifacts/79/c2/d7/2a7bb2708100a9ccc186a9d3b9376c85fb53798080a3fe7480454fb17b/progress-1.6.tar.gz
pymongo @ file:///home/cwilvx/.cache/pypoetry/artifacts/64/8f/c6/6c691a87845035107c96bbd25b59f19d5cc716c2d46cbbdeb4ec149795/pymongo-4.1.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl
rapidfuzz @ file:///home/cwilvx/.cache/pypoetry/artifacts/9d/5e/96/b127feb34cd55e8eedc2ba19c53199ebceb9252389ec7a95cd4eb6e154/rapidfuzz-2.0.11-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl
requests @ file:///home/cwilvx/.cache/pypoetry/artifacts/d2/b2/c6/a04ce59140c6739203837d8dd0f518e29051b7ab61d2f34d4fd4241d30/requests-2.27.1-py2.py3-none-any.whl
six @ file:///home/cwilvx/.cache/pypoetry/artifacts/89/b2/f8/fd92b6d5daa0f8889429b2fc67ec21eedc5cae5d531ee2853828ced6c7/six-1.16.0-py2.py3-none-any.whl
tomli @ file:///home/cwilvx/.cache/pypoetry/artifacts/62/12/b6/6db9ebb9c8e1a6c5aa8a92ae73098d8119816b5e8507490916621bc305/tomli-2.0.1-py3-none-any.whl
tqdm @ file:///home/cwilvx/.cache/pypoetry/artifacts/9c/70/8c/d9fd60c1049cc4dba00815d66d598e9d5f265d4d59489e074827e331a9/tqdm-4.64.0-py2.py3-none-any.whl
urllib3 @ file:///home/cwilvx/.cache/pypoetry/artifacts/88/1c/d5/a55ed0245e5d7cd3a9f40dd75733644cbf7b7d94a6c521eb6c027a326c/urllib3-1.26.9-py2.py3-none-any.whl
watchdog @ file:///home/cwilvx/.cache/pypoetry/artifacts/0f/af/c3/b6575b0b5cab70c439d50980bd9673762b57878772f930d2d908ca83fc/watchdog-2.1.8-py3-none-manylinux2014_x86_64.whl
Werkzeug @ file:///home/cwilvx/.cache/pypoetry/artifacts/34/38/89/78911cfcd7dec75796d6056c730d94f730967bfe4fb4c5192b8d0d81ec/Werkzeug-2.1.2-py3-none-any.whl
-39
View File
@@ -1,39 +0,0 @@
# Fixes !
- [ ] Click on artist image to go to artist page ⚠
- [ ] Play next song if current song can't be loaded ⚠
<!-- -->
- [ ] Removing song duplicates from queries
- [ ] Add support for WAV files
- [ ] Compress thumbnails
# Features +
## Needed features
- [ ] Adding songs to queue
<!-- -->
- [ ] Add keyboard shortcuts
- [ ] Adjust volume
- [ ] Add listening statistics for all songs
- [ ] Extract color from artist image [for use with artist card gradient]
- [ ] Adding songs to favorites
- [ ] Playing song radio
## Future features
- [ ] Toggle shuffle
- [ ] Toggle repeat
- [ ] Suggest similar artists
- [ ] Getting artist info
- [ ] Create a Python script to build, bundle and serve the app
- [ ] Getting extra song info (probably from genius)
- [ ] Getting lyrics
- [ ] Sorting songs
- [ ] Suggest undiscorvered artists, albums and songs
- [ ] Remember last played song
- [ ] Add next and previous song transition and progress bar reset animations
- [ ] Add playlist to folder
- [ ] Add functionality to 'Listen now' button
- [ ] Paginated requests for songs
- [ ] Package app as installable PWA
-25
View File
@@ -1,25 +0,0 @@
#!/bin/zsh
gpath=$(poetry run which gunicorn)
# pytest=$(poetry run which pytest)
# $pytest # -q
while getopts ':s' opt; do
case $opt in
s)
echo "Starting image server"
cd "./app"
"$gpath" -b 0.0.0.0:1971 -w 1 --threads=1 "imgserver:app" &
cd ../
echo "Done ✅"
;;
\?)
echo "Invalid option: -$OPTARG" >&2
;;
esac
done
echo "Starting swing"
"$gpath" -b 0.0.0.0:1970 --threads=2 "manage:create_api()"
-44
View File
@@ -1,44 +0,0 @@
# -*- mode: python ; coding: utf-8 -*-
block_cipher = None
a = Analysis(
['manage.py'],
pathex=[],
binaries=[],
datas=[('assets', 'assets'), ('client', 'client'), ('pyinstaller.config.ini', '.')],
hiddenimports=[],
hookspath=[],
hooksconfig={},
runtime_hooks=[],
excludes=[],
win_no_prefer_redirects=False,
win_private_assemblies=False,
cipher=block_cipher,
noarchive=False,
)
pyz = PYZ(a.pure, a.zipped_data, cipher=block_cipher)
exe = EXE(
pyz,
a.scripts,
a.binaries,
a.zipfiles,
a.datas,
[],
name='swing',
debug=False,
bootloader_ignore_signals=False,
strip=False,
upx=True,
upx_exclude=[],
runtime_tmpdir=None,
console=True,
disable_windowed_traceback=False,
argv_emulation=False,
target_arch=None,
codesign_identity=None,
entitlements_file=None,
)
View File
+34
View File
@@ -0,0 +1,34 @@
from hypothesis import given
from app.utils import parse_feat_from_title
def test_extract_featured_artists_from_title():
test_titles = [
"Own it (Featuring Ed Sheeran & Stormzy)",
"Own it (Featuring Ed Sheeran and Stormzy)",
"Autograph (On my line)(Feat. Lil Peep)(Deluxe)",
"Why so sad? (with Juice Wrld, Lil Peep)",
"Why so sad? (with Juice Wrld/Lil Peep)",
"Simmer (with Burna Boy)",
"Simmer (without Burna Boy)",
]
results = [
["Ed Sheeran", "Stormzy"],
["Ed Sheeran", "Stormzy"],
["Lil Peep"],
["Juice Wrld", "Lil Peep"],
["Juice Wrld/Lil Peep"],
["Burna Boy"],
[],
]
for title, expected in zip(test_titles, results):
assert parse_feat_from_title(title) == expected
# === HYPOTHESIS GHOSTWRITER TESTS ===
# @given(__dir=st.text(), full=st.booleans())
# def test_fuzz_run_fast_scandir(__dir: str, full) -> None:
# app.utils.run_fast_scandir(_dir=__dir, full=full)
-5
View File
@@ -1,5 +0,0 @@
from app import create_api
if __name__ == '__main__':
app = create_api()
app.run(debug=True, threaded=True)