first commit

This commit is contained in:
Tomas Dvorak
2026-04-13 17:46:58 +02:00
commit 6e8fedf534
234 changed files with 53808 additions and 0 deletions
+51
View File
@@ -0,0 +1,51 @@
import locale
import re
from collections.abc import Iterable
from typing import TypeVar
T = TypeVar("T")
# Set to user's default locale:
locale.setlocale(locale.LC_ALL, "")
# Or set to a specific locale:
# locale.setlocale(locale.LC_ALL, 'en_US.UTF-8')
def format_number(number: float) -> str:
return locale.format_string("%d", number, grouping=True)
def flatten(list_: Iterable[list[T]]) -> list[T]:
"""
Flattens a list of lists into a single list.
"""
return [item for sublist in list_ for item in sublist]
def create_valid_filename(filename: str) -> str:
"""
Create a valid filename by removing invalid characters.
"""
# Remove invalid characters for filenames
invalid_chars = r'[<>:"/\\|?*]'
filename = re.sub(invalid_chars, "_", filename)
# Remove leading/trailing spaces and dots
filename = filename.strip(" .")
# Ensure filename is not empty
if not filename:
filename = "unnamed"
return filename
class classproperty(property):
"""
A class property decorator.
"""
def __get__(self, owner_self, owner_cls):
if self.fget:
return self.fget(owner_cls)
+58
View File
@@ -0,0 +1,58 @@
import hashlib
import hmac
import os
from flask import has_app_context, has_request_context
from flask_jwt_extended import current_user
from swingmusic.config import UserConfig
from swingmusic.logger import log
def hash_password(password: str) -> str:
"""
Hashes the given password using sha256 algorithm and the user id as salt.
:param password: The password to hash.
:return: The hashed password.
"""
return hashlib.pbkdf2_hmac(
"sha256",
password.encode("utf-8"),
UserConfig().serverId.encode("utf-8"),
100000,
).hex()
def check_password(password: str, hashed: str) -> bool:
"""
This function checks if the given password matches the hashed password.
:param password: The password to check.
:param hashed: The hashed password.
:return: Whether the password matches.
"""
return hmac.compare_digest(hash_password(password), hashed)
def get_current_userid() -> int:
"""
Get the current session user.
"""
fallback_userid = int(os.getenv("SWINGMUSIC_DEFAULT_USER_ID", "1"))
# Background workers and startup code can run outside Flask contexts.
# In those paths, we intentionally use a deterministic fallback user id.
if not has_app_context() or not has_request_context():
return fallback_userid
try:
return int(current_user["id"])
except Exception as e:
if log:
log.error("get_current_userid: Unable to resolve request user id")
log.error(e)
return fallback_userid
+46
View File
@@ -0,0 +1,46 @@
from typing import TypeVar
T = TypeVar("T")
def use_bisection(
source: list[T], key: str, queries: list[str], limit: int = -1
) -> list[T | None]:
"""
Uses bisection to find a list of items in another list.
Returns a list of found items with `None` items being not found items.
"""
def find(query: str):
left = 0
right = len(source) - 1
while left <= right:
mid = (left + right) // 2
if source[mid].__getattribute__(key) == query:
return source[mid]
elif source[mid].__getattribute__(key) > query:
right = mid - 1
else:
left = mid + 1
return None
if len(source) == 0:
return []
results = []
for query in queries:
res = find(query)
if res is None:
continue
results.append(res)
if limit != -1 and len(results) >= limit:
break
return results
+19
View File
@@ -0,0 +1,19 @@
from collections.abc import Iterator
class CustomList(list):
"""
A custom list implementation with hooks for future shared memory support.
This list can be used as a drop-in replacement for standard lists.
Future enhancement: implement SharedMemoryList for inter-process
communication without serialization overhead.
"""
def __getitem__(self, index):
# Hook for shared memory operations
return super().__getitem__(index)
def __iter__(self) -> Iterator:
# Hook for shared memory operations
return super().__iter__()
+135
View File
@@ -0,0 +1,135 @@
from datetime import datetime, timedelta
import pendulum
_format = "%Y-%m-%d %H:%M:%S"
def timestamp_from_days_ago(days_ago: int):
"""
Returns a timestamp from a number of days ago.
"""
current_datetime = datetime.now()
delta = timedelta(days=days_ago)
past_timestamp = current_datetime - delta
return int(past_timestamp.timestamp())
def create_new_date(date: datetime | None = None) -> str:
"""
Creates a new date and time string in the format of "YYYY-MM-DD HH:MM:SS"
:return: A string of the current date and time.
"""
if not date:
date = datetime.now()
return date.strftime(_format)
def timestamp_to_time_passed(timestamp: str | int | float):
"""
Converts a timestamp to time passed. e.g. 2 minutes ago, 1 hour ago, yesterday, 2 days ago, 2 weeks ago, etc.
"""
now = datetime.now().timestamp()
then = datetime.fromtimestamp(int(timestamp)).timestamp()
diff = now - then
now = pendulum.now()
return now.subtract(seconds=diff).diff_for_humans()
def date_string_to_time_passed(prev_date: str) -> str:
"""
Converts a date string to time passed. e.g. 2 minutes ago, 1 hour ago, yesterday, 2 days ago, 2 weeks ago, etc.
"""
then = datetime.strptime(prev_date, _format).timestamp()
return timestamp_to_time_passed(then)
def seconds_to_time_string(seconds: int):
"""
Converts seconds to a time string. e.g. 1 hour 2 minutes, 1 hour 2 seconds, 1 hour, 1 minute 2 seconds, etc.
"""
hours = seconds // 3600
minutes = (seconds % 3600) // 60
remaining_seconds = seconds % 60
if hours > 0:
if minutes > 0:
return f"{hours} hr{'s' if hours > 1 else ''}, {minutes} min{'s' if minutes > 1 else ''}"
return f"{hours} hr{'s' if hours > 1 else ''}"
if minutes > 0:
return f"{minutes} min{'s' if minutes > 1 else ''}"
return f"{remaining_seconds} sec"
def get_date_range(duration: str, units_ago: int = 0):
"""
Returns a tuple of dates representing the start and end of a given duration.
"""
date_range = None
seconds_ago = 0
if duration != "alltime":
seconds_ago = (
pendulum.now() - pendulum.now().subtract().start_of(duration)
).total_seconds() * units_ago
match duration:
case "day" | "week" | "month" | "year":
date_range = (
pendulum.now()
.subtract(seconds=seconds_ago)
.start_of(duration)
.timestamp(),
pendulum.now()
# .end_of(duration)
.timestamp(),
)
case "alltime":
date_range = (0, pendulum.now().timestamp())
case _:
raise ValueError(f"Invalid duration: {duration}")
return (int(date_range[0]), int(date_range[1]))
def get_duration_ago(duration: str, units_ago: int = 1) -> int:
"""
Returns the start of the last duration.
"""
seconds_in_day = 24 * 60 * 60
now = pendulum.now()
match duration:
case "day":
return int(now.subtract(seconds=seconds_in_day * units_ago).timestamp())
case "week":
return int(now.subtract(seconds=seconds_in_day * 7 * units_ago).timestamp())
case "month":
return int(
now.subtract(seconds=seconds_in_day * 30 * units_ago).timestamp()
)
case "year":
return int(
now.subtract(seconds=seconds_in_day * 365 * units_ago).timestamp()
)
case _:
raise ValueError(f"Invalid duration: {duration}")
def get_duration_in_seconds(duration: str) -> int:
"""
Returns the number of seconds in a given duration.
"""
match duration:
case "week" | "month" | "year":
return int(pendulum.now().subtract().start_of(duration).timestamp())
case "alltime":
return int(pendulum.now().timestamp())
raise ValueError(f"Invalid duration: {duration}")
+11
View File
@@ -0,0 +1,11 @@
def coroutine(func):
"""
Decorator: primes `func` by advancing to first `yield`
"""
def start(*args, **kwargs):
cr = func(*args, **kwargs)
next(cr)
return cr
return start
+14
View File
@@ -0,0 +1,14 @@
import mimetypes
def guess_mime_type(filename: str):
"""
Guess the mime type of a file.
"""
type = mimetypes.guess_type(filename)[0]
if type is None:
ext = filename.rsplit(".", maxsplit=1)[-1]
return f"audio/{ext}"
return type
+87
View File
@@ -0,0 +1,87 @@
import os
from pathlib import Path
FILES = ["flac", "mp3", "wav", "m4a", "ogg", "wma", "opus", "alac", "aiff"]
SUPPORTED_FILES = tuple(f".{file}" for file in FILES)
# INFO: Skip these paths when scanning
# These are common directories that don't contain music files
IGNORE_PATH_ENDSWITH = {
"node_modules",
"site-packages",
"postgres",
"__pycache__",
"/src",
"/learnrs",
"/venv",
"/code",
"/dist",
"/demos",
"/temp",
}
IGNORE_PATH_CONTAINS = {
"Photos Library",
}
def run_fast_scandir(path: str, full=False) -> tuple[list[str], list[str]]:
"""
Scans a directory for files with a specific extension.
Returns a list of files and folders in the directory.
Note: Symlinks are followed but circular links are prevented by the
path resolution in Path.resolve() which detects and handles cycles.
:param path: folder to scan
:param full: will call recursively until end of path.
:return: (folder:[], files:[])
"""
# filter out unwanted known folders
if isinstance(path, str) and path == "":
return [], []
path: Path = Path(path).resolve()
if any(
path.as_posix().endswith(ignore_path) for ignore_path in IGNORE_PATH_ENDSWITH
):
return [], []
if any(ignore_path in path.as_posix() for ignore_path in IGNORE_PATH_CONTAINS):
return [], []
# if on mac, ignore Library folder and its children
if os.name == "posix":
library_path = (Path.home() / "Library").resolve()
if path == library_path or str(path).startswith(str(library_path)):
return [], []
subfolders = []
files = []
try:
for entry in path.iterdir():
if entry.is_dir():
if entry.name.startswith(".") or entry.name.startswith("$"):
continue # filter out system / hidden files
else:
subfolders.append(entry)
if entry.is_file():
ext = entry.suffix.lower()
if ext in SUPPORTED_FILES:
files.append(entry.as_posix())
if full or len(files) == 0:
for folder in subfolders:
sub_dirs, subfiles = run_fast_scandir(folder, full=True)
subfolders.extend(sub_dirs)
files.extend(subfiles)
except (OSError, PermissionError, FileNotFoundError, ValueError):
return [], []
return subfolders, files
+9
View File
@@ -0,0 +1,9 @@
import random
import string
def get_random_str(length=5):
"""
Generates a random string of length `length`.
"""
return "".join(random.choices(string.ascii_letters + string.digits, k=length))
+42
View File
@@ -0,0 +1,42 @@
import xxhash
from unidecode import unidecode
def create_hash(*args: str, decode=False, limit=10) -> str:
"""
This function creates a case-insensitive, non-alphanumeric chars ignoring hash from the given arguments.
Example use case:
- Creating computable IDs for duplicate artists. eg. Juice WRLD and Juice Wrld should have the same ID.
:param args: The arguments to hash.
:param decode: Whether to decode the arguments before hashing.
:param limit: The number of characters to return.
:return: The hash.
"""
def remove_non_alnum(token: str) -> str:
token = token.lower().strip().replace(" ", "")
t = "".join(t for t in token if t.isalnum())
if t == "":
return token
return t
str_ = "".join(remove_non_alnum(t) for t in args)
if decode:
str_ = unidecode(str_)
str_ = str_.encode("utf-8")
return xxhash.xxh3_64(str_).hexdigest()
# str_ = hashlib.sha1(str_).hexdigest()
# INFO: Return first 5 + last 5 characters
# return (
# str_[: limit // 2] + str_[-limit // 2 :]
# if limit % 2 == 0
# else str_[: limit // 2] + str_[-limit // 2 - 1 :]
# )
+111
View File
@@ -0,0 +1,111 @@
from collections import Counter
from swingmusic.models.track import Track
def violates_gap_rule(
balanced_mix: dict[int, Track], position: int, track: Track, gap: int = 3
) -> bool:
"""
Check if placing the track at the given position violates the gap rule.
The gap rule is violated if the track has an artist in common with any
track within the gap range (default = 3).
"""
track_artists = {artist["artisthash"] for artist in track.artists}
for i in range(max(0, position - gap), position):
if i in balanced_mix:
existing_artists = {
artist["artisthash"] for artist in balanced_mix[i].artists
}
if track_artists.intersection(existing_artists):
return True
return False
def find_next_position(
balanced_mix: dict[int, Track], start: int, track: Track, total_tracks: int
) -> int:
"""
Find the next available position for the track, starting from 'start' and wrapping around.
"""
for i in range(start, total_tracks):
if i not in balanced_mix and not violates_gap_rule(balanced_mix, i, track):
return i
for i in range(start):
if i not in balanced_mix and not violates_gap_rule(balanced_mix, i, track):
return i
return start # If no better position is found, return the original position
def is_tracklist_balanced(tracks: list[Track], gap: int = 3) -> tuple[bool, bool]:
"""
Checks if a tracklist is balanced or can be balanced.
Args:
- tracks: List of Track objects
- gap: Minimum number of tracks between songs by the same artist (default 3)
Returns:
- A tuple (can_be_balanced, is_currently_balanced)
"""
total_tracks = len(tracks)
# Count tracks per artist (considering only the first artist)
artist_counts = Counter(track.artists[0]["artisthash"] for track in tracks)
# Calculate the maximum number of tracks an artist can have in a balanced list
max_tracks_per_artist = (total_tracks + gap) // (gap + 1)
# Check if it's mathematically possible to balance the tracklist
can_be_balanced = all(
count <= max_tracks_per_artist for count in artist_counts.values()
)
if not can_be_balanced:
return False, False
# Check if the current arrangement is balanced
is_currently_balanced = True
artist_last_positions = {}
for i, track in enumerate(tracks):
artist = track.artists[0]["artisthash"]
if artist in artist_last_positions:
if i - artist_last_positions[artist] <= gap:
is_currently_balanced = False
break
artist_last_positions[artist] = i
return can_be_balanced, is_currently_balanced
def balance_mix(tracks: list[Track]) -> list[Track]:
"""
Balances the mix by ensuring that the tracks in a mix are distributed evenly.
Preserves the overall rating order of tracks while minimizing disruption.
Tracks that need to be moved are moved down the tracklist until they no longer
violate the gap rule.
"""
can_be_balanced, is_balanced = is_tracklist_balanced(tracks)
if is_balanced:
# Already balanced, no need to modify
return tracks
# Proceed with best-effort balancing
balanced_mix: dict[int, Track] = {}
total_tracks = len(tracks)
for i, track in enumerate(tracks):
if i in balanced_mix or not violates_gap_rule(balanced_mix, i, track):
balanced_mix[i] = track
else:
new_position = find_next_position(balanced_mix, i, track, total_tracks)
balanced_mix[new_position] = track
# Convert the dictionary back to a list, preserving the new order
return [balanced_mix[i] for i in sorted(balanced_mix.keys())]
+160
View File
@@ -0,0 +1,160 @@
import random
import socket as Socket
import time
from io import BytesIO
import requests
from PIL import Image, UnidentifiedImageError
from requests.exceptions import ConnectionError, ReadTimeout, Timeout
# User agents for rotation to avoid rate limiting
DEFAULT_USER_AGENTS = [
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0 Safari/605.1.15",
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.107 Safari/537.36",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:90.0) Gecko/20100101 Firefox/90.0",
"Mozilla/5.0 (iPhone; CPU iPhone OS 14_6 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0 Mobile/15E148 Safari/604.1",
]
def has_connection(host="google.it", port=80, timeout=3):
"""
# REVIEW Was:
Host: 8.8.8.8 (google-public-dns-a.google.com)
OpenPort: 53/tcp
Service: domain (DNS/TCP)
"""
try:
Socket.setdefaulttimeout(timeout)
Socket.socket(Socket.AF_INET, Socket.SOCK_STREAM).connect((host, port))
return True
except OSError:
return False
def get_ip():
"""
Get the IP address of the current system.
Will return address of default outgoing chanel.
"""
soc = Socket.socket(Socket.AF_INET, Socket.SOCK_DGRAM)
try:
soc.connect(("8.8.8.8", 80))
except OSError:
return None
ip_address = str(soc.getsockname()[0])
soc.close()
return ip_address
def download_file(
url: str,
timeout: int = 10,
max_retries: int = 2,
retry_delay: int = 10,
headers: dict | None = None,
) -> bytes | None:
"""
Downloads a file from a URL with retry logic.
:param url: URL to download from
:param timeout: Request timeout in seconds
:param max_retries: Maximum number of retry attempts
:param retry_delay: Delay between retries in seconds
:param headers: Optional headers to include in the request
:return: File content as bytes, or None if download failed
"""
for attempt in range(max_retries):
try:
response = requests.get(url, timeout=timeout, headers=headers)
response.raise_for_status()
return response.content
except (ConnectionError, Timeout, ReadTimeout):
if attempt < max_retries - 1:
time.sleep(retry_delay)
else:
return None
except requests.HTTPError:
return None
return None
def download_image(
url: str,
timeout: int = 10,
max_retries: int = 2,
retry_delay: int = 10,
headers: dict | None = None,
) -> Image.Image | None:
"""
Downloads an image from a URL and returns a PIL Image object.
:param url: URL to download image from
:param timeout: Request timeout in seconds
:param max_retries: Maximum number of retry attempts
:param retry_delay: Delay between retries in seconds
:param headers: Optional headers to include in the request
:return: PIL Image object, or None if download failed
"""
content = download_file(url, timeout, max_retries, retry_delay, headers)
if content is None:
return None
try:
return Image.open(BytesIO(content))
except UnidentifiedImageError:
return None
def make_json_request(
url: str,
timeout: int = 30,
max_retries: int = 5,
retry_delay: int = 10,
headers: dict | None = None,
params: dict | None = None,
) -> dict | None:
"""
Makes a GET request expecting JSON response with retry logic.
:param url: URL to request
:param timeout: Request timeout in seconds
:param max_retries: Maximum number of retry attempts
:param retry_delay: Delay between retries in seconds
:param headers: Optional headers to include in the request
:param params: Optional query parameters
:return: JSON response as dict, or None if request failed
"""
for attempt in range(max_retries):
try:
response = requests.get(
url, timeout=timeout, headers=headers, params=params
)
response.raise_for_status()
return response.json()
except (ConnectionError, Timeout, ReadTimeout):
if attempt < max_retries - 1:
time.sleep(retry_delay)
else:
return None
except requests.JSONDecodeError:
return None
except requests.HTTPError:
if attempt < max_retries - 1:
time.sleep(retry_delay)
else:
return None
return None
def get_random_user_agent() -> str:
"""
Returns a random user agent string for web requests.
:return: Random user agent string
"""
return random.choice(DEFAULT_USER_AGENTS)
+207
View File
@@ -0,0 +1,207 @@
import re
from swingmusic.config import UserConfig
from swingmusic.enums.album_versions import AlbumVersionEnum, get_all_keywords
def split_artists(src: str, config: UserConfig):
"""
Splits a string of artists into a list of artists, preserving those in ignoreList.
Case-insensitive matching is used for the ignoreList.
"""
result = []
current = ""
i = 0
while i < len(src):
# Check if any ignored artist starts at this position (case-insensitive)
ignored_match = next(
(
src[i : i + len(ignored)]
for ignored in config.artistSplitIgnoreList
if src.lower().startswith(ignored.lower(), i)
),
None,
)
if ignored_match:
# If we have accumulated any current string, add it to result
if current.strip():
result.extend([a.strip() for a in current.split(",") if a.strip()])
current = ""
# Add the ignored artist to the result (preserving original case)
result.append(ignored_match)
# Move past the ignored artist
i += len(ignored_match)
elif src[i] in config.artistSeparators:
# If we encounter a separator, process the current string
if current.strip():
result.extend([a.strip() for a in current.split(",") if a.strip()])
current = ""
i += 1
else:
# If it's not an ignored artist or a separator, add to current
current += src[i]
i += 1
# Process any remaining current string
if current.strip():
result.extend([a.strip() for a in current.split(",") if a.strip()])
return result
def remove_prod(title: str) -> str:
"""
Removes the producer string in a track title using regex.
"""
# check if title contain title, if not return it.
if "prod." not in title.lower():
return title
# check if title has brackets
if re.search(r"[()\[\]]", title):
regex = r"\s?(\(|\[)prod\..*?(\)|\])\s?"
else:
regex = r"\s?\bprod\.\s*\S+"
# remove the producer string
title = re.sub(regex, "", title, flags=re.IGNORECASE)
return title.strip()
def parse_feat_from_title(title: str, config: UserConfig) -> tuple[list[str], str]:
"""
Extracts featured artists from a song title using regex.
"""
regex = r"\((?:feat|ft|featuring|with)\.?\s+(.+?)\)"
# regex for square brackets 👇
sqr_regex = r"\[(?:feat|ft|featuring|with)\.?\s+(.+?)\]"
match = re.search(regex, title, re.IGNORECASE)
if not match:
match = re.search(sqr_regex, title, re.IGNORECASE)
regex = sqr_regex
if not match:
return [], title
artists = match.group(1)
artists = split_artists(artists, config)
# remove "feat" group from title
new_title = re.sub(regex, "", title, flags=re.IGNORECASE)
return artists, new_title
def get_base_album_title(string: str) -> tuple[str, str | None]:
"""
Extracts the base album title from a string.
"""
pattern = re.compile(
rf"\s*(\(|\[)[^)\]]*?({get_all_keywords()})[^)\]]*?(\)|\])$",
re.IGNORECASE,
)
# Note: Inside character class, ] doesn't need escaping when it's the first character
# or when preceded by another character. This is standard regex behavior.
match = pattern.search(string)
if match:
removed_block = match.group(0)
title = string.replace(removed_block, "")
return title.strip(), removed_block.strip()
return string, None
def get_anniversary(text: str) -> str | None:
"""
Extracts anniversary from text using regex.
"""
_end = "anniversary"
match = re.search(r"\b\d+\w*(?= anniversary)", text, re.IGNORECASE)
if match:
return match.group(0).strip().lower() + f" {_end}"
else:
return _end
def get_album_info(bracket_text: str | None) -> list[str]:
"""
Extracts album version info from the bracketed text on an album title string using regex.
"""
if not bracket_text:
return []
# replace all non-alphanumeric characters with an empty string
bracket_text = re.sub(r"[^a-zA-Z0-9\s]", "", bracket_text)
versions = []
for version_keywords in AlbumVersionEnum:
for keyword in version_keywords.value:
if re.search(keyword, bracket_text, re.IGNORECASE):
versions.append(version_keywords.name.lower())
break
if "anniversary" in versions:
anniversary = get_anniversary(bracket_text)
versions.insert(0, anniversary)
versions.remove("anniversary")
return versions
def get_base_title_and_versions(
original_album_title: str, get_versions=True
) -> tuple[str, list[str]]:
"""
Extracts the base album title and version info from an album title string using regex.
"""
album_title, version_block = get_base_album_title(original_album_title)
if version_block is None:
return original_album_title, []
if not get_versions:
return album_title, []
versions = get_album_info(version_block)
# if no version info could be extracted, accept defeat!
if len(versions) == 0:
album_title = original_album_title
return album_title, versions
def remove_bracketed_remaster(text: str):
"""
Removes remaster info from a track title that contains brackets using regex.
"""
return re.sub(
r"\s*[\\[(][^)\]]*remaster[^)\]]*[)\]]\s*", "", text, flags=re.IGNORECASE
).strip()
def remove_hyphen_remasters(text: str):
"""
Removes remaster info from a track title that contains a hypen (-) using regex.
"""
return re.sub(
r"\s-\s*[^-]*\bremaster[^-]*\s*", "", text, flags=re.IGNORECASE
).strip()
def clean_title(title: str) -> str:
"""
Removes remaster info from a track title using regex.
"""
if "remaster" not in title.lower():
return title
rem_1 = remove_bracketed_remaster(title)
rem_2 = remove_hyphen_remasters(title)
return rem_1 if len(rem_2) > len(rem_1) else rem_2
+17
View File
@@ -0,0 +1,17 @@
import os
from swingmusic.settings import Paths
def get_client_files_extensions():
"""
Get all the file extensions for the client files
"""
extensions = set()
for _root, _dirs, files in os.walk(Paths().client_path):
for file in files:
ext = file.split(".")[-1]
extensions.add("." + ext)
return extensions
+15
View File
@@ -0,0 +1,15 @@
from tqdm import tqdm as _tqdm
def tqdm(*args, **kwargs):
"""
Wrapper for tqdm that sets globals.
"""
bar_format = "{percentage:3.0f}%|{bar:45}|{n_fmt}/{total_fmt}{desc}"
kwargs["bar_format"] = bar_format
if "desc" in kwargs:
print(f"INFO|{kwargs['desc'].capitalize()} ...")
kwargs["desc"] = ""
return _tqdm(*args, **kwargs)
+50
View File
@@ -0,0 +1,50 @@
from collections import defaultdict
from operator import attrgetter
from swingmusic.models import Track
from swingmusic.utils.hashing import create_hash
def remove_duplicates(tracks: list[Track], is_album_tracks=False) -> list[Track]:
"""
Remove duplicates from a list of Track objects based on the trackhash attribute.
Retain objects with the highest bitrate.
"""
tracks_dict = defaultdict(list)
# if is_album_tracks, sort by disc and track number
if is_album_tracks:
for t in tracks:
# _pos is used for sorting tracks by disc and track number
t._pos = int(f"{t.disc}{str(t.track).zfill(3)}")
# _ati is used to remove duplicates when merging album versions
t._ati = f"{t._pos}{create_hash(t.title)}"
# create groups of tracks with the same _ati
for track in tracks:
tracks_dict[track._ati].append(track)
tracks = []
# pick the track with max bitrate for each group
for track_group in tracks_dict.values():
max_bitrate_track = max(track_group, key=attrgetter("bitrate"))
tracks.append(max_bitrate_track)
return sorted(tracks, key=lambda t: t._pos)
# else, sort by trackhash
for track in tracks:
# create groups of tracks with the same trackhash
tracks_dict[track.trackhash].append(track)
tracks = []
# pick the track with max bitrate for each trackhash group
for track_group in tracks_dict.values():
max_bitrate_track = max(track_group, key=attrgetter("bitrate"))
tracks.append(max_bitrate_track)
return tracks
+310
View File
@@ -0,0 +1,310 @@
import copy
from collections import defaultdict
from collections.abc import Callable
from typing import Any, TypeVar
from swingmusic.db.userdata import ScrobbleTable
from swingmusic.models.album import Album
from swingmusic.models.stats import StatItem
from swingmusic.models.track import Track
from swingmusic.store.albums import AlbumStore
from swingmusic.store.tracks import TrackStore
from swingmusic.utils.dates import seconds_to_time_string
def get_artists_in_period(
start_time: int | float, end_time: int | float, userid: int | None = None
):
scrobbles = ScrobbleTable.get_all_in_period(start_time, end_time, userid)
artists: Any = defaultdict(
lambda: {"playcount": 0, "playduration": 0, "tracks": {}}
)
for scrobble in scrobbles:
track = TrackStore.get_tracks_by_trackhashes([scrobble.trackhash])
if not track:
continue
track = track[0]
for artist in track.artists:
artisthash = artist["artisthash"]
artists[artisthash]["artist"] = artist["name"]
artists[artisthash]["artisthash"] = artist["artisthash"]
artists[artisthash]["playcount"] += 1
artists[artisthash]["playduration"] += scrobble.duration
# index the track counts too
artists[artisthash]["tracks"][track.trackhash] = (
artists[artisthash]["tracks"].get(track.trackhash, 0) + 1
)
artists = list(artists.values())
return sorted(artists, key=lambda x: x["playduration"], reverse=True)
def get_albums_in_period(start_time: int, end_time: int, userid: int | None = None):
scrobbles = ScrobbleTable.get_all_in_period(start_time, end_time, userid)
albums: dict[str, Album] = {}
for scrobble in scrobbles:
track = TrackStore.get_tracks_by_trackhashes([scrobble.trackhash])
if not track:
continue
track = track[0]
album_entry = AlbumStore.albummap.get(track.albumhash)
if not album_entry:
continue
album_entry = copy.deepcopy(album_entry)
albumhash = album_entry.album.albumhash
if albumhash not in albums:
albums[albumhash] = album_entry.album
albums[albumhash].playcount = 0
albums[albumhash].playduration = 0
albums[albumhash].playcount += 1
albums[albumhash].playduration += scrobble.duration
return list(albums.values())
def get_tracks_in_period(start_time: int, end_time: int, userid: int | None = None):
scrobbles = ScrobbleTable.get_all_in_period(start_time, end_time, userid)
tracks: dict[str, Track] = {}
duration = 0
total = 0
for scrobble in scrobbles:
total += 1
if scrobble.trackhash not in tracks:
try:
track = copy.deepcopy(
TrackStore.get_tracks_by_trackhashes([scrobble.trackhash])[0]
)
except IndexError:
continue
tracks[scrobble.trackhash] = track
tracks[scrobble.trackhash].playcount = 0
tracks[scrobble.trackhash].playduration = 0
tracks[scrobble.trackhash].playcount += 1
tracks[scrobble.trackhash].playduration += scrobble.duration
duration += scrobble.duration
return list(tracks.values()), total, duration
T = TypeVar("T")
def calculate_trend(
item: T,
current_items: list[T],
previous_items: list[T],
key_func: Callable[[T], Any],
):
"""
Calculate the trend of an item based on its position in current and previous lists.
:param item: The item to calculate the trend for
:param current_items: The current list of items
:param previous_items: The previous list of items
:param key_func: A function to extract the comparison key from an item
:return: A dictionary containing:
- The trend as a string: 'rising', 'falling', or 'stable'
- A boolean flag indicating whether the item is new
"""
current_rank = next(
(i for i, t in enumerate(current_items) if key_func(t) == key_func(item)), -1
)
previous_rank = next(
(i for i, t in enumerate(previous_items) if key_func(t) == key_func(item)), -1
)
is_new = previous_rank == -1
if is_new:
return {"trend": "rising", "is_new": True}
elif current_rank == -1:
return {"trend": "falling", "is_new": False}
elif current_rank < previous_rank:
return {"trend": "rising", "is_new": False}
elif current_rank > previous_rank:
return {"trend": "falling", "is_new": False}
else:
return {"trend": "stable", "is_new": False}
def calculate_album_trend(
album_entry: Album, current_albums: list[Album], previous_albums: list[Album]
):
return calculate_trend(
album_entry, current_albums, previous_albums, lambda a: a.albumhash
)
def calculate_artist_trend(
artist: dict[str, Any],
current_artists: list[dict[str, Any]],
previous_artists: list[dict[str, Any]],
):
return calculate_trend(
artist, current_artists, previous_artists, lambda a: a["artisthash"]
)
def calculate_track_trend(
track: Track, current_tracks: list[Track], previous_tracks: list[Track]
):
return calculate_trend(
track, current_tracks, previous_tracks, lambda t: t.trackhash
)
def calculate_scrobble_trend(current_scrobbles: int, previous_scrobbles: int) -> str:
return (
"rising"
if current_scrobbles > previous_scrobbles
else ("falling" if current_scrobbles < previous_scrobbles else "stable")
)
def calculate_new_artists(
current_artists: list[dict[str, Any]], timestamp: int, userid: int | None = None
):
"""
Calculate the number of new artists based on the current and all previous scrobbles.
"""
current_artists_set = {artist["artisthash"] for artist in current_artists}
all_records = ScrobbleTable.get_all_in_period(0, timestamp, userid)
trackhashes = {record.trackhash for record in all_records}
previous_artists_set = set()
for record in trackhashes:
entry = TrackStore.trackhashmap.get(record)
if not entry:
continue
entry = entry.tracks[0]
for artist in entry.artists:
artisthash = artist["artisthash"]
previous_artists_set.add(artisthash)
return len(current_artists_set - previous_artists_set)
def calculate_new_albums(current_albums: list[Album], previous_albums: list[Album]):
current_albums_set = {album.albumhash for album in current_albums}
previous_albums_set = {album.albumhash for album in previous_albums}
return len(current_albums_set - previous_albums_set)
def get_track_group_stats(tracks: list[Track], is_album: bool = False):
if len(tracks) == 0:
return []
played_tracks = [track for track in tracks if track.playcount > 0]
unplayed_count = len(tracks) - len(played_tracks)
played_stat = StatItem(
"played",
"never played",
f"{unplayed_count}/{len(tracks)} tracks",
)
play_duration = sum(track.playduration for track in played_tracks)
play_duration_stat = StatItem(
"play_duration",
"listened all time",
f"{seconds_to_time_string(play_duration)}",
)
try:
top_track = max(played_tracks, key=lambda x: x.playduration)
except ValueError:
top_track = None
top_track_stat = (
StatItem(
"toptrack",
f"top track ({seconds_to_time_string(top_track.playduration)} listened)",
f"{top_track.title}",
top_track.image if top_track else None,
)
if top_track
else StatItem(
"toptrack",
"top track",
"",
)
)
albums_map = {}
for track in tracks:
if track.albumhash not in albums_map:
albums_map[track.albumhash] = {
"playcount": 0,
"playduration": 0,
"title": track.album,
"image": track.image if track.image else None,
}
albums_map[track.albumhash]["playcount"] += 1
albums_map[track.albumhash]["playduration"] += track.playduration
stats = [play_duration_stat, played_stat, top_track_stat]
if not is_album:
albums = list(albums_map.values())
albums.sort(key=lambda x: x["playduration"], reverse=True)
top_album = albums[0] if albums[0]["playduration"] else None
top_album_stat = (
StatItem(
"topalbum",
f"top album ({seconds_to_time_string(top_album['playduration'])} listened)",
f"{top_album['title']}",
top_album["image"],
)
if top_album
else StatItem(
"topalbum",
"top album",
"",
)
)
stats.append(top_album_stat)
if is_album:
tracktotal: int = max(
int(track.extra.get("track_total", 0) or 0) for track in tracks
)
percentage = (len(tracks) / tracktotal) * 100 if tracktotal > 0 else 101
completedness = int(percentage) if percentage <= 100 else "?"
completeness_stat = (
StatItem(
"completeness",
f"{len(tracks)}/{tracktotal} tracks available",
f"{completedness}% complete",
)
if tracktotal
else StatItem(
"completeness",
f"{len(tracks)}/? tracks available",
"?",
)
)
stats.append(completeness_stat)
return stats
+45
View File
@@ -0,0 +1,45 @@
import threading
from multiprocessing import Pipe, Process
def background(func):
"""
Runs the decorated function in a background thread.
"""
def background_func(*a, **kw):
threading.Thread(target=func, args=a, kwargs=kw).start()
return background_func
class ProcessWithReturnValue(Process):
"""
A process class that returns a value on join.
Uses a pipe to communicate the return value back to the parent process.
"""
def __init__(
self, group=None, target=None, name=None, args=(), kwargs=None, Verbose=None
):
if kwargs is None:
kwargs = {}
Process.__init__(
self, group=group, target=target, name=name, args=args, kwargs=kwargs
)
self._parent_conn, self._child_conn = Pipe()
self._target = target
self._args = args
self._kwargs = kwargs
def run(self):
if self._target is not None:
result = self._target(*self._args, **self._kwargs)
self._child_conn.send(result)
self._child_conn.close()
def join(self, *args):
Process.join(self, *args)
if self._parent_conn.poll():
return self._parent_conn.recv()
return None
+5
View File
@@ -0,0 +1,5 @@
def handle_unicode(string: str):
"""
Handles Unicode errors by ignoring unicode characters
"""
return string.encode("utf-16", "ignore").decode("utf-16")
+17
View File
@@ -0,0 +1,17 @@
import platform
IS_WIN = platform.system() == "Windows"
def is_windows():
"""
Returns True if the OS is Windows.
"""
return IS_WIN
def win_replace_slash(path: str):
if is_windows():
return path.replace("\\", "/").replace("//", "/")
return path