mirror of
https://github.com/Dvorinka/swingmusic-extended.git
synced 2026-06-04 20:43:04 +00:00
update supported audio files in settings.py
+ add win_replace_slash function to format win path strings + misc
This commit is contained in:
+38
-7
@@ -2,6 +2,7 @@
|
||||
Contains all the folder routes.
|
||||
"""
|
||||
import os
|
||||
import psutil
|
||||
|
||||
from pathlib import Path
|
||||
from flask import Blueprint, request
|
||||
@@ -10,7 +11,7 @@ from app import settings
|
||||
from app.lib.folderslib import GetFilesAndDirs
|
||||
from app.db.sqlite.settings import SettingsSQLMethods as db
|
||||
from app.models import Folder
|
||||
from app.utils import create_folder_hash
|
||||
from app.utils import create_folder_hash, is_windows, win_replace_slash
|
||||
|
||||
api = Blueprint("folder", __name__, url_prefix="/")
|
||||
|
||||
@@ -42,8 +43,8 @@ def get_folder_tree():
|
||||
return {
|
||||
"folders": [
|
||||
Folder(
|
||||
name=f.name,
|
||||
path=str(f),
|
||||
name=f.name if f.name != "" else str(f).replace("\\", "/"),
|
||||
path=win_replace_slash(str(f)),
|
||||
has_tracks=True,
|
||||
is_sym=f.is_symlink(),
|
||||
path_hash=create_folder_hash(*f.parts[1:]),
|
||||
@@ -61,26 +62,56 @@ def get_folder_tree():
|
||||
}
|
||||
|
||||
|
||||
def get_all_drives():
|
||||
"""
|
||||
Returns a list of all the drives on a windows machine.
|
||||
"""
|
||||
drives = psutil.disk_partitions()
|
||||
return [d.mountpoint for d in drives]
|
||||
|
||||
|
||||
@api.route("/folder/dir-browser", methods=["POST"])
|
||||
def list_folders():
|
||||
"""
|
||||
Returns a list of all the folders in the given folder.
|
||||
"""
|
||||
data = request.get_json()
|
||||
is_win = is_windows()
|
||||
|
||||
try:
|
||||
req_dir: str = data["folder"]
|
||||
except KeyError:
|
||||
req_dir = settings.USER_HOME_DIR
|
||||
req_dir = "$home"
|
||||
|
||||
if req_dir == "$home":
|
||||
req_dir = settings.USER_HOME_DIR
|
||||
# req_dir = settings.USER_HOME_DIR
|
||||
if is_win:
|
||||
return {
|
||||
"folders": [
|
||||
{"name": win_replace_slash(d), "path": win_replace_slash(d)}
|
||||
for d in get_all_drives()
|
||||
]
|
||||
}
|
||||
|
||||
entries = os.scandir(req_dir)
|
||||
req_dir = req_dir + "/"
|
||||
|
||||
try:
|
||||
entries = os.scandir(req_dir)
|
||||
except PermissionError:
|
||||
return {"folders": []}
|
||||
|
||||
dirs = [e.name for e in entries if e.is_dir() and not e.name.startswith(".")]
|
||||
dirs = [{"name": d, "path": os.path.join(req_dir, d)} for d in dirs]
|
||||
dirs = [
|
||||
{"name": d, "path": win_replace_slash(os.path.join(req_dir, d))} for d in dirs
|
||||
]
|
||||
|
||||
return {
|
||||
"folders": sorted(dirs, key=lambda i: i["name"]),
|
||||
}
|
||||
|
||||
|
||||
# todo:
|
||||
|
||||
# - handle showing windows disks in root_dir configuration
|
||||
# - handle the above, but for all partitions mounted in linux.
|
||||
# - handle the "\" in client's folder page breadcrumb
|
||||
|
||||
+25
-17
@@ -48,6 +48,19 @@ def rebuild_store(db_dirs: list[str]):
|
||||
log.info("Rebuilding library... ✅")
|
||||
|
||||
|
||||
def finalize(new_: list[str], removed_: list[str], db_dirs_: list[str]):
|
||||
"""
|
||||
Params:
|
||||
new_: will be added to the database
|
||||
removed_: will be removed from the database
|
||||
db_dirs_: will be used to remove tracks that
|
||||
are outside these directories from the database and store.
|
||||
"""
|
||||
sdb.remove_root_dirs(removed_)
|
||||
sdb.add_root_dirs(new_)
|
||||
rebuild_store(db_dirs_)
|
||||
|
||||
|
||||
@api.route("/settings/add-root-dirs", methods=["POST"])
|
||||
def add_root_dirs():
|
||||
"""
|
||||
@@ -66,33 +79,28 @@ def add_root_dirs():
|
||||
except KeyError:
|
||||
return msg, 400
|
||||
|
||||
def finalize(new_: list[str], removed_: list[str], db_dirs_: list[str]):
|
||||
sdb.remove_root_dirs(removed_)
|
||||
sdb.add_root_dirs(new_)
|
||||
rebuild_store(db_dirs_)
|
||||
|
||||
# ---
|
||||
db_dirs = sdb.get_root_dirs()
|
||||
_h = "$home"
|
||||
|
||||
try:
|
||||
if db_dirs[0] == _h and new_dirs[0] == _h.strip():
|
||||
return {"msg": "Not changed!"}
|
||||
db_home = any([d == _h for d in db_dirs]) # if $home is in db
|
||||
incoming_home = any([d == _h for d in new_dirs]) # if $home is in incoming
|
||||
|
||||
if db_dirs[0] == _h:
|
||||
sdb.remove_root_dirs(db_dirs)
|
||||
# handle $home case
|
||||
if db_home and incoming_home:
|
||||
return {"msg": "Not changed!"}
|
||||
|
||||
if new_dirs[0] == _h:
|
||||
finalize([_h], db_dirs, [settings.USER_HOME_DIR])
|
||||
if db_home or incoming_home:
|
||||
sdb.remove_root_dirs(db_dirs)
|
||||
|
||||
return {"root_dirs": [_h]}
|
||||
except IndexError:
|
||||
pass
|
||||
if incoming_home:
|
||||
finalize([_h], [], [settings.USER_HOME_DIR])
|
||||
return {"root_dirs": [_h]}
|
||||
|
||||
# ---
|
||||
|
||||
for _dir in new_dirs:
|
||||
children = get_child_dirs(_dir, db_dirs)
|
||||
removed_dirs.extend(children)
|
||||
# ---
|
||||
|
||||
for _dir in removed_dirs:
|
||||
try:
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
import json
|
||||
from app.db.sqlite.utils import SQLiteManager
|
||||
from app.utils import win_replace_slash
|
||||
|
||||
|
||||
class SettingsSQLMethods:
|
||||
@@ -19,7 +19,8 @@ class SettingsSQLMethods:
|
||||
cur.execute(sql)
|
||||
dirs = cur.fetchall()
|
||||
|
||||
return [dir[0] for dir in dirs]
|
||||
dirs = [dir[0] for dir in dirs]
|
||||
return [win_replace_slash(d) for d in dirs]
|
||||
|
||||
@staticmethod
|
||||
def add_root_dirs(dirs: list[str]):
|
||||
|
||||
+13
-3
@@ -17,6 +17,7 @@ from app.utils import (
|
||||
create_folder_hash,
|
||||
get_all_artists,
|
||||
remove_duplicates,
|
||||
win_replace_slash,
|
||||
)
|
||||
|
||||
|
||||
@@ -174,7 +175,7 @@ class Store:
|
||||
|
||||
return Folder(
|
||||
name=folder.name,
|
||||
path=str(folder),
|
||||
path=win_replace_slash(str(folder)),
|
||||
is_sym=folder.is_symlink(),
|
||||
has_tracks=True,
|
||||
path_hash=create_folder_hash(*folder.parts[1:]),
|
||||
@@ -218,9 +219,18 @@ class Store:
|
||||
]
|
||||
|
||||
all_folders = [Path(f) for f in all_folders]
|
||||
all_folders = [f for f in all_folders if f.exists()]
|
||||
# all_folders = [f for f in all_folders if f.exists()]
|
||||
|
||||
for path in tqdm(all_folders, desc="Processing folders"):
|
||||
valid_folders = []
|
||||
|
||||
for folder in all_folders:
|
||||
try:
|
||||
if folder.exists():
|
||||
valid_folders.append(folder)
|
||||
except PermissionError:
|
||||
pass
|
||||
|
||||
for path in tqdm(valid_folders, desc="Processing folders"):
|
||||
folder = cls.create_folder(str(path))
|
||||
|
||||
cls.folders.append(folder)
|
||||
|
||||
+19
-6
@@ -4,6 +4,8 @@ from concurrent.futures import ThreadPoolExecutor
|
||||
from app.db.store import Store
|
||||
from app.models import Folder, Track
|
||||
from app.settings import SUPPORTED_FILES
|
||||
from app.logger import log
|
||||
from app.utils import win_replace_slash
|
||||
|
||||
|
||||
class GetFilesAndDirs:
|
||||
@@ -26,14 +28,25 @@ class GetFilesAndDirs:
|
||||
ext = os.path.splitext(entry.name)[1].lower()
|
||||
|
||||
if entry.is_dir() and not entry.name.startswith("."):
|
||||
dirs.append(entry.path)
|
||||
dirs.append(win_replace_slash(entry.path))
|
||||
elif entry.is_file() and ext in SUPPORTED_FILES:
|
||||
files.append(entry.path)
|
||||
files.append(win_replace_slash(entry.path))
|
||||
|
||||
# sort files by modified time
|
||||
files.sort(
|
||||
key=lambda f: os.path.getmtime(f) # pylint: disable=unnecessary-lambda
|
||||
)
|
||||
files_ = []
|
||||
|
||||
for file in files:
|
||||
try:
|
||||
files_.append(
|
||||
{
|
||||
"path": file,
|
||||
"time": os.path.getmtime(file),
|
||||
}
|
||||
)
|
||||
except OSError as e:
|
||||
log.error(e)
|
||||
|
||||
files_.sort(key=lambda f: f["time"])
|
||||
files = [f["path"] for f in files_]
|
||||
|
||||
tracks = Store.get_tracks_by_filepaths(files)
|
||||
|
||||
|
||||
+5
-2
@@ -43,7 +43,10 @@ class Populate:
|
||||
|
||||
if len(dirs_to_scan) == 0:
|
||||
log.warning(
|
||||
"The root directory is not configured. Open the app in your web browser to configure."
|
||||
(
|
||||
"The root directory is not configured. "
|
||||
+ "Open the app in your webbrowser to configure."
|
||||
)
|
||||
)
|
||||
return
|
||||
|
||||
@@ -85,7 +88,7 @@ class Populate:
|
||||
|
||||
for file in tqdm(untagged, desc="Reading files"):
|
||||
if POPULATE_KEY != key:
|
||||
raise PopulateCancelledError('Populate key changed')
|
||||
raise PopulateCancelledError("Populate key changed")
|
||||
|
||||
tags = get_tags(file)
|
||||
|
||||
|
||||
+31
-9
@@ -1,13 +1,17 @@
|
||||
import os
|
||||
import datetime
|
||||
import os
|
||||
from io import BytesIO
|
||||
|
||||
from tinytag import TinyTag
|
||||
from PIL import Image, UnidentifiedImageError
|
||||
from tinytag import TinyTag
|
||||
|
||||
from app import settings
|
||||
from app.utils import create_hash
|
||||
|
||||
from app.utils import (
|
||||
create_hash,
|
||||
parse_artist_from_filename,
|
||||
parse_title_from_filename,
|
||||
win_replace_slash,
|
||||
)
|
||||
|
||||
|
||||
def parse_album_art(filepath: str):
|
||||
@@ -81,7 +85,7 @@ def get_tags(filepath: str):
|
||||
|
||||
try:
|
||||
tags = TinyTag.get(filepath)
|
||||
except: # pylint: disable=bare-except
|
||||
except: # noqa: E722
|
||||
return None
|
||||
|
||||
no_albumartist: bool = (tags.albumartist == "") or (tags.albumartist is None)
|
||||
@@ -97,9 +101,22 @@ def get_tags(filepath: str):
|
||||
for tag in to_filename:
|
||||
p = getattr(tags, tag)
|
||||
if p == "" or p is None:
|
||||
setattr(tags, tag, filename)
|
||||
maybe = parse_title_from_filename(filename)
|
||||
setattr(tags, tag, maybe)
|
||||
|
||||
to_check = ["album", "artist", "year", "albumartist"]
|
||||
parse = ["artist", "albumartist"]
|
||||
for tag in parse:
|
||||
p = getattr(tags, tag)
|
||||
|
||||
if p == "" or p is None:
|
||||
maybe = parse_artist_from_filename(filename)
|
||||
|
||||
if maybe != []:
|
||||
setattr(tags, tag, ", ".join(maybe))
|
||||
else:
|
||||
setattr(tags, tag, "Unknown")
|
||||
|
||||
to_check = ["album", "year", "albumartist"]
|
||||
for prop in to_check:
|
||||
p = getattr(tags, prop)
|
||||
if (p is None) or (p == ""):
|
||||
@@ -127,10 +144,10 @@ def get_tags(filepath: str):
|
||||
tags.albumhash = create_hash(tags.album, tags.albumartist)
|
||||
tags.trackhash = create_hash(tags.artist, tags.album, tags.title)
|
||||
tags.image = f"{tags.albumhash}.webp"
|
||||
tags.folder = os.path.dirname(filepath)
|
||||
tags.folder = win_replace_slash(os.path.dirname(filepath))
|
||||
|
||||
tags.date = extract_date(tags.year)
|
||||
tags.filepath = filepath
|
||||
tags.filepath = win_replace_slash(filepath)
|
||||
tags.filetype = filetype
|
||||
|
||||
tags = tags.__dict__
|
||||
@@ -157,3 +174,8 @@ def get_tags(filepath: str):
|
||||
del tags[tag]
|
||||
|
||||
return tags
|
||||
|
||||
for tag in to_delete:
|
||||
del tags[tag]
|
||||
|
||||
return tags
|
||||
|
||||
@@ -63,7 +63,10 @@ class Watcher:
|
||||
|
||||
dir_map = [d for d in dir_map if d["realpath"] != d["original"]]
|
||||
|
||||
if len(dirs) > 0 and dirs[0] == "$home":
|
||||
# if len(dirs) > 0 and dirs[0] == "$home":
|
||||
# dirs = [settings.USER_HOME_DIR]
|
||||
|
||||
if any([d == "$home" for d in dirs]):
|
||||
dirs = [settings.USER_HOME_DIR]
|
||||
|
||||
event_handler = Handler(root_dirs=dirs, dir_map=dir_map)
|
||||
@@ -83,7 +86,7 @@ class Watcher:
|
||||
try:
|
||||
self.observer.start()
|
||||
log.info("Started watchdog")
|
||||
except FileNotFoundError:
|
||||
except (FileNotFoundError, PermissionError):
|
||||
log.error(
|
||||
"WatchdogError: Failed to start watchdog, root directories could not be resolved."
|
||||
)
|
||||
@@ -189,10 +192,11 @@ class Handler(PatternMatchingEventHandler):
|
||||
def __init__(self, root_dirs: list[str], dir_map: dict[str:str]):
|
||||
self.root_dirs = root_dirs
|
||||
self.dir_map = dir_map
|
||||
patterns = [f"*{f}" for f in settings.SUPPORTED_FILES]
|
||||
|
||||
PatternMatchingEventHandler.__init__(
|
||||
self,
|
||||
patterns=["*.flac", "*.mp3"],
|
||||
patterns=patterns,
|
||||
ignore_directories=True,
|
||||
case_sensitive=False,
|
||||
)
|
||||
|
||||
+1
-1
@@ -60,7 +60,7 @@ class Track:
|
||||
if self.artist is not None:
|
||||
artists = utils.split_artists(self.artist)
|
||||
|
||||
featured = utils.extract_featured_artists_from_title(self.title)
|
||||
featured = utils.parse_feat_from_title(self.title)
|
||||
original_lower = "-".join([a.lower() for a in artists])
|
||||
artists.extend([a for a in featured if a.lower() not in original_lower])
|
||||
|
||||
|
||||
+1
-1
@@ -71,7 +71,7 @@ SM_ARTIST_IMG_SIZE = 64
|
||||
The size of extracted images in pixels
|
||||
"""
|
||||
|
||||
FILES = ["flac", "mp3", "wav", "m4a"]
|
||||
FILES = ["flac", "mp3", "wav", "m4a", "ogg", "wma", "opus", "alac", "aiff"]
|
||||
SUPPORTED_FILES = tuple(f".{file}" for file in FILES)
|
||||
|
||||
# ===== SQLite =====
|
||||
|
||||
+74
-27
@@ -1,19 +1,18 @@
|
||||
"""
|
||||
This module contains mini functions for the server.
|
||||
"""
|
||||
import random
|
||||
import re
|
||||
import string
|
||||
from pathlib import Path
|
||||
from datetime import datetime
|
||||
|
||||
import hashlib
|
||||
import os
|
||||
import platform
|
||||
import random
|
||||
import re
|
||||
import socket as Socket
|
||||
import hashlib
|
||||
import string
|
||||
import threading
|
||||
import requests
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
|
||||
import requests
|
||||
from unidecode import unidecode
|
||||
|
||||
from app import models
|
||||
@@ -36,7 +35,8 @@ def background(func):
|
||||
|
||||
def run_fast_scandir(_dir: str, full=False) -> tuple[list[str], list[str]]:
|
||||
"""
|
||||
Scans a directory for files with a specific extension. Returns a list of files and folders in the directory.
|
||||
Scans a directory for files with a specific extension.
|
||||
Returns a list of files and folders in the directory.
|
||||
"""
|
||||
|
||||
if _dir == "":
|
||||
@@ -46,20 +46,20 @@ def run_fast_scandir(_dir: str, full=False) -> tuple[list[str], list[str]]:
|
||||
files = []
|
||||
|
||||
try:
|
||||
for _files in os.scandir(_dir):
|
||||
if _files.is_dir() and not _files.name.startswith("."):
|
||||
subfolders.append(_files.path)
|
||||
if _files.is_file():
|
||||
ext = os.path.splitext(_files.name)[1].lower()
|
||||
for _file in os.scandir(_dir):
|
||||
if _file.is_dir() and not _file.name.startswith("."):
|
||||
subfolders.append(_file.path)
|
||||
if _file.is_file():
|
||||
ext = os.path.splitext(_file.name)[1].lower()
|
||||
if ext in SUPPORTED_FILES:
|
||||
files.append(_files.path)
|
||||
files.append(win_replace_slash(_file.path))
|
||||
|
||||
if full or len(files) == 0:
|
||||
for _dir in list(subfolders):
|
||||
sub_dirs, _files = run_fast_scandir(_dir, full=True)
|
||||
sub_dirs, _file = run_fast_scandir(_dir, full=True)
|
||||
subfolders.extend(sub_dirs)
|
||||
files.extend(_files)
|
||||
except (PermissionError, FileNotFoundError, ValueError):
|
||||
files.extend(_file)
|
||||
except (OSError, PermissionError, FileNotFoundError, ValueError):
|
||||
return [], []
|
||||
|
||||
return subfolders, files
|
||||
@@ -191,7 +191,7 @@ def get_albumartists(albums: list[models.Album]) -> set[str]:
|
||||
|
||||
|
||||
def get_all_artists(
|
||||
tracks: list[models.Track], albums: list[models.Album]
|
||||
tracks: list[models.Track], albums: list[models.Album]
|
||||
) -> list[models.Artist]:
|
||||
artists_from_tracks = get_artists_from_tracks(tracks)
|
||||
artist_from_albums = get_albumartists(albums)
|
||||
@@ -232,7 +232,8 @@ def bisection_search_string(strings: list[str], target: str) -> str | None:
|
||||
|
||||
def get_home_res_path(filename: str):
|
||||
"""
|
||||
Returns a path to resources in the home directory of this project. Used to resolve resources in builds.
|
||||
Returns a path to resources in the home directory of this project.
|
||||
Used to resolve resources in builds.
|
||||
"""
|
||||
try:
|
||||
return (CWD / ".." / filename).resolve()
|
||||
@@ -259,12 +260,7 @@ def is_windows():
|
||||
return platform.system() == "Windows"
|
||||
|
||||
|
||||
def split_artists(src: str):
|
||||
artists = re.split(r"\s*[&,;]\s*", src)
|
||||
return [a.strip() for a in artists]
|
||||
|
||||
|
||||
def extract_featured_artists_from_title(title: str) -> list[str]:
|
||||
def parse_feat_from_title(title: str) -> list[str]:
|
||||
"""
|
||||
Extracts featured artists from a song title using regex.
|
||||
"""
|
||||
@@ -275,7 +271,7 @@ def extract_featured_artists_from_title(title: str) -> list[str]:
|
||||
return []
|
||||
|
||||
artists = match.group(1)
|
||||
artists = split_artists(artists)
|
||||
artists = split_artists(artists, with_and=True)
|
||||
return artists
|
||||
|
||||
|
||||
@@ -284,3 +280,54 @@ def get_random_str(length=5):
|
||||
Generates a random string of length `length`.
|
||||
"""
|
||||
return "".join(random.choices(string.ascii_letters + string.digits, k=length))
|
||||
|
||||
|
||||
def win_replace_slash(path: str):
|
||||
if is_windows():
|
||||
return path.replace("\\", "/").replace("//", "/")
|
||||
|
||||
return path
|
||||
|
||||
|
||||
def split_artists(src: str, with_and: bool = False):
|
||||
exp = r"\s*(?:and|&|,|;)\s*" if with_and else r"\s*[,;]\s*"
|
||||
|
||||
artists = re.split(exp, src)
|
||||
return [a.strip() for a in artists]
|
||||
|
||||
def parse_artist_from_filename(title: str):
|
||||
"""
|
||||
Extracts artist names from a song title using regex.
|
||||
"""
|
||||
|
||||
regex = r"^(.+?)\s*[-–—]\s*(?:.+?)$"
|
||||
match = re.search(regex, title, re.IGNORECASE)
|
||||
|
||||
if not match:
|
||||
return []
|
||||
|
||||
artists = match.group(1)
|
||||
artists = split_artists(artists)
|
||||
return artists
|
||||
|
||||
|
||||
def parse_title_from_filename(title: str):
|
||||
"""
|
||||
Extracts track title from a song title using regex.
|
||||
"""
|
||||
|
||||
regex = r"^(?:.+?)\s*[-–—]\s*(.+?)$"
|
||||
match = re.search(regex, title, re.IGNORECASE)
|
||||
|
||||
if not match:
|
||||
return title
|
||||
|
||||
res = match.group(1)
|
||||
# remove text in brackets starting with "official" case insensitive
|
||||
res = re.sub(r"\s*\([^)]*official[^)]*\)", "", res, flags=re.IGNORECASE)
|
||||
return res.strip()
|
||||
|
||||
|
||||
# for title in sample_titles:
|
||||
# print(parse_artist_from_filename(title))
|
||||
# print(parse_title_from_filename(title))
|
||||
|
||||
Reference in New Issue
Block a user