mirror of
https://github.com/Dvorinka/swingmusic-extended.git
synced 2026-06-04 20:43:04 +00:00
modularize src
+ merge main.py and manage.py + move start logic to swingmusic/__main__.py + add a run.py on the project root
This commit is contained in:
@@ -0,0 +1,21 @@
|
||||
import locale
|
||||
from typing import Iterable, TypeVar
|
||||
|
||||
T = TypeVar("T")
|
||||
|
||||
# Set to user's default locale:
|
||||
locale.setlocale(locale.LC_ALL, "")
|
||||
|
||||
# Or set to a specific locale:
|
||||
# locale.setlocale(locale.LC_ALL, 'en_US.UTF-8')
|
||||
|
||||
|
||||
def format_number(number: float) -> str:
|
||||
return locale.format_string("%d", number, grouping=True)
|
||||
|
||||
|
||||
def flatten(list_: Iterable[list[T]]) -> list[T]:
|
||||
"""
|
||||
Flattens a list of lists into a single list.
|
||||
"""
|
||||
return [item for sublist in list_ for item in sublist]
|
||||
@@ -0,0 +1,43 @@
|
||||
import hmac
|
||||
import hashlib
|
||||
|
||||
from flask_jwt_extended import current_user
|
||||
|
||||
from swingmusic.config import UserConfig
|
||||
|
||||
|
||||
def hash_password(password: str) -> str:
|
||||
"""
|
||||
Hashes the given password using sha256 algorithm and the user id as salt.
|
||||
|
||||
:param password: The password to hash.
|
||||
|
||||
:return: The hashed password.
|
||||
"""
|
||||
return hashlib.pbkdf2_hmac(
|
||||
"sha256", password.encode("utf-8"), UserConfig().serverId.encode("utf-8"), 100000
|
||||
).hex()
|
||||
|
||||
|
||||
def check_password(password: str, hashed: str) -> bool:
|
||||
"""
|
||||
This function checks if the given password matches the hashed password.
|
||||
|
||||
:param password: The password to check.
|
||||
:param hashed: The hashed password.
|
||||
|
||||
:return: Whether the password matches.
|
||||
"""
|
||||
|
||||
return hmac.compare_digest(hash_password(password), hashed)
|
||||
|
||||
|
||||
def get_current_userid() -> int:
|
||||
"""
|
||||
Get the current session user.
|
||||
"""
|
||||
try:
|
||||
return current_user["id"]
|
||||
except RuntimeError:
|
||||
# Catch this error raised during migration execution
|
||||
return 1
|
||||
@@ -0,0 +1,45 @@
|
||||
from typing import List, Optional, TypeVar
|
||||
|
||||
T = TypeVar("T")
|
||||
|
||||
def use_bisection(
|
||||
source: List[T], key: str, queries: List[str], limit: int = -1
|
||||
) -> List[Optional[T]]:
|
||||
"""
|
||||
Uses bisection to find a list of items in another list.
|
||||
|
||||
Returns a list of found items with `None` items being not found items.
|
||||
"""
|
||||
|
||||
def find(query: str):
|
||||
left = 0
|
||||
right = len(source) - 1
|
||||
|
||||
while left <= right:
|
||||
mid = (left + right) // 2
|
||||
if source[mid].__getattribute__(key) == query:
|
||||
return source[mid]
|
||||
elif source[mid].__getattribute__(key) > query:
|
||||
right = mid - 1
|
||||
else:
|
||||
left = mid + 1
|
||||
|
||||
return None
|
||||
|
||||
if len(source) == 0:
|
||||
return []
|
||||
|
||||
results = []
|
||||
|
||||
for query in queries:
|
||||
res = find(query)
|
||||
|
||||
if res is None:
|
||||
continue
|
||||
|
||||
results.append(res)
|
||||
|
||||
if limit != -1 and len(results) >= limit:
|
||||
break
|
||||
|
||||
return results
|
||||
@@ -0,0 +1,18 @@
|
||||
from typing import Iterator
|
||||
|
||||
|
||||
class CustomList(list):
|
||||
# TODO: I think SharedMemoryList implementation will be done here.
|
||||
# This list should be used as a normal list without any changes in the stores.
|
||||
|
||||
def __getitem__(self, index):
|
||||
# Do some shared memory stuff here
|
||||
# print the length of the list
|
||||
# print(f"__getitem__ Length of the list: {len(self)}")
|
||||
return super().__getitem__(index)
|
||||
|
||||
def __iter__(self) -> Iterator:
|
||||
# Do some shared memory stuff here
|
||||
# print the length of the list
|
||||
# print(f"__iter__ Length of the list: {len(self)}")
|
||||
return super().__iter__()
|
||||
@@ -0,0 +1,144 @@
|
||||
import pendulum
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
_format = "%Y-%m-%d %H:%M:%S"
|
||||
|
||||
|
||||
def timestamp_from_days_ago(days_ago: int):
|
||||
"""
|
||||
Returns a timestamp from a number of days ago.
|
||||
"""
|
||||
current_datetime = datetime.now()
|
||||
delta = timedelta(days=days_ago)
|
||||
past_timestamp = current_datetime - delta
|
||||
|
||||
return int(past_timestamp.timestamp())
|
||||
|
||||
|
||||
def create_new_date(date: datetime | None = None) -> str:
|
||||
"""
|
||||
Creates a new date and time string in the format of "YYYY-MM-DD HH:MM:SS"
|
||||
:return: A string of the current date and time.
|
||||
"""
|
||||
if not date:
|
||||
date = datetime.now()
|
||||
|
||||
return date.strftime(_format)
|
||||
|
||||
|
||||
def timestamp_to_time_passed(timestamp: str | int | float):
|
||||
"""
|
||||
Converts a timestamp to time passed. e.g. 2 minutes ago, 1 hour ago, yesterday, 2 days ago, 2 weeks ago, etc.
|
||||
"""
|
||||
now = datetime.now().timestamp()
|
||||
then = datetime.fromtimestamp(int(timestamp)).timestamp()
|
||||
|
||||
diff = now - then
|
||||
now = pendulum.now()
|
||||
return now.subtract(seconds=diff).diff_for_humans()
|
||||
|
||||
|
||||
def date_string_to_time_passed(prev_date: str) -> str:
|
||||
"""
|
||||
Converts a date string to time passed. e.g. 2 minutes ago, 1 hour ago, yesterday, 2 days ago, 2 weeks ago, etc.
|
||||
"""
|
||||
then = datetime.strptime(prev_date, _format).timestamp()
|
||||
return timestamp_to_time_passed(then)
|
||||
|
||||
|
||||
def seconds_to_time_string(seconds: int):
|
||||
"""
|
||||
Converts seconds to a time string. e.g. 1 hour 2 minutes, 1 hour 2 seconds, 1 hour, 1 minute 2 seconds, etc.
|
||||
"""
|
||||
hours = seconds // 3600
|
||||
minutes = (seconds % 3600) // 60
|
||||
remaining_seconds = seconds % 60
|
||||
|
||||
if hours > 0:
|
||||
if minutes > 0:
|
||||
return f"{hours} hr{'s' if hours > 1 else ''}, {minutes} min{'s' if minutes > 1 else ''}"
|
||||
|
||||
return f"{hours} hr{'s' if hours > 1 else ''}"
|
||||
|
||||
if minutes > 0:
|
||||
return f"{minutes} min{'s' if minutes > 1 else ''}"
|
||||
|
||||
return f"{remaining_seconds} sec"
|
||||
|
||||
|
||||
def get_date_range(duration: str, units_ago: int = 0):
|
||||
"""
|
||||
Returns a tuple of dates representing the start and end of a given duration.
|
||||
"""
|
||||
date_range = None
|
||||
seconds_ago = 0
|
||||
|
||||
if duration != "alltime":
|
||||
seconds_ago = (
|
||||
pendulum.now() - pendulum.now().subtract().start_of(duration)
|
||||
).total_seconds() * units_ago
|
||||
|
||||
match duration:
|
||||
case "day" | "week" | "month" | "year":
|
||||
date_range = (
|
||||
pendulum.now()
|
||||
.subtract(seconds=seconds_ago)
|
||||
.start_of(duration)
|
||||
.timestamp(),
|
||||
pendulum.now()
|
||||
# .end_of(duration)
|
||||
.timestamp(),
|
||||
)
|
||||
case "alltime":
|
||||
date_range = (0, pendulum.now().timestamp())
|
||||
case _:
|
||||
raise ValueError(f"Invalid duration: {duration}")
|
||||
|
||||
return (int(date_range[0]), int(date_range[1]))
|
||||
|
||||
|
||||
def get_duration_ago(duration: str, units_ago: int = 1) -> int:
|
||||
"""
|
||||
Returns the start of the last duration.
|
||||
"""
|
||||
seconds_in_day = 24 * 60 * 60
|
||||
now = pendulum.now()
|
||||
|
||||
match duration:
|
||||
case "day":
|
||||
return int(
|
||||
now.subtract(seconds=seconds_in_day * units_ago).timestamp()
|
||||
)
|
||||
case "week":
|
||||
return int(
|
||||
now
|
||||
.subtract(seconds=seconds_in_day * 7 * units_ago)
|
||||
.timestamp()
|
||||
)
|
||||
case "month":
|
||||
return int(
|
||||
now
|
||||
.subtract(seconds=seconds_in_day * 30 * units_ago)
|
||||
.timestamp()
|
||||
)
|
||||
case "year":
|
||||
return int(
|
||||
now
|
||||
.subtract(seconds=seconds_in_day * 365 * units_ago)
|
||||
.timestamp()
|
||||
)
|
||||
case _:
|
||||
raise ValueError(f"Invalid duration: {duration}")
|
||||
|
||||
|
||||
def get_duration_in_seconds(duration: str) -> int:
|
||||
"""
|
||||
Returns the number of seconds in a given duration.
|
||||
"""
|
||||
match duration:
|
||||
case "week" | "month" | "year":
|
||||
return int(pendulum.now().subtract().start_of(duration).timestamp())
|
||||
case "alltime":
|
||||
return int(pendulum.now().timestamp())
|
||||
|
||||
raise ValueError(f"Invalid duration: {duration}")
|
||||
@@ -0,0 +1,11 @@
|
||||
def coroutine(func):
|
||||
"""
|
||||
Decorator: primes `func` by advancing to first `yield`
|
||||
"""
|
||||
|
||||
def start(*args, **kwargs):
|
||||
cr = func(*args, **kwargs)
|
||||
next(cr)
|
||||
return cr
|
||||
|
||||
return start
|
||||
@@ -0,0 +1,21 @@
|
||||
import mimetypes
|
||||
|
||||
|
||||
def get_mime_from_ext(filename: str):
|
||||
"""
|
||||
Constructs a mime type from a file extension.
|
||||
"""
|
||||
ext = filename.rsplit(".", maxsplit=1)[-1]
|
||||
return f"audio/{ext}"
|
||||
|
||||
|
||||
def guess_mime_type(filename: str):
|
||||
"""
|
||||
Guess the mime type of a file.
|
||||
"""
|
||||
type = mimetypes.guess_type(filename)[0]
|
||||
|
||||
if type is None:
|
||||
return get_mime_from_ext(filename)
|
||||
|
||||
return type
|
||||
@@ -0,0 +1,62 @@
|
||||
import os
|
||||
from pathlib import Path
|
||||
|
||||
from swingmusic.utils.wintools import win_replace_slash
|
||||
|
||||
CWD = Path(__file__).parent.resolve()
|
||||
|
||||
FILES = ["flac", "mp3", "wav", "m4a", "ogg", "wma", "opus", "alac", "aiff"]
|
||||
SUPPORTED_FILES = tuple(f".{file}" for file in FILES)
|
||||
|
||||
|
||||
def run_fast_scandir(_dir: str, full=False) -> tuple[list[str], list[str]]:
|
||||
"""
|
||||
Scans a directory for files with a specific extension.
|
||||
Returns a list of files and folders in the directory.
|
||||
"""
|
||||
# if on mac, ignore Library folder and its children
|
||||
if os.name == "posix":
|
||||
dir_path = Path(_dir)
|
||||
library_path = Path.home() / "Library"
|
||||
if dir_path == library_path or library_path in dir_path.parents:
|
||||
return [], []
|
||||
|
||||
# if the path contains "node_modules" ignore
|
||||
if "node_modules" in _dir:
|
||||
return [], []
|
||||
|
||||
if _dir == "":
|
||||
return [], []
|
||||
|
||||
subfolders = []
|
||||
files = []
|
||||
|
||||
try:
|
||||
for _file in os.scandir(_dir):
|
||||
if _file.is_dir() and not _file.name.startswith("."):
|
||||
subfolders.append(_file.path)
|
||||
if _file.is_file():
|
||||
ext = os.path.splitext(_file.name)[1].lower()
|
||||
if ext in SUPPORTED_FILES:
|
||||
files.append(win_replace_slash(_file.path))
|
||||
|
||||
if full or len(files) == 0:
|
||||
for _dir in list(subfolders):
|
||||
sub_dirs, _file = run_fast_scandir(_dir, full=True)
|
||||
subfolders.extend(sub_dirs)
|
||||
files.extend(_file)
|
||||
except (OSError, PermissionError, FileNotFoundError, ValueError):
|
||||
return [], []
|
||||
|
||||
return subfolders, files
|
||||
|
||||
|
||||
def get_home_res_path(filename: str):
|
||||
"""
|
||||
Returns a path to resources in the home directory of this project.
|
||||
Used to resolve resources in builds.
|
||||
"""
|
||||
try:
|
||||
return (CWD / ".." / ".." / filename).resolve()
|
||||
except ValueError:
|
||||
return None
|
||||
@@ -0,0 +1,9 @@
|
||||
import string
|
||||
import random
|
||||
|
||||
|
||||
def get_random_str(length=5):
|
||||
"""
|
||||
Generates a random string of length `length`.
|
||||
"""
|
||||
return "".join(random.choices(string.ascii_letters + string.digits, k=length))
|
||||
@@ -0,0 +1,44 @@
|
||||
import hashlib
|
||||
import xxhash
|
||||
|
||||
from unidecode import unidecode
|
||||
|
||||
|
||||
def create_hash(*args: str, decode=False, limit=10) -> str:
|
||||
"""
|
||||
This function creates a case-insensitive, non-alphanumeric chars ignoring hash from the given arguments.
|
||||
|
||||
Example use case:
|
||||
- Creating computable IDs for duplicate artists. eg. Juice WRLD and Juice Wrld should have the same ID.
|
||||
|
||||
:param args: The arguments to hash.
|
||||
:param decode: Whether to decode the arguments before hashing.
|
||||
:param limit: The number of characters to return.
|
||||
|
||||
:return: The hash.
|
||||
"""
|
||||
|
||||
def remove_non_alnum(token: str) -> str:
|
||||
token = token.lower().strip().replace(" ", "")
|
||||
t = "".join(t for t in token if t.isalnum())
|
||||
|
||||
if t == "":
|
||||
return token
|
||||
|
||||
return t
|
||||
|
||||
str_ = "".join(remove_non_alnum(t) for t in args)
|
||||
|
||||
if decode:
|
||||
str_ = unidecode(str_)
|
||||
|
||||
str_ = str_.encode("utf-8")
|
||||
return xxhash.xxh3_64(str_).hexdigest()
|
||||
# str_ = hashlib.sha1(str_).hexdigest()
|
||||
|
||||
# INFO: Return first 5 + last 5 characters
|
||||
# return (
|
||||
# str_[: limit // 2] + str_[-limit // 2 :]
|
||||
# if limit % 2 == 0
|
||||
# else str_[: limit // 2] + str_[-limit // 2 - 1 :]
|
||||
# )
|
||||
@@ -0,0 +1,111 @@
|
||||
from swingmusic.models.track import Track
|
||||
from typing import List, Dict, Tuple
|
||||
from collections import Counter
|
||||
|
||||
|
||||
def violates_gap_rule(
|
||||
balanced_mix: Dict[int, Track], position: int, track: Track, gap: int = 3
|
||||
) -> bool:
|
||||
"""
|
||||
Check if placing the track at the given position violates the gap rule.
|
||||
|
||||
The gap rule is violated if the track has an artist in common with any
|
||||
track within the gap range (default = 3).
|
||||
"""
|
||||
track_artists = set(artist["artisthash"] for artist in track.artists)
|
||||
|
||||
for i in range(max(0, position - gap), position):
|
||||
if i in balanced_mix:
|
||||
existing_artists = set(
|
||||
artist["artisthash"] for artist in balanced_mix[i].artists
|
||||
)
|
||||
if track_artists.intersection(existing_artists):
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
|
||||
def find_next_position(
|
||||
balanced_mix: Dict[int, Track], start: int, track: Track, total_tracks: int
|
||||
) -> int:
|
||||
"""
|
||||
Find the next available position for the track, starting from 'start' and wrapping around.
|
||||
"""
|
||||
for i in range(start, total_tracks):
|
||||
if i not in balanced_mix and not violates_gap_rule(balanced_mix, i, track):
|
||||
return i
|
||||
for i in range(start):
|
||||
if i not in balanced_mix and not violates_gap_rule(balanced_mix, i, track):
|
||||
return i
|
||||
return start # If no better position is found, return the original position
|
||||
|
||||
|
||||
def is_tracklist_balanced(tracks: List[Track], gap: int = 3) -> Tuple[bool, bool]:
|
||||
"""
|
||||
Checks if a tracklist is balanced or can be balanced.
|
||||
|
||||
Args:
|
||||
- tracks: List of Track objects
|
||||
- gap: Minimum number of tracks between songs by the same artist (default 3)
|
||||
|
||||
Returns:
|
||||
- A tuple (can_be_balanced, is_currently_balanced)
|
||||
"""
|
||||
total_tracks = len(tracks)
|
||||
|
||||
# Count tracks per artist (considering only the first artist)
|
||||
artist_counts = Counter(track.artists[0]["artisthash"] for track in tracks)
|
||||
|
||||
# Calculate the maximum number of tracks an artist can have in a balanced list
|
||||
max_tracks_per_artist = (total_tracks + gap) // (gap + 1)
|
||||
|
||||
# Check if it's mathematically possible to balance the tracklist
|
||||
can_be_balanced = all(
|
||||
count <= max_tracks_per_artist for count in artist_counts.values()
|
||||
)
|
||||
|
||||
if not can_be_balanced:
|
||||
return False, False
|
||||
|
||||
# Check if the current arrangement is balanced
|
||||
is_currently_balanced = True
|
||||
artist_last_positions = {}
|
||||
|
||||
for i, track in enumerate(tracks):
|
||||
artist = track.artists[0]["artisthash"]
|
||||
if artist in artist_last_positions:
|
||||
if i - artist_last_positions[artist] <= gap:
|
||||
is_currently_balanced = False
|
||||
break
|
||||
artist_last_positions[artist] = i
|
||||
|
||||
return can_be_balanced, is_currently_balanced
|
||||
|
||||
|
||||
def balance_mix(tracks: List[Track]) -> List[Track]:
|
||||
"""
|
||||
Balances the mix by ensuring that the tracks in a mix are distributed evenly.
|
||||
Preserves the overall rating order of tracks while minimizing disruption.
|
||||
|
||||
Tracks that need to be moved are moved down the tracklist until they no longer
|
||||
violate the gap rule.
|
||||
"""
|
||||
can_be_balanced, is_balanced = is_tracklist_balanced(tracks)
|
||||
|
||||
if is_balanced:
|
||||
# Already balanced, no need to modify
|
||||
return tracks
|
||||
|
||||
# Proceed with best-effort balancing
|
||||
balanced_mix: Dict[int, Track] = {}
|
||||
total_tracks = len(tracks)
|
||||
|
||||
for i, track in enumerate(tracks):
|
||||
if i in balanced_mix or not violates_gap_rule(balanced_mix, i, track):
|
||||
balanced_mix[i] = track
|
||||
else:
|
||||
new_position = find_next_position(balanced_mix, i, track, total_tracks)
|
||||
balanced_mix[new_position] = track
|
||||
|
||||
# Convert the dictionary back to a list, preserving the new order
|
||||
return [balanced_mix[i] for i in sorted(balanced_mix.keys())]
|
||||
@@ -0,0 +1,31 @@
|
||||
import socket as Socket
|
||||
|
||||
|
||||
def has_connection(host="google.it", port=80, timeout=3):
|
||||
"""
|
||||
# REVIEW Was:
|
||||
Host: 8.8.8.8 (google-public-dns-a.google.com)
|
||||
OpenPort: 53/tcp
|
||||
Service: domain (DNS/TCP)
|
||||
"""
|
||||
try:
|
||||
Socket.setdefaulttimeout(timeout)
|
||||
Socket.socket(Socket.AF_INET, Socket.SOCK_STREAM).connect((host, port))
|
||||
return True
|
||||
except Socket.error as ex:
|
||||
return False
|
||||
|
||||
|
||||
def get_ip():
|
||||
"""
|
||||
Returns the IP address of this device.
|
||||
"""
|
||||
soc = Socket.socket(Socket.AF_INET, Socket.SOCK_DGRAM)
|
||||
try:
|
||||
soc.connect(("8.8.8.8", 80))
|
||||
except OSError:
|
||||
return None
|
||||
ip_address = str(soc.getsockname()[0])
|
||||
soc.close()
|
||||
|
||||
return ip_address
|
||||
@@ -0,0 +1,206 @@
|
||||
import re
|
||||
|
||||
from swingmusic.config import UserConfig
|
||||
from swingmusic.enums.album_versions import AlbumVersionEnum, get_all_keywords
|
||||
|
||||
|
||||
def split_artists(src: str, config: UserConfig):
|
||||
"""
|
||||
Splits a string of artists into a list of artists, preserving those in ignoreList.
|
||||
Case-insensitive matching is used for the ignoreList.
|
||||
"""
|
||||
result = []
|
||||
current = ""
|
||||
i = 0
|
||||
|
||||
while i < len(src):
|
||||
# Check if any ignored artist starts at this position (case-insensitive)
|
||||
ignored_match = next(
|
||||
(
|
||||
src[i : i + len(ignored)]
|
||||
for ignored in config.artistSplitIgnoreList
|
||||
if src.lower().startswith(ignored.lower(), i)
|
||||
),
|
||||
None,
|
||||
)
|
||||
|
||||
if ignored_match:
|
||||
# If we have accumulated any current string, add it to result
|
||||
if current.strip():
|
||||
result.extend([a.strip() for a in current.split(",") if a.strip()])
|
||||
current = ""
|
||||
# Add the ignored artist to the result (preserving original case)
|
||||
result.append(ignored_match)
|
||||
# Move past the ignored artist
|
||||
i += len(ignored_match)
|
||||
elif src[i] in config.artistSeparators:
|
||||
# If we encounter a separator, process the current string
|
||||
if current.strip():
|
||||
result.extend([a.strip() for a in current.split(",") if a.strip()])
|
||||
current = ""
|
||||
i += 1
|
||||
else:
|
||||
# If it's not an ignored artist or a separator, add to current
|
||||
current += src[i]
|
||||
i += 1
|
||||
|
||||
# Process any remaining current string
|
||||
if current.strip():
|
||||
result.extend([a.strip() for a in current.split(",") if a.strip()])
|
||||
|
||||
return result
|
||||
|
||||
|
||||
def remove_prod(title: str) -> str:
|
||||
"""
|
||||
Removes the producer string in a track title using regex.
|
||||
"""
|
||||
|
||||
# check if title contain title, if not return it.
|
||||
if not ("prod." in title.lower()):
|
||||
return title
|
||||
|
||||
# check if title has brackets
|
||||
if re.search(r"[()\[\]]", title):
|
||||
regex = r"\s?(\(|\[)prod\..*?(\)|\])\s?"
|
||||
else:
|
||||
regex = r"\s?\bprod\.\s*\S+"
|
||||
|
||||
# remove the producer string
|
||||
title = re.sub(regex, "", title, flags=re.IGNORECASE)
|
||||
return title.strip()
|
||||
|
||||
|
||||
def parse_feat_from_title(title: str, config: UserConfig) -> tuple[list[str], str]:
|
||||
"""
|
||||
Extracts featured artists from a song title using regex.
|
||||
"""
|
||||
regex = r"\((?:feat|ft|featuring|with)\.?\s+(.+?)\)"
|
||||
# regex for square brackets 👇
|
||||
sqr_regex = r"\[(?:feat|ft|featuring|with)\.?\s+(.+?)\]"
|
||||
|
||||
match = re.search(regex, title, re.IGNORECASE)
|
||||
|
||||
if not match:
|
||||
match = re.search(sqr_regex, title, re.IGNORECASE)
|
||||
regex = sqr_regex
|
||||
|
||||
if not match:
|
||||
return [], title
|
||||
|
||||
artists = match.group(1)
|
||||
artists = split_artists(artists, config)
|
||||
|
||||
# remove "feat" group from title
|
||||
new_title = re.sub(regex, "", title, flags=re.IGNORECASE)
|
||||
return artists, new_title
|
||||
|
||||
|
||||
def get_base_album_title(string: str) -> tuple[str, str | None]:
|
||||
"""
|
||||
Extracts the base album title from a string.
|
||||
"""
|
||||
pattern = re.compile(
|
||||
rf"\s*(\(|\[)[^\)\]]*?({get_all_keywords()})[^\)\]]*?(\)|\])$",
|
||||
re.IGNORECASE,
|
||||
)
|
||||
# TODO: Fix "Redundant character escape '\]' in RegExp "
|
||||
match = pattern.search(string)
|
||||
|
||||
if match:
|
||||
removed_block = match.group(0)
|
||||
title = string.replace(removed_block, "")
|
||||
return title.strip(), removed_block.strip()
|
||||
|
||||
return string, None
|
||||
|
||||
|
||||
def get_anniversary(text: str) -> str | None:
|
||||
"""
|
||||
Extracts anniversary from text using regex.
|
||||
"""
|
||||
_end = "anniversary"
|
||||
match = re.search(r"\b\d+\w*(?= anniversary)", text, re.IGNORECASE)
|
||||
if match:
|
||||
return match.group(0).strip().lower() + f" {_end}"
|
||||
else:
|
||||
return _end
|
||||
|
||||
|
||||
def get_album_info(bracket_text: str | None) -> list[str]:
|
||||
"""
|
||||
Extracts album version info from the bracketed text on an album title string using regex.
|
||||
"""
|
||||
if not bracket_text:
|
||||
return []
|
||||
|
||||
# replace all non-alphanumeric characters with an empty string
|
||||
bracket_text = re.sub(r"[^a-zA-Z0-9\s]", "", bracket_text)
|
||||
versions = []
|
||||
|
||||
for version_keywords in AlbumVersionEnum:
|
||||
for keyword in version_keywords.value:
|
||||
if re.search(keyword, bracket_text, re.IGNORECASE):
|
||||
versions.append(version_keywords.name.lower())
|
||||
break
|
||||
|
||||
if "anniversary" in versions:
|
||||
anniversary = get_anniversary(bracket_text)
|
||||
versions.insert(0, anniversary)
|
||||
versions.remove("anniversary")
|
||||
|
||||
return versions
|
||||
|
||||
|
||||
def get_base_title_and_versions(
|
||||
original_album_title: str, get_versions=True
|
||||
) -> tuple[str, list[str]]:
|
||||
"""
|
||||
Extracts the base album title and version info from an album title string using regex.
|
||||
"""
|
||||
album_title, version_block = get_base_album_title(original_album_title)
|
||||
|
||||
if version_block is None:
|
||||
return original_album_title, []
|
||||
|
||||
if not get_versions:
|
||||
return album_title, []
|
||||
|
||||
versions = get_album_info(version_block)
|
||||
|
||||
# if no version info could be extracted, accept defeat!
|
||||
if len(versions) == 0:
|
||||
album_title = original_album_title
|
||||
|
||||
return album_title, versions
|
||||
|
||||
|
||||
def remove_bracketed_remaster(text: str):
|
||||
"""
|
||||
Removes remaster info from a track title that contains brackets using regex.
|
||||
"""
|
||||
return re.sub(
|
||||
r"\s*[\\[(][^)\]]*remaster[^)\]]*[)\]]\s*", "", text, flags=re.IGNORECASE
|
||||
).strip()
|
||||
|
||||
|
||||
def remove_hyphen_remasters(text: str):
|
||||
"""
|
||||
Removes remaster info from a track title that contains a hypen (-) using regex.
|
||||
"""
|
||||
return re.sub(
|
||||
r"\s-\s*[^-]*\bremaster[^-]*\s*", "", text, flags=re.IGNORECASE
|
||||
).strip()
|
||||
|
||||
|
||||
def clean_title(title: str) -> str:
|
||||
"""
|
||||
Removes remaster info from a track title using regex.
|
||||
"""
|
||||
if "remaster" not in title.lower():
|
||||
return title
|
||||
|
||||
rem_1 = remove_bracketed_remaster(title)
|
||||
rem_2 = remove_hyphen_remasters(title)
|
||||
|
||||
return rem_1 if len(rem_2) > len(rem_1) else rem_2
|
||||
@@ -0,0 +1,31 @@
|
||||
import os
|
||||
import sys
|
||||
|
||||
from swingmusic.utils.filesystem import get_home_res_path
|
||||
|
||||
|
||||
def getFlaskOpenApiPath():
|
||||
"""
|
||||
Used to retrieve the path to the flask_openapi3 package
|
||||
|
||||
See: https://github.com/luolingchun/flask-openapi3/issues/147
|
||||
"""
|
||||
site_packages_path = [p for p in sys.path if "site-packages" in p][0]
|
||||
|
||||
return f"{site_packages_path}/flask_openapi3"
|
||||
|
||||
|
||||
def getClientFilesExtensions():
|
||||
"""
|
||||
Get all the file extensions for the client files
|
||||
"""
|
||||
|
||||
client_path = get_home_res_path("client")
|
||||
|
||||
extensions = set()
|
||||
for root, dirs, files in os.walk(client_path):
|
||||
for file in files:
|
||||
ext = file.split(".")[-1]
|
||||
extensions.add("." + ext)
|
||||
|
||||
return extensions
|
||||
@@ -0,0 +1,16 @@
|
||||
from tqdm import tqdm as _tqdm
|
||||
|
||||
|
||||
def tqdm(*args, **kwargs):
|
||||
"""
|
||||
Wrapper for tqdm that sets globals.
|
||||
"""
|
||||
bar_format = "{percentage:3.0f}%|{bar:45}|{n_fmt}/{total_fmt}{desc}"
|
||||
kwargs["bar_format"] = bar_format
|
||||
|
||||
if "desc" in kwargs:
|
||||
print(f'INFO|{kwargs["desc"].capitalize()} ...')
|
||||
kwargs["desc"] = ""
|
||||
|
||||
|
||||
return _tqdm(*args, **kwargs)
|
||||
@@ -0,0 +1,50 @@
|
||||
from collections import defaultdict
|
||||
from operator import attrgetter
|
||||
|
||||
from swingmusic.models import Track
|
||||
from swingmusic.utils.hashing import create_hash
|
||||
|
||||
|
||||
def remove_duplicates(tracks: list[Track], is_album_tracks=False) -> list[Track]:
|
||||
"""
|
||||
Remove duplicates from a list of Track objects based on the trackhash attribute.
|
||||
|
||||
Retain objects with the highest bitrate.
|
||||
"""
|
||||
tracks_dict = defaultdict(list)
|
||||
|
||||
# if is_album_tracks, sort by disc and track number
|
||||
if is_album_tracks:
|
||||
for t in tracks:
|
||||
# _pos is used for sorting tracks by disc and track number
|
||||
t._pos = int(f"{t.disc}{str(t.track).zfill(3)}")
|
||||
|
||||
# _ati is used to remove duplicates when merging album versions
|
||||
t._ati = f"{t._pos}{create_hash(t.title)}"
|
||||
|
||||
# create groups of tracks with the same _ati
|
||||
for track in tracks:
|
||||
tracks_dict[track._ati].append(track)
|
||||
|
||||
tracks = []
|
||||
|
||||
# pick the track with max bitrate for each group
|
||||
for track_group in tracks_dict.values():
|
||||
max_bitrate_track = max(track_group, key=attrgetter("bitrate"))
|
||||
tracks.append(max_bitrate_track)
|
||||
|
||||
return sorted(tracks, key=lambda t: t._pos)
|
||||
|
||||
# else, sort by trackhash
|
||||
for track in tracks:
|
||||
# create groups of tracks with the same trackhash
|
||||
tracks_dict[track.trackhash].append(track)
|
||||
|
||||
tracks = []
|
||||
|
||||
# pick the track with max bitrate for each trackhash group
|
||||
for track_group in tracks_dict.values():
|
||||
max_bitrate_track = max(track_group, key=attrgetter("bitrate"))
|
||||
tracks.append(max_bitrate_track)
|
||||
|
||||
return tracks
|
||||
@@ -0,0 +1,308 @@
|
||||
from collections import defaultdict
|
||||
import copy
|
||||
|
||||
from typing import Any, Callable, TypeVar, List
|
||||
from swingmusic.db.userdata import ScrobbleTable
|
||||
from swingmusic.models.stats import StatItem
|
||||
from swingmusic.models.track import Track
|
||||
from swingmusic.models.album import Album
|
||||
from swingmusic.store.albums import AlbumStore
|
||||
from swingmusic.store.tracks import TrackStore
|
||||
from swingmusic.utils.dates import seconds_to_time_string
|
||||
|
||||
|
||||
def get_artists_in_period(
|
||||
start_time: int | float, end_time: int | float, userid: int | None = None
|
||||
):
|
||||
scrobbles = ScrobbleTable.get_all_in_period(start_time, end_time, userid)
|
||||
artists: Any = defaultdict(
|
||||
lambda: {"playcount": 0, "playduration": 0, "tracks": {}}
|
||||
)
|
||||
|
||||
for scrobble in scrobbles:
|
||||
track = TrackStore.get_tracks_by_trackhashes([scrobble.trackhash])
|
||||
if not track:
|
||||
continue
|
||||
|
||||
track = track[0]
|
||||
|
||||
for artist in track.artists:
|
||||
artisthash = artist["artisthash"]
|
||||
|
||||
artists[artisthash]["artist"] = artist["name"]
|
||||
artists[artisthash]["artisthash"] = artist["artisthash"]
|
||||
artists[artisthash]["playcount"] += 1
|
||||
artists[artisthash]["playduration"] += scrobble.duration
|
||||
|
||||
# index the track counts too
|
||||
artists[artisthash]["tracks"][track.trackhash] = (
|
||||
artists[artisthash]["tracks"].get(track.trackhash, 0) + 1
|
||||
)
|
||||
|
||||
artists = list(artists.values())
|
||||
return sorted(artists, key=lambda x: x["playduration"], reverse=True)
|
||||
|
||||
|
||||
def get_albums_in_period(start_time: int, end_time: int, userid: int | None = None):
|
||||
scrobbles = ScrobbleTable.get_all_in_period(start_time, end_time, userid)
|
||||
albums: dict[str, Album] = {}
|
||||
|
||||
for scrobble in scrobbles:
|
||||
track = TrackStore.get_tracks_by_trackhashes([scrobble.trackhash])
|
||||
if not track:
|
||||
continue
|
||||
|
||||
track = track[0]
|
||||
album_entry = AlbumStore.albummap.get(track.albumhash)
|
||||
if not album_entry:
|
||||
continue
|
||||
album_entry = copy.deepcopy(album_entry)
|
||||
|
||||
albumhash = album_entry.album.albumhash
|
||||
if albumhash not in albums:
|
||||
albums[albumhash] = album_entry.album
|
||||
albums[albumhash].playcount = 0
|
||||
albums[albumhash].playduration = 0
|
||||
|
||||
albums[albumhash].playcount += 1
|
||||
albums[albumhash].playduration += scrobble.duration
|
||||
|
||||
return list(albums.values())
|
||||
|
||||
|
||||
def get_tracks_in_period(start_time: int, end_time: int, userid: int | None = None):
|
||||
scrobbles = ScrobbleTable.get_all_in_period(start_time, end_time, userid)
|
||||
tracks: dict[str, Track] = {}
|
||||
duration = 0
|
||||
|
||||
total = 0
|
||||
|
||||
for scrobble in scrobbles:
|
||||
total += 1
|
||||
if scrobble.trackhash not in tracks:
|
||||
try:
|
||||
track = copy.deepcopy(
|
||||
TrackStore.get_tracks_by_trackhashes([scrobble.trackhash])[0]
|
||||
)
|
||||
except IndexError:
|
||||
continue
|
||||
|
||||
tracks[scrobble.trackhash] = track
|
||||
tracks[scrobble.trackhash].playcount = 0
|
||||
tracks[scrobble.trackhash].playduration = 0
|
||||
|
||||
tracks[scrobble.trackhash].playcount += 1
|
||||
tracks[scrobble.trackhash].playduration += scrobble.duration
|
||||
duration += scrobble.duration
|
||||
|
||||
return list(tracks.values()), total, duration
|
||||
|
||||
|
||||
T = TypeVar("T")
|
||||
|
||||
|
||||
def calculate_trend(
|
||||
item: T,
|
||||
current_items: List[T],
|
||||
previous_items: List[T],
|
||||
key_func: Callable[[T], Any],
|
||||
):
|
||||
"""
|
||||
Calculate the trend of an item based on its position in current and previous lists.
|
||||
|
||||
:param item: The item to calculate the trend for
|
||||
:param current_items: The current list of items
|
||||
:param previous_items: The previous list of items
|
||||
:param key_func: A function to extract the comparison key from an item
|
||||
:return: A dictionary containing:
|
||||
- The trend as a string: 'rising', 'falling', or 'stable'
|
||||
- A boolean flag indicating whether the item is new
|
||||
"""
|
||||
current_rank = next(
|
||||
(i for i, t in enumerate(current_items) if key_func(t) == key_func(item)), -1
|
||||
)
|
||||
previous_rank = next(
|
||||
(i for i, t in enumerate(previous_items) if key_func(t) == key_func(item)), -1
|
||||
)
|
||||
|
||||
is_new = previous_rank == -1
|
||||
|
||||
if is_new:
|
||||
return {"trend": "rising", "is_new": True}
|
||||
elif current_rank == -1:
|
||||
return {"trend": "falling", "is_new": False}
|
||||
elif current_rank < previous_rank:
|
||||
return {"trend": "rising", "is_new": False}
|
||||
elif current_rank > previous_rank:
|
||||
return {"trend": "falling", "is_new": False}
|
||||
else:
|
||||
return {"trend": "stable", "is_new": False}
|
||||
|
||||
|
||||
def calculate_album_trend(
|
||||
album_entry: Album, current_albums: List[Album], previous_albums: List[Album]
|
||||
):
|
||||
return calculate_trend(
|
||||
album_entry, current_albums, previous_albums, lambda a: a.albumhash
|
||||
)
|
||||
|
||||
|
||||
def calculate_artist_trend(
|
||||
artist: dict[str, Any],
|
||||
current_artists: List[dict[str, Any]],
|
||||
previous_artists: List[dict[str, Any]],
|
||||
):
|
||||
return calculate_trend(
|
||||
artist, current_artists, previous_artists, lambda a: a["artisthash"]
|
||||
)
|
||||
|
||||
|
||||
def calculate_track_trend(
|
||||
track: Track, current_tracks: List[Track], previous_tracks: List[Track]
|
||||
):
|
||||
return calculate_trend(
|
||||
track, current_tracks, previous_tracks, lambda t: t.trackhash
|
||||
)
|
||||
|
||||
|
||||
def calculate_scrobble_trend(current_scrobbles: int, previous_scrobbles: int) -> str:
|
||||
return (
|
||||
"rising"
|
||||
if current_scrobbles > previous_scrobbles
|
||||
else ("falling" if current_scrobbles < previous_scrobbles else "stable")
|
||||
)
|
||||
|
||||
|
||||
def calculate_new_artists(
|
||||
current_artists: List[dict[str, Any]], timestamp: int, userid: int | None = None
|
||||
):
|
||||
"""
|
||||
Calculate the number of new artists based on the current and all previous scrobbles.
|
||||
"""
|
||||
current_artists_set = set(artist["artisthash"] for artist in current_artists)
|
||||
all_records = ScrobbleTable.get_all_in_period(0, timestamp, userid)
|
||||
trackhashes = set(record.trackhash for record in all_records)
|
||||
|
||||
previous_artists_set = set()
|
||||
|
||||
for record in trackhashes:
|
||||
entry = TrackStore.trackhashmap.get(record)
|
||||
if not entry:
|
||||
continue
|
||||
|
||||
entry = entry.tracks[0]
|
||||
|
||||
for artist in entry.artists:
|
||||
artisthash = artist["artisthash"]
|
||||
previous_artists_set.add(artisthash)
|
||||
|
||||
return len(current_artists_set - previous_artists_set)
|
||||
|
||||
|
||||
def calculate_new_albums(current_albums: List[Album], previous_albums: List[Album]):
|
||||
current_albums_set = set(album.albumhash for album in current_albums)
|
||||
previous_albums_set = set(album.albumhash for album in previous_albums)
|
||||
|
||||
return len(current_albums_set - previous_albums_set)
|
||||
|
||||
|
||||
def get_track_group_stats(tracks: list[Track], is_album: bool = False):
|
||||
if len(tracks) == 0:
|
||||
return []
|
||||
|
||||
played_tracks = [track for track in tracks if track.playcount > 0]
|
||||
unplayed_count = len(tracks) - len(played_tracks)
|
||||
|
||||
played_stat = StatItem(
|
||||
"played",
|
||||
f"never played",
|
||||
f"{unplayed_count}/{len(tracks)} tracks",
|
||||
)
|
||||
|
||||
play_duration = sum(track.playduration for track in played_tracks)
|
||||
play_duration_stat = StatItem(
|
||||
"play_duration",
|
||||
"listened all time",
|
||||
f"{seconds_to_time_string(play_duration)}",
|
||||
)
|
||||
|
||||
try:
|
||||
top_track = max(played_tracks, key=lambda x: x.playduration)
|
||||
except ValueError:
|
||||
top_track = None
|
||||
|
||||
top_track_stat = (
|
||||
StatItem(
|
||||
"toptrack",
|
||||
f"top track ({seconds_to_time_string(top_track.playduration)} listened)",
|
||||
f"{top_track.title}",
|
||||
top_track.image if top_track else None,
|
||||
)
|
||||
if top_track
|
||||
else StatItem(
|
||||
"toptrack",
|
||||
"top track",
|
||||
"—",
|
||||
)
|
||||
)
|
||||
|
||||
albums_map = {}
|
||||
|
||||
for track in tracks:
|
||||
if track.albumhash not in albums_map:
|
||||
albums_map[track.albumhash] = {
|
||||
"playcount": 0,
|
||||
"playduration": 0,
|
||||
"title": track.album,
|
||||
"image": track.image if track.image else None,
|
||||
}
|
||||
|
||||
albums_map[track.albumhash]["playcount"] += 1
|
||||
albums_map[track.albumhash]["playduration"] += track.playduration
|
||||
|
||||
stats = [play_duration_stat, played_stat, top_track_stat]
|
||||
if not is_album:
|
||||
albums = list(albums_map.values())
|
||||
albums.sort(key=lambda x: x["playduration"], reverse=True)
|
||||
|
||||
top_album = albums[0] if albums[0]["playduration"] else None
|
||||
top_album_stat = (
|
||||
StatItem(
|
||||
"topalbum",
|
||||
f"top album ({seconds_to_time_string(top_album['playduration'])} listened)",
|
||||
f"{top_album['title']}",
|
||||
top_album["image"])
|
||||
if top_album
|
||||
else StatItem(
|
||||
"topalbum",
|
||||
"top album",
|
||||
"—",
|
||||
)
|
||||
)
|
||||
|
||||
stats.append(top_album_stat)
|
||||
|
||||
if is_album:
|
||||
tracktotal: int = max(
|
||||
int(track.extra.get("track_total", 0) or 0) for track in tracks
|
||||
)
|
||||
percentage = (len(tracks) / tracktotal) * 100 if tracktotal > 0 else 101
|
||||
completedness = int(percentage) if percentage <= 100 else "?"
|
||||
|
||||
completeness_stat = (
|
||||
StatItem(
|
||||
"completeness",
|
||||
f"{len(tracks)}/{tracktotal} tracks available",
|
||||
f"{completedness}% complete",
|
||||
)
|
||||
if tracktotal
|
||||
else StatItem(
|
||||
"completeness",
|
||||
f"{len(tracks)}/? tracks available",
|
||||
"?",
|
||||
)
|
||||
)
|
||||
|
||||
stats.append(completeness_stat)
|
||||
|
||||
return stats
|
||||
@@ -0,0 +1,42 @@
|
||||
import threading
|
||||
from multiprocessing import Process, Pipe
|
||||
|
||||
|
||||
def background(func):
|
||||
"""
|
||||
Runs the decorated function in a background thread.
|
||||
"""
|
||||
|
||||
def background_func(*a, **kw):
|
||||
threading.Thread(target=func, args=a, kwargs=kw).start()
|
||||
|
||||
return background_func
|
||||
|
||||
|
||||
|
||||
class ProcessWithReturnValue(Process):
|
||||
"""
|
||||
A process class that returns a value on join.
|
||||
Uses a pipe to communicate the return value back to the parent process.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self, group=None, target=None, name=None, args=(), kwargs={}, Verbose=None
|
||||
):
|
||||
Process.__init__(self, group=group, target=target, name=name, args=args, kwargs=kwargs)
|
||||
self._parent_conn, self._child_conn = Pipe()
|
||||
self._target = target
|
||||
self._args = args
|
||||
self._kwargs = kwargs
|
||||
|
||||
def run(self):
|
||||
if self._target is not None:
|
||||
result = self._target(*self._args, **self._kwargs)
|
||||
self._child_conn.send(result)
|
||||
self._child_conn.close()
|
||||
|
||||
def join(self, *args):
|
||||
Process.join(self, *args)
|
||||
if self._parent_conn.poll():
|
||||
return self._parent_conn.recv()
|
||||
return None
|
||||
@@ -0,0 +1,5 @@
|
||||
def handle_unicode(string: str):
|
||||
"""
|
||||
Handles Unicode errors by ignoring unicode characters
|
||||
"""
|
||||
return string.encode("utf-16", "ignore").decode("utf-16")
|
||||
@@ -0,0 +1,18 @@
|
||||
import platform
|
||||
|
||||
IS_WIN = platform.system() == "Windows"
|
||||
|
||||
|
||||
# TODO: Check is_windows on app start in settings.py
|
||||
def is_windows():
|
||||
"""
|
||||
Returns True if the OS is Windows.
|
||||
"""
|
||||
return IS_WIN
|
||||
|
||||
|
||||
def win_replace_slash(path: str):
|
||||
if is_windows():
|
||||
return path.replace("\\", "/").replace("//", "/")
|
||||
|
||||
return path
|
||||
@@ -0,0 +1,24 @@
|
||||
import os
|
||||
|
||||
|
||||
def get_xdg_config_dir() -> str:
|
||||
"""
|
||||
Returns the XDG_CONFIG_HOME environment variable if it exists, otherwise
|
||||
returns the default config directory. If none of those exist, returns the
|
||||
user's home directory.
|
||||
"""
|
||||
xdg_config_home = os.environ.get("XDG_CONFIG_HOME")
|
||||
|
||||
if xdg_config_home:
|
||||
return xdg_config_home
|
||||
|
||||
try:
|
||||
alt_dir = os.path.join(os.environ.get("HOME"), ".config")
|
||||
|
||||
if os.path.exists(alt_dir):
|
||||
return alt_dir
|
||||
except TypeError:
|
||||
return os.path.expanduser("~")
|
||||
|
||||
# Fallback to current directory
|
||||
return os.path.abspath(".")
|
||||
Reference in New Issue
Block a user