mirror of
https://github.com/Dvorinka/swingmusic-extended.git
synced 2026-06-03 20:13:02 +00:00
implement artist split ingore list
+ move post processing of tags to the track model + rebuild stores on settings update via API + check files from the store instead of the db when streaming + remove deprecetated table columns +misc
This commit is contained in:
+14
-114
@@ -7,11 +7,7 @@ from app.api.auth import admin_required
|
||||
|
||||
from app.db.userdata import PluginTable
|
||||
from app.lib.index import index_everything
|
||||
from app.logger import log
|
||||
from app.settings import Info
|
||||
from app.store.albums import AlbumStore
|
||||
from app.store.artists import ArtistStore
|
||||
from app.store.tracks import TrackStore
|
||||
from app.config import UserConfig
|
||||
|
||||
bp_tag = Tag(name="Settings", description="Customize stuff")
|
||||
@@ -24,65 +20,6 @@ def get_child_dirs(parent: str, children: list[str]):
|
||||
return [_dir for _dir in children if _dir.startswith(parent) and _dir != parent]
|
||||
|
||||
|
||||
def reload_everything(instance_key: str):
|
||||
"""
|
||||
Reloads all stores using the current database items
|
||||
"""
|
||||
try:
|
||||
TrackStore.load_all_tracks(instance_key)
|
||||
except Exception as e:
|
||||
log.error(e)
|
||||
|
||||
try:
|
||||
AlbumStore.load_albums(instance_key=instance_key)
|
||||
except Exception as e:
|
||||
log.error(e)
|
||||
|
||||
try:
|
||||
ArtistStore.load_artists(instance_key)
|
||||
except Exception as e:
|
||||
log.error(e)
|
||||
|
||||
|
||||
# CHECKPOINT: TEST SETTINGS API ENDPOINTS
|
||||
|
||||
# @background
|
||||
# def rebuild_store(db_dirs: list[str]):
|
||||
# """
|
||||
# Restarts watchdog and rebuilds the music library.
|
||||
# """
|
||||
# instance_key = get_random_str()
|
||||
|
||||
# log.info("Rebuilding library...")
|
||||
# trackdb.remove_tracks_not_in_folders(db_dirs)
|
||||
# reload_everything(instance_key)
|
||||
|
||||
# try:
|
||||
# populate.Populate(instance_key=instance_key)
|
||||
# except populate.PopulateCancelledError as e:
|
||||
# print(e)
|
||||
# reload_everything(instance_key)
|
||||
# return
|
||||
|
||||
# WatchDog().restart()
|
||||
|
||||
# log.info("Rebuilding library... ✅")
|
||||
|
||||
|
||||
# # I freaking don't know what this function does anymore
|
||||
# def finalize(new_: list[str], removed_: list[str], db_dirs_: list[str]):
|
||||
# """
|
||||
# Params:
|
||||
# new_: will be added to the database
|
||||
# removed_: will be removed from the database
|
||||
# db_dirs_: will be used to remove tracks that
|
||||
# are outside these directories from the database and store.
|
||||
# """
|
||||
# sdb.remove_root_dirs(removed_)
|
||||
# sdb.add_root_dirs(new_)
|
||||
# rebuild_store(db_dirs_)
|
||||
|
||||
|
||||
class AddRootDirsBody(BaseModel):
|
||||
new_dirs: list[str] = Field(
|
||||
description="The new directories to add",
|
||||
@@ -151,18 +88,6 @@ def get_root_dirs():
|
||||
return {"dirs": UserConfig().rootDirs}
|
||||
|
||||
|
||||
# maps settings to their parser flags
|
||||
# mapp = {
|
||||
# "artist_separators": SessionVarKeys.ARTIST_SEPARATORS,
|
||||
# "extract_feat": SessionVarKeys.EXTRACT_FEAT,
|
||||
# "remove_prod": SessionVarKeys.REMOVE_PROD,
|
||||
# "clean_album_title": SessionVarKeys.CLEAN_ALBUM_TITLE,
|
||||
# "remove_remaster": SessionVarKeys.REMOVE_REMASTER_FROM_TRACK,
|
||||
# "merge_albums": SessionVarKeys.MERGE_ALBUM_VERSIONS,
|
||||
# "show_albums_as_singles": SessionVarKeys.SHOW_ALBUMS_AS_SINGLES,
|
||||
# }
|
||||
|
||||
|
||||
@api.get("")
|
||||
def get_all_settings():
|
||||
"""
|
||||
@@ -176,11 +101,6 @@ def get_all_settings():
|
||||
return config
|
||||
|
||||
|
||||
# @background
|
||||
# def reload_all_for_set_setting():
|
||||
# reload_everything(get_random_str())
|
||||
|
||||
|
||||
class SetSettingBody(BaseModel):
|
||||
key: str = Field(
|
||||
description="The setting key",
|
||||
@@ -192,39 +112,6 @@ class SetSettingBody(BaseModel):
|
||||
)
|
||||
|
||||
|
||||
# @api.post("/set")
|
||||
# @admin_required()
|
||||
# def set_setting(body: SetSettingBody):
|
||||
# """
|
||||
# Set a setting.
|
||||
# """
|
||||
# key = body.key
|
||||
# value = body.value
|
||||
|
||||
# if key is None or value is None or key == "root_dirs":
|
||||
# return {"msg": "Invalid arguments!"}, 400
|
||||
|
||||
# root_dir = sdb.get_root_dirs()
|
||||
|
||||
# if not root_dir:
|
||||
# return {"msg": "No root directories set!"}, 400
|
||||
|
||||
# if key not in mapp:
|
||||
# return {"msg": "Invalid key!"}, 400
|
||||
|
||||
# if key == "artist_separators":
|
||||
# value = str(value).split(",")
|
||||
# value = set(value)
|
||||
|
||||
# reload_all_for_set_setting()
|
||||
|
||||
# # if value is a set, convert it to a string
|
||||
# # (artist_separators)
|
||||
# if type(value) == set:
|
||||
# value = ",".join(value)
|
||||
|
||||
# return {"result": value}
|
||||
|
||||
@api.get("/trigger-scan")
|
||||
def trigger_scan():
|
||||
"""
|
||||
@@ -256,7 +143,20 @@ def update_config(body: UpdateConfigBody):
|
||||
body.value = body.value.split(",")
|
||||
|
||||
setattr(config, body.key, body.value)
|
||||
print(getattr(config, body.key))
|
||||
|
||||
# INFO: Rebuild stores when these settings are updated
|
||||
reset_stores_lists = {
|
||||
"artistSeparators",
|
||||
"artistSplitIgnoreList",
|
||||
"removeProdBy",
|
||||
"removeRemasterInfo",
|
||||
"mergeAlbums",
|
||||
"cleanAlbumTitle",
|
||||
"showAlbumsAsSingles",
|
||||
}
|
||||
|
||||
if body.key in reset_stores_lists:
|
||||
index_everything()
|
||||
|
||||
return {
|
||||
"msg": "Config updated!",
|
||||
|
||||
+38
-8
@@ -10,8 +10,7 @@ from pydantic import BaseModel, Field
|
||||
from app.api.apischemas import TrackHashSchema
|
||||
from app.lib.trackslib import get_silence_paddings
|
||||
|
||||
# from app.store.tracks import TrackStore
|
||||
from app.db.libdata import TrackTable
|
||||
from app.store.tracks import TrackStore
|
||||
from app.utils.files import guess_mime_type
|
||||
|
||||
bp_tag = Tag(name="File", description="Audio files")
|
||||
@@ -35,10 +34,26 @@ def send_track_file_legacy(path: TrackHashSchema, query: SendTrackFileQuery):
|
||||
filepath = query.filepath
|
||||
msg = {"msg": "File Not Found"}
|
||||
|
||||
track = TrackTable.get_track_by_trackhash(trackhash, filepath)
|
||||
track_exists = track is not None and os.path.exists(track.filepath)
|
||||
track = None
|
||||
tracks = TrackStore.get_tracks_by_filepaths([filepath])
|
||||
|
||||
if track_exists:
|
||||
|
||||
if len(tracks) > 0 and os.path.exists(filepath):
|
||||
track = tracks[0]
|
||||
else:
|
||||
res = TrackStore.trackhashmap.get(trackhash)
|
||||
|
||||
# When finding by trackhash, sort by bitrate
|
||||
# and get the first track that exists
|
||||
if res is not None:
|
||||
tracks = sorted(res.tracks, key=lambda x: x.bitrate, reverse=True)
|
||||
|
||||
for t in tracks:
|
||||
if os.path.exists(t.filepath):
|
||||
track = t
|
||||
break
|
||||
|
||||
if track is not None:
|
||||
audio_type = guess_mime_type(filepath)
|
||||
return send_file(filepath, mimetype=audio_type, conditional=True)
|
||||
|
||||
@@ -57,10 +72,25 @@ def send_track_file(path: TrackHashSchema, query: SendTrackFileQuery):
|
||||
msg = {"msg": "File Not Found"}
|
||||
|
||||
# If filepath is provided, try to send that
|
||||
track = TrackTable.get_track_by_trackhash(trackhash, filepath)
|
||||
track_exists = track is not None and os.path.exists(track.filepath)
|
||||
track = None
|
||||
tracks = TrackStore.get_tracks_by_filepaths([filepath])
|
||||
|
||||
if track_exists:
|
||||
if len(tracks) > 0 and os.path.exists(filepath):
|
||||
track = tracks[0]
|
||||
else:
|
||||
res = TrackStore.trackhashmap.get(trackhash)
|
||||
|
||||
# When finding by trackhash, sort by bitrate
|
||||
# and get the first track that exists
|
||||
if res is not None:
|
||||
tracks = sorted(res.tracks, key=lambda x: x.bitrate, reverse=True)
|
||||
|
||||
for t in tracks:
|
||||
if os.path.exists(t.filepath):
|
||||
track = t
|
||||
break
|
||||
|
||||
if track is not None:
|
||||
audio_type = guess_mime_type(filepath)
|
||||
return send_file_as_chunks(track.filepath, audio_type)
|
||||
|
||||
|
||||
@@ -22,6 +22,13 @@ class UserConfig:
|
||||
rootDirs: list[str] = field(default_factory=list)
|
||||
excludeDirs: list[str] = field(default_factory=list)
|
||||
artistSeparators: set[str] = field(default_factory=lambda: {";", "/"})
|
||||
artistSplitIgnoreList: set[str] = field(
|
||||
default_factory=lambda: {
|
||||
"AC/DC",
|
||||
"Bob marley & the wailers",
|
||||
"Crosby, Stills, Nash & Young",
|
||||
}
|
||||
)
|
||||
genreSeparators: set[str] = field(default_factory=lambda: {"/", ";", "&"})
|
||||
|
||||
# tracks
|
||||
|
||||
+20
-7
@@ -109,10 +109,10 @@ class TrackTable(Base):
|
||||
|
||||
id: Mapped[int] = mapped_column(init=False, primary_key=True)
|
||||
album: Mapped[str] = mapped_column(String())
|
||||
albumartists: Mapped[list[dict[str, str]]] = mapped_column(JSON())
|
||||
albumartists: Mapped[str] = mapped_column(String())
|
||||
albumhash: Mapped[str] = mapped_column(String(), index=True)
|
||||
artisthashes: Mapped[list[str]] = mapped_column(JSON(), index=True)
|
||||
artists: Mapped[list[dict[str, str]]] = mapped_column(JSON(), index=True)
|
||||
# artisthashes: Mapped[list[str]] = mapped_column(JSON(), index=True)
|
||||
artists: Mapped[str] = mapped_column(String())
|
||||
bitrate: Mapped[int] = mapped_column(Integer())
|
||||
copyright: Mapped[Optional[str]] = mapped_column(String())
|
||||
date: Mapped[int] = mapped_column(Integer(), nullable=True)
|
||||
@@ -120,11 +120,11 @@ class TrackTable(Base):
|
||||
duration: Mapped[int] = mapped_column(Integer())
|
||||
filepath: Mapped[str] = mapped_column(String(), index=True, unique=True)
|
||||
folder: Mapped[str] = mapped_column(String(), index=True)
|
||||
genrehashes: Mapped[list[str]] = mapped_column(JSON(), index=True)
|
||||
genres: Mapped[Optional[list[dict[str, str]]]] = mapped_column(JSON())
|
||||
# genrehashes: Mapped[list[str]] = mapped_column(JSON(), index=True)
|
||||
genres: Mapped[Optional[str]] = mapped_column(String())
|
||||
last_mod: Mapped[float] = mapped_column(Integer())
|
||||
og_album: Mapped[str] = mapped_column(String())
|
||||
og_title: Mapped[str] = mapped_column(String())
|
||||
# og_album: Mapped[str] = mapped_column(String())
|
||||
# og_title: Mapped[str] = mapped_column(String())
|
||||
title: Mapped[str] = mapped_column(String())
|
||||
track: Mapped[int] = mapped_column(Integer())
|
||||
trackhash: Mapped[str] = mapped_column(String(), index=True)
|
||||
@@ -250,6 +250,19 @@ class TrackTable(Base):
|
||||
TrackTable, TrackTable.trackhash, trackhash, duration, timestamp
|
||||
)
|
||||
|
||||
# @classmethod
|
||||
# def update_artist_separators(cls, separators: set[str]):
|
||||
# tracks = cls.get_all()
|
||||
|
||||
# with DbEngine.manager(commit=True) as conn:
|
||||
# for track in tracks:
|
||||
# track.split_artists(separators)
|
||||
# conn.execute(
|
||||
# update(cls)
|
||||
# .where(cls.trackhash == track.trackhash)
|
||||
# .values(artists=track.artists, artisthashes=track.artisthashes)
|
||||
# )
|
||||
|
||||
|
||||
class AlbumTable(Base):
|
||||
__tablename__ = "album"
|
||||
|
||||
+4
-3
@@ -1,5 +1,6 @@
|
||||
from typing import Any
|
||||
|
||||
from app.config import UserConfig
|
||||
from app.models import Album as AlbumModel, Artist as ArtistModel, Track as TrackModel
|
||||
from app.models.favorite import Favorite
|
||||
from app.models.lastfm import SimilarArtist
|
||||
@@ -9,12 +10,12 @@ from app.models.plugins import Plugin
|
||||
from app.models.user import User
|
||||
|
||||
|
||||
def track_to_dataclass(track: Any):
|
||||
return TrackModel(**track._asdict())
|
||||
def track_to_dataclass(track: Any, config: UserConfig):
|
||||
return TrackModel(**track._asdict(), config=config)
|
||||
|
||||
|
||||
def tracks_to_dataclasses(tracks: Any):
|
||||
return [track_to_dataclass(track) for track in tracks]
|
||||
return [track_to_dataclass(track, UserConfig()) for track in tracks]
|
||||
|
||||
|
||||
def album_to_dataclass(album: Any):
|
||||
|
||||
+1
-1
@@ -124,7 +124,7 @@ class IndexTracks:
|
||||
log.warning("'Populate.tag_untagged': Populate key changed")
|
||||
return
|
||||
|
||||
tags = get_tags(file, artist_separators=config.artistSeparators)
|
||||
tags = get_tags(file, config=config)
|
||||
|
||||
if tags is not None:
|
||||
TrackTable.insert_one(tags)
|
||||
|
||||
+89
-88
@@ -14,13 +14,7 @@ from tinytag import TinyTag
|
||||
from app.config import UserConfig
|
||||
from app.settings import Defaults, Paths
|
||||
from app.utils.hashing import create_hash
|
||||
from app.utils.parsers import (
|
||||
clean_title,
|
||||
get_base_title_and_versions,
|
||||
parse_feat_from_title,
|
||||
remove_prod,
|
||||
split_artists,
|
||||
)
|
||||
from app.utils.parsers import split_artists
|
||||
from app.utils.wintools import win_replace_slash
|
||||
|
||||
|
||||
@@ -109,13 +103,13 @@ def clean_filename(filename: str):
|
||||
class ParseData:
|
||||
artist: str
|
||||
title: str
|
||||
artist_separators: set[str]
|
||||
config: UserConfig
|
||||
|
||||
def __post_init__(self):
|
||||
self.artist = split_artists(self.artist, self.artist_separators)
|
||||
self.artist = split_artists(self.artist, self.config)
|
||||
|
||||
|
||||
def extract_artist_title(filename: str, artist_separators: set[str]):
|
||||
def extract_artist_title(filename: str, config: UserConfig):
|
||||
path = Path(filename).with_suffix("")
|
||||
|
||||
path = clean_filename(str(path))
|
||||
@@ -123,24 +117,30 @@ def extract_artist_title(filename: str, artist_separators: set[str]):
|
||||
split_result = [x.strip() for x in split_result]
|
||||
|
||||
if len(split_result) == 1:
|
||||
return ParseData("", split_result[0], artist_separators)
|
||||
return ParseData(
|
||||
"",
|
||||
split_result[0],
|
||||
config,
|
||||
)
|
||||
|
||||
if len(split_result) > 2:
|
||||
try:
|
||||
int(split_result[0])
|
||||
|
||||
return ParseData(
|
||||
split_result[1], " - ".join(split_result[2:]), artist_separators
|
||||
split_result[1],
|
||||
" - ".join(split_result[2:]),
|
||||
config,
|
||||
)
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
artist = split_result[0]
|
||||
title = split_result[1]
|
||||
return ParseData(artist, title, artist_separators)
|
||||
return ParseData(artist, title, config)
|
||||
|
||||
|
||||
def get_tags(filepath: str, artist_separators: set[str]):
|
||||
def get_tags(filepath: str, config: UserConfig):
|
||||
"""
|
||||
Returns the tags for a given audio file.
|
||||
"""
|
||||
@@ -173,17 +173,20 @@ def get_tags(filepath: str, artist_separators: set[str]):
|
||||
for tag in to_filename:
|
||||
p = getattr(tags, tag)
|
||||
if p == "" or p is None:
|
||||
parse_data = extract_artist_title(filename, artist_separators)
|
||||
title = parse_data.title
|
||||
parse_data = extract_artist_title(filename, config)
|
||||
title = parse_data.title.replace("_", " ")
|
||||
setattr(tags, tag, title)
|
||||
|
||||
# tags.title = tags.title.replace("_", " ")
|
||||
# tags.album = tags.album.replace("_", " ")
|
||||
|
||||
parse = ["artist", "albumartist"]
|
||||
for tag in parse:
|
||||
p = getattr(tags, tag)
|
||||
|
||||
if p == "" or p is None:
|
||||
if not parse_data:
|
||||
parse_data = extract_artist_title(filename, artist_separators)
|
||||
parse_data = extract_artist_title(filename, config)
|
||||
|
||||
artist = parse_data.artist
|
||||
|
||||
@@ -229,112 +232,110 @@ def get_tags(filepath: str, artist_separators: set[str]):
|
||||
tags.artists = tags.artist
|
||||
tags.albumartists = tags.albumartist
|
||||
|
||||
split_artist = split_artists(tags.artist, separators=artist_separators)
|
||||
split_albumartists = split_artists(tags.albumartist, separators=artist_separators)
|
||||
new_title = tags.title
|
||||
# split_artist = split_artists(tags.artist, separators=config.artistSeparators)
|
||||
# split_albumartists = split_artists(tags.albumartist, separators=config.artistSeparators)
|
||||
# new_title = tags.title
|
||||
|
||||
# TODO: Figure out which is the best spot to create these hashes
|
||||
# create albumhash using og_album
|
||||
tags.albumhash = create_hash(tags.album or "", tags.albumartist)
|
||||
|
||||
config = UserConfig()
|
||||
|
||||
# extract featured artists
|
||||
if config.extractFeaturedArtists:
|
||||
feat, new_title = parse_feat_from_title(
|
||||
tags.title, separators=artist_separators
|
||||
)
|
||||
original_lower = "-".join([create_hash(a) for a in split_artist])
|
||||
split_artist.extend(a for a in feat if create_hash(a) not in original_lower)
|
||||
# if config.extractFeaturedArtists:
|
||||
# feat, new_title = parse_feat_from_title(
|
||||
# tags.title, separators=config.artistSeparators
|
||||
# )
|
||||
# original_lower = "-".join([create_hash(a) for a in split_artist])
|
||||
# split_artist.extend(a for a in feat if create_hash(a) not in original_lower)
|
||||
|
||||
# if no albumartist, assign to the first artist
|
||||
if not tags.albumartist:
|
||||
tags.albumartist = split_artist[:1]
|
||||
tags.albumartist = split_artists(tags.artist, config)[:1]
|
||||
|
||||
# create json objects for artists and albumartists
|
||||
tags.artists = [
|
||||
{
|
||||
"artisthash": create_hash(a, decode=True),
|
||||
"name": a,
|
||||
}
|
||||
for a in split_artist
|
||||
]
|
||||
# tags.artists = [
|
||||
# {
|
||||
# "artisthash": create_hash(a, decode=True),
|
||||
# "name": a,
|
||||
# }
|
||||
# for a in split_artist
|
||||
# ]
|
||||
|
||||
tags.albumartists = [
|
||||
{
|
||||
"artisthash": create_hash(a, decode=True),
|
||||
"name": a,
|
||||
}
|
||||
for a in split_albumartists
|
||||
]
|
||||
# tags.albumartists = [
|
||||
# {
|
||||
# "artisthash": create_hash(a, decode=True),
|
||||
# "name": a,
|
||||
# }
|
||||
# for a in split_albumartists
|
||||
# ]
|
||||
|
||||
tags.artisthashes = list(
|
||||
{a["artisthash"] for a in tags.artists}
|
||||
)
|
||||
# tags.artisthashes = list(
|
||||
# {a["artisthash"] for a in tags.artists}
|
||||
# )
|
||||
|
||||
# remove prod by
|
||||
if config.removeProdBy:
|
||||
new_title = remove_prod(new_title)
|
||||
# if config.removeProdBy:
|
||||
# new_title = remove_prod(new_title)
|
||||
|
||||
# if track is a single, ie.
|
||||
# if og_title == album, rename album to new_title
|
||||
if tags.title == tags.album:
|
||||
tags.album = new_title
|
||||
# if tags.title == tags.album:
|
||||
# tags.album = new_title
|
||||
|
||||
# remove remaster from track title
|
||||
if config.removeRemasterInfo:
|
||||
new_title = clean_title(new_title)
|
||||
# if config.removeRemasterInfo:
|
||||
# new_title = clean_title(new_title)
|
||||
|
||||
# save final title
|
||||
tags.og_title = tags.title
|
||||
tags.title = new_title
|
||||
tags.og_album = tags.album
|
||||
# tags.og_title = tags.title
|
||||
# tags.title = new_title
|
||||
# tags.og_album = tags.album
|
||||
|
||||
# clean album title
|
||||
if config.cleanAlbumTitle:
|
||||
tags.album, _ = get_base_title_and_versions(tags.album, get_versions=False)
|
||||
# if config.cleanAlbumTitle:
|
||||
# tags.album, _ = get_base_title_and_versions(tags.album, get_versions=False)
|
||||
|
||||
# merge album versions
|
||||
if config.mergeAlbums:
|
||||
tags.albumhash = create_hash(
|
||||
tags.album, *(a["name"] for a in tags.albumartists)
|
||||
)
|
||||
# if config.mergeAlbums:
|
||||
# tags.albumhash = create_hash(
|
||||
# tags.album, *(a["name"] for a in tags.albumartists)
|
||||
# )
|
||||
|
||||
# process genres
|
||||
if tags.genre:
|
||||
src_genres: str = tags.genre
|
||||
src_genres = src_genres.lower()
|
||||
# separators = {"/", ";", "&"}
|
||||
separators = set(config.genreSeparators)
|
||||
# if tags.genre:
|
||||
# src_genres: str = tags.genre
|
||||
# src_genres = src_genres.lower()
|
||||
# # separators = {"/", ";", "&"}
|
||||
# separators = set(config.genreSeparators)
|
||||
|
||||
contains_rnb = "r&b" in src_genres
|
||||
contains_rock = "rock & roll" in src_genres
|
||||
# contains_rnb = "r&b" in src_genres
|
||||
# contains_rock = "rock & roll" in src_genres
|
||||
|
||||
if contains_rnb:
|
||||
src_genres = src_genres.replace("r&b", "RnB")
|
||||
# if contains_rnb:
|
||||
# src_genres = src_genres.replace("r&b", "RnB")
|
||||
|
||||
if contains_rock:
|
||||
src_genres = src_genres.replace("rock & roll", "rock")
|
||||
# if contains_rock:
|
||||
# src_genres = src_genres.replace("rock & roll", "rock")
|
||||
|
||||
for s in separators:
|
||||
src_genres = src_genres.replace(s, ",")
|
||||
# for s in separators:
|
||||
# src_genres = src_genres.replace(s, ",")
|
||||
|
||||
genres_list: list[str] = src_genres.split(",")
|
||||
tags.genres = [
|
||||
{"name": g.strip(), "genrehash": create_hash(g.strip())}
|
||||
for g in genres_list
|
||||
]
|
||||
tags.genrehashes = [g["genrehash"] for g in tags.genres]
|
||||
else:
|
||||
tags.genres = []
|
||||
tags.genrehashes = []
|
||||
# genres_list: list[str] = src_genres.split(",")
|
||||
# tags.genres = [
|
||||
# {"name": g.strip(), "genrehash": create_hash(g.strip())}
|
||||
# for g in genres_list
|
||||
# ]
|
||||
# tags.genrehashes = [g["genrehash"] for g in tags.genres]
|
||||
# else:
|
||||
# tags.genres = []
|
||||
# tags.genrehashes = []
|
||||
|
||||
tags.genres = tags.genre
|
||||
|
||||
# sub underscore with space
|
||||
tags.title = tags.title.replace("_", " ")
|
||||
tags.album = tags.album.replace("_", " ")
|
||||
tags.trackhash = create_hash(
|
||||
*[a["name"] for a in tags.artists], tags.album, tags.title
|
||||
)
|
||||
# tags.title = tags.title.replace("_", " ")
|
||||
# tags.album = tags.album.replace("_", " ")
|
||||
tags.trackhash = create_hash(tags.artists, tags.album, tags.title)
|
||||
|
||||
more_extra = {
|
||||
"audio_offset": tags.audio_offset,
|
||||
|
||||
+125
-5
@@ -1,6 +1,15 @@
|
||||
from dataclasses import dataclass, field
|
||||
|
||||
from app.config import UserConfig
|
||||
from app.utils.auth import get_current_userid
|
||||
from app.utils.hashing import create_hash
|
||||
from app.utils.parsers import (
|
||||
clean_title,
|
||||
get_base_title_and_versions,
|
||||
parse_feat_from_title,
|
||||
remove_prod,
|
||||
split_artists,
|
||||
)
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
@@ -13,7 +22,6 @@ class Track:
|
||||
album: str
|
||||
albumartists: list[dict[str, str]]
|
||||
albumhash: str
|
||||
artisthashes: list[str]
|
||||
artists: list[dict[str, str]]
|
||||
bitrate: int
|
||||
copyright: str
|
||||
@@ -22,11 +30,8 @@ class Track:
|
||||
duration: int
|
||||
filepath: str
|
||||
folder: str
|
||||
genres: list[dict[str, str]]
|
||||
genrehashes: list[str]
|
||||
genres: str | list[dict[str, str]]
|
||||
last_mod: int
|
||||
og_album: str
|
||||
og_title: str
|
||||
title: str
|
||||
track: int
|
||||
trackhash: str
|
||||
@@ -35,6 +40,12 @@ class Track:
|
||||
playcount: int
|
||||
playduration: int
|
||||
|
||||
config: UserConfig
|
||||
og_album: str = ""
|
||||
og_title: str = ""
|
||||
artisthashes: list[str] = field(default_factory=list)
|
||||
genrehashes: list[str] = field(default_factory=list)
|
||||
|
||||
_pos: int = 0
|
||||
_ati: str = ""
|
||||
image: str = ""
|
||||
@@ -55,9 +66,118 @@ class Track:
|
||||
self.fav_userids.append(userid)
|
||||
|
||||
def __post_init__(self):
|
||||
self.og_title = self.title
|
||||
self.og_album = self.album
|
||||
|
||||
self.image = self.albumhash + ".webp"
|
||||
self.extra = {
|
||||
"disc_total": self.extra.get("disc_total", 0),
|
||||
"track_total": self.extra.get("track_total", 0),
|
||||
"samplerate": self.extra.get("samplerate", -1),
|
||||
}
|
||||
|
||||
self.split_artists()
|
||||
self.map_with_config()
|
||||
self.process_genres()
|
||||
|
||||
# Remove duplicates from artists and albumartists
|
||||
seen_artists = set()
|
||||
self.artists = [
|
||||
d
|
||||
for d in self.artists
|
||||
if tuple(d.items()) not in seen_artists
|
||||
and not seen_artists.add(tuple(d.items()))
|
||||
]
|
||||
|
||||
seen_albumartists = set()
|
||||
self.albumartists = [
|
||||
d
|
||||
for d in self.albumartists
|
||||
if tuple(d.items()) not in seen_albumartists
|
||||
and not seen_albumartists.add(tuple(d.items()))
|
||||
]
|
||||
|
||||
self.config = None
|
||||
|
||||
def split_artists(self):
|
||||
"""
|
||||
Splits the artists and albumartists based on the given separators, and updates the artisthashes.
|
||||
"""
|
||||
|
||||
def split(artists: str):
|
||||
return [
|
||||
{"name": a, "artisthash": create_hash(a, decode=True)}
|
||||
for a in split_artists(artists, config=self.config)
|
||||
]
|
||||
|
||||
self.artists = split(self.artists)
|
||||
self.albumartists = split(self.albumartists)
|
||||
self.artisthashes = [a["artisthash"] for a in self.artists]
|
||||
|
||||
def map_with_config(self):
|
||||
new_title = self.title
|
||||
|
||||
# Extract featured artists
|
||||
if self.config.extractFeaturedArtists:
|
||||
feat, new_title = parse_feat_from_title(self.title, self.config)
|
||||
feat = [
|
||||
{"name": f, "artisthash": create_hash(f, decode=True)} for f in feat
|
||||
]
|
||||
feat = [f for f in feat if f["artisthash"] not in self.artisthashes]
|
||||
self.artists.extend(feat)
|
||||
self.artisthashes.extend([f["artisthash"] for f in feat])
|
||||
|
||||
# Update album title for singles
|
||||
# ie. album: "Title (feat. Artist)"
|
||||
# title: "Title (feat. Artist)"
|
||||
# becomes: album: "Title", title: "Title"
|
||||
if self.og_album == self.og_title:
|
||||
self.album = new_title
|
||||
|
||||
# Clean track title
|
||||
if self.config.removeProdBy:
|
||||
new_title = remove_prod(new_title)
|
||||
|
||||
# if self.title == new_title:
|
||||
# self.album = new_title
|
||||
|
||||
if self.config.removeRemasterInfo:
|
||||
new_title = clean_title(new_title)
|
||||
|
||||
self.title = new_title
|
||||
|
||||
# Clean album title
|
||||
if self.config.cleanAlbumTitle:
|
||||
self.album, _ = get_base_title_and_versions(self.album, get_versions=False)
|
||||
|
||||
if self.config.mergeAlbums:
|
||||
self.albumhash = create_hash(
|
||||
self.album, *(a["name"] for a in self.albumartists)
|
||||
)
|
||||
|
||||
def process_genres(self):
|
||||
if self.genres:
|
||||
src_genres: str = self.genres
|
||||
|
||||
src_genres = src_genres.lower()
|
||||
# separators = {"/", ";", "&"}
|
||||
separators = set(self.config.genreSeparators)
|
||||
|
||||
contains_rnb = "r&b" in src_genres
|
||||
contains_rock = "rock & roll" in src_genres
|
||||
|
||||
if contains_rnb:
|
||||
src_genres = src_genres.replace("r&b", "RnB")
|
||||
|
||||
if contains_rock:
|
||||
src_genres = src_genres.replace("rock & roll", "rock")
|
||||
|
||||
for s in separators:
|
||||
src_genres = src_genres.replace(s, ",")
|
||||
|
||||
genres_list: list[str] = src_genres.split(",")
|
||||
self.genres = [
|
||||
{"name": g.strip(), "genrehash": create_hash(g.strip())}
|
||||
for g in genres_list
|
||||
]
|
||||
self.genrehashes = [g["genrehash"] for g in self.genres]
|
||||
|
||||
@@ -215,8 +215,6 @@ class TrackStore:
|
||||
def get_tracks_by_filepaths(cls, paths: list[str]) -> list[Track]:
|
||||
"""
|
||||
Returns all tracks matching the given paths.
|
||||
|
||||
⛔⛔⛔⛔⛔⛔⛔⛔⛔⛔⛔⛔⛔⛔⛔⛔⛔⛔⛔⛔⛔⛔
|
||||
"""
|
||||
# tracks = sorted(cls.trackhashmap, key=lambda x: x.filepath)
|
||||
# tracks = use_bisection(tracks, "filepath", paths)
|
||||
|
||||
+44
-9
@@ -1,19 +1,54 @@
|
||||
import re
|
||||
|
||||
from app.config import UserConfig
|
||||
from app.enums.album_versions import AlbumVersionEnum, get_all_keywords
|
||||
|
||||
|
||||
def split_artists(src: str, separators: set[str]):
|
||||
def split_artists(src: str, config: UserConfig):
|
||||
"""
|
||||
Splits a string of artists into a list of artists.
|
||||
Splits a string of artists into a list of artists, preserving those in ignoreList.
|
||||
Case-insensitive matching is used for the ignoreList.
|
||||
"""
|
||||
for sep in separators:
|
||||
src = src.replace(sep, ",")
|
||||
result = []
|
||||
current = ""
|
||||
i = 0
|
||||
|
||||
artists = src.split(",")
|
||||
artists = [a.strip() for a in artists]
|
||||
while i < len(src):
|
||||
# Check if any ignored artist starts at this position (case-insensitive)
|
||||
ignored_match = next(
|
||||
(
|
||||
src[i : i + len(ignored)]
|
||||
for ignored in config.artistSplitIgnoreList
|
||||
if src.lower().startswith(ignored.lower(), i)
|
||||
),
|
||||
None,
|
||||
)
|
||||
|
||||
return [a for a in artists if a]
|
||||
if ignored_match:
|
||||
# If we have accumulated any current string, add it to result
|
||||
if current.strip():
|
||||
result.extend([a.strip() for a in current.split(",") if a.strip()])
|
||||
current = ""
|
||||
# Add the ignored artist to the result (preserving original case)
|
||||
result.append(ignored_match)
|
||||
# Move past the ignored artist
|
||||
i += len(ignored_match)
|
||||
elif src[i] in config.artistSeparators:
|
||||
# If we encounter a separator, process the current string
|
||||
if current.strip():
|
||||
result.extend([a.strip() for a in current.split(",") if a.strip()])
|
||||
current = ""
|
||||
i += 1
|
||||
else:
|
||||
# If it's not an ignored artist or a separator, add to current
|
||||
current += src[i]
|
||||
i += 1
|
||||
|
||||
# Process any remaining current string
|
||||
if current.strip():
|
||||
result.extend([a.strip() for a in current.split(",") if a.strip()])
|
||||
|
||||
return result
|
||||
|
||||
|
||||
def remove_prod(title: str) -> str:
|
||||
@@ -36,7 +71,7 @@ def remove_prod(title: str) -> str:
|
||||
return title.strip()
|
||||
|
||||
|
||||
def parse_feat_from_title(title: str, separators: set[str]) -> tuple[list[str], str]:
|
||||
def parse_feat_from_title(title: str, config: UserConfig) -> tuple[list[str], str]:
|
||||
"""
|
||||
Extracts featured artists from a song title using regex.
|
||||
"""
|
||||
@@ -54,7 +89,7 @@ def parse_feat_from_title(title: str, separators: set[str]) -> tuple[list[str],
|
||||
return [], title
|
||||
|
||||
artists = match.group(1)
|
||||
artists = split_artists(artists, separators)
|
||||
artists = split_artists(artists, config)
|
||||
|
||||
# remove "feat" group from title
|
||||
new_title = re.sub(regex, "", title, flags=re.IGNORECASE)
|
||||
|
||||
Reference in New Issue
Block a user