mirror of
https://github.com/Dvorinka/swingmusic-extended.git
synced 2026-06-03 20:13:02 +00:00
rewrite search functions as classes
This commit is contained in:
@@ -40,7 +40,7 @@ def search():
|
||||
if artist_obj not in artists_dicts:
|
||||
artists_dicts.append(artist_obj)
|
||||
|
||||
_tracks = searchlib.get_tracks(query)
|
||||
_tracks = searchlib.SearchTracks(query)()
|
||||
tracks = [*_tracks, *artist_tracks]
|
||||
|
||||
SEARCH_RESULTS.clear()
|
||||
|
||||
@@ -52,6 +52,10 @@ def create_everything() -> List[models.Track]:
|
||||
def find_album(albums: List[models.Album], hash: str) -> int | None:
|
||||
"""
|
||||
Finds an album by album title and artist.
|
||||
|
||||
:param `albums`: List of album objects.
|
||||
:param `hash`: Hash of album.
|
||||
:return: Index of album in list.
|
||||
"""
|
||||
|
||||
left = 0
|
||||
@@ -62,8 +66,11 @@ def find_album(albums: List[models.Album], hash: str) -> int | None:
|
||||
iter += 1
|
||||
mid = (left + right) // 2
|
||||
|
||||
if albums[mid].hash == hash:
|
||||
return mid
|
||||
try:
|
||||
if albums[mid].hash == hash:
|
||||
return mid
|
||||
except AttributeError:
|
||||
print(albums)
|
||||
|
||||
if albums[mid].hash < hash:
|
||||
left = mid + 1
|
||||
@@ -151,7 +158,7 @@ def get_album_tracks(album: str, artist: str) -> List:
|
||||
return GetAlbumTracks(album, artist).find_tracks()
|
||||
|
||||
|
||||
def create_album(track: dict, tracklist: list) -> models.Album:
|
||||
def create_album(track: dict, tracklist: list) -> dict:
|
||||
"""
|
||||
Generates and returns an album object from a track object.
|
||||
"""
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
from multiprocessing import Pool
|
||||
from copy import deepcopy
|
||||
|
||||
import os
|
||||
from os import path
|
||||
import time
|
||||
from typing import List
|
||||
@@ -46,7 +47,7 @@ class Populate:
|
||||
|
||||
def run(self):
|
||||
self.check_untagged()
|
||||
self.tag_all_files()
|
||||
self.get_all_tags()
|
||||
|
||||
if len(self.tagged_tracks) == 0:
|
||||
return
|
||||
@@ -76,6 +77,17 @@ class Populate:
|
||||
|
||||
Log(f"Found {len(self.files)} untagged tracks")
|
||||
|
||||
def process_tags(self, tags: dict):
|
||||
for t in tags:
|
||||
if t is None:
|
||||
continue
|
||||
|
||||
t["albumhash"] = create_album_hash(t["album"], t["albumartist"])
|
||||
self.tagged_tracks.append(t)
|
||||
api.DB_TRACKS.append(t)
|
||||
|
||||
self.folders.add(t["folder"])
|
||||
|
||||
def get_tags(self, file: str):
|
||||
tags = get_tags(file)
|
||||
|
||||
@@ -87,15 +99,22 @@ class Populate:
|
||||
self.tagged_tracks.append(tags)
|
||||
api.DB_TRACKS.append(tags)
|
||||
|
||||
def tag_all_files(self):
|
||||
def get_all_tags(self):
|
||||
"""
|
||||
Loops through all the untagged files and tags them.
|
||||
"""
|
||||
|
||||
s = time.time()
|
||||
print(f"Started tagging files")
|
||||
with ThreadPoolExecutor() as executor:
|
||||
executor.map(self.get_tags, self.files)
|
||||
# print(f"Started tagging files")
|
||||
# with ThreadPoolExecutor() as executor:
|
||||
# executor.map(self.get_tags, self.files)
|
||||
|
||||
with Pool(maxtasksperchild=10) as p:
|
||||
tags = p.map(get_tags, tqdm(self.files))
|
||||
self.process_tags(tags)
|
||||
|
||||
# for t in tqdm(self.files):
|
||||
# self.get_tags(t)
|
||||
|
||||
d = time.time() - s
|
||||
Log(f"Tagged {len(self.tagged_tracks)} files in {d} seconds")
|
||||
|
||||
@@ -3,17 +3,100 @@ This library contains all the functions related to the search functionality.
|
||||
"""
|
||||
|
||||
from typing import List
|
||||
from app import models, helpers
|
||||
|
||||
from app import api, helpers, models
|
||||
from app.lib import albumslib
|
||||
from app import api
|
||||
from rapidfuzz import fuzz, process
|
||||
|
||||
|
||||
def get_tracks(query: str) -> List[models.Track]:
|
||||
"""
|
||||
Gets all songs with a given title.
|
||||
"""
|
||||
tracks = [track for track in api.TRACKS if query.lower() in track.title.lower()]
|
||||
return helpers.remove_duplicates(tracks)
|
||||
class SearchTracks:
|
||||
def __init__(self, query) -> None:
|
||||
self.query = query
|
||||
|
||||
def __call__(self) -> List[models.Track]:
|
||||
"""
|
||||
Gets all songs with a given title.
|
||||
"""
|
||||
|
||||
tracks = [track.title for track in api.TRACKS]
|
||||
results = process.extract(
|
||||
self.query, tracks, scorer=fuzz.token_set_ratio, score_cutoff=50
|
||||
)
|
||||
print(results)
|
||||
return [api.TRACKS[i[2]] for i in results]
|
||||
|
||||
|
||||
class SearchArtists:
|
||||
def __init__(self, query) -> None:
|
||||
self.query = query
|
||||
|
||||
@staticmethod
|
||||
def get_all_artist_names() -> List[str]:
|
||||
"""
|
||||
Gets all artist names.
|
||||
"""
|
||||
|
||||
artists = [track.artists for track in api.TRACKS]
|
||||
|
||||
f_artists = []
|
||||
for artist in artists:
|
||||
aa = artist.split(",")
|
||||
f_artists.extend(aa)
|
||||
|
||||
return f_artists
|
||||
|
||||
@staticmethod
|
||||
def get_valid_name(name: str) -> str:
|
||||
"""
|
||||
returns a valid artist name
|
||||
"""
|
||||
|
||||
return "".join([i for i in name if i not in '/\\:*?"<>|'])
|
||||
|
||||
def __call__(self) -> list:
|
||||
"""
|
||||
Gets all artists with a given name.
|
||||
"""
|
||||
|
||||
artists = self.get_all_artist_names()
|
||||
results = process.extract(self.query, artists)
|
||||
|
||||
f_artists = []
|
||||
for artist in results:
|
||||
aa = {
|
||||
"name": artist[0],
|
||||
"image": self.get_valid_name(artist[0]) + ".webp",
|
||||
}
|
||||
f_artists.append(aa)
|
||||
|
||||
return f_artists
|
||||
|
||||
|
||||
class SearchAlbums:
|
||||
def __init__(self, query) -> None:
|
||||
self.query = query
|
||||
|
||||
def get_albums_by_name(self) -> List[models.Album]:
|
||||
"""
|
||||
Gets all albums with a given title.
|
||||
"""
|
||||
|
||||
albums = [album.title for album in api.ALBUMS]
|
||||
results = process.extract(self.query, albums)
|
||||
return [api.ALBUMS[i[2]] for i in results]
|
||||
|
||||
def __call__(self) -> List[models.Album]:
|
||||
"""
|
||||
Gets all albums with a given title.
|
||||
"""
|
||||
|
||||
artists = SearchArtists(self.query)()
|
||||
a_albums = []
|
||||
|
||||
# get all artists that matched the query
|
||||
# for get all albums from the artists
|
||||
# get all albums that matched the query
|
||||
# return [**artist_albums **albums]
|
||||
|
||||
|
||||
def get_search_albums(query: str) -> List[models.Album]:
|
||||
@@ -27,4 +110,6 @@ def get_artists(artist: str) -> List[models.Track]:
|
||||
"""
|
||||
Gets all songs with a given artist.
|
||||
"""
|
||||
return [track for track in api.TRACKS if artist.lower() in str(track.artists).lower()]
|
||||
return [
|
||||
track for track in api.TRACKS if artist.lower() in str(track.artists).lower()
|
||||
]
|
||||
|
||||
@@ -154,7 +154,7 @@ def parse_disk_number(audio):
|
||||
return disk_number
|
||||
|
||||
|
||||
def get_tags(fullpath: str) -> dict:
|
||||
def get_tags(fullpath: str) -> dict | None:
|
||||
"""
|
||||
Returns a dictionary of tags for a given file.
|
||||
"""
|
||||
|
||||
@@ -50,18 +50,19 @@ def add_track(filepath: str) -> None:
|
||||
tags = get_tags(filepath)
|
||||
|
||||
if tags is not None:
|
||||
tags["albumhash"] = create_album_hash(tags["album"], tags["albumartist"])
|
||||
hash = create_album_hash(tags["album"], tags["albumartist"])
|
||||
tags["albumhash"] = hash
|
||||
api.DB_TRACKS.append(tags)
|
||||
|
||||
albumindex = find_album(tags["album"], tags["albumartist"])
|
||||
albumindex = find_album(api.ALBUMS, hash)
|
||||
|
||||
if albumindex is not None:
|
||||
album = api.ALBUMS[albumindex]
|
||||
else:
|
||||
album_data = create_album(tags, api.DB_TRACKS)
|
||||
instances.album_instance.insert_album(album_data)
|
||||
|
||||
album = models.Album(album_data)
|
||||
|
||||
instances.album_instance.insert_album(album)
|
||||
api.ALBUMS.append(album)
|
||||
|
||||
tags["image"] = album.image
|
||||
@@ -152,7 +153,14 @@ class Handler(PatternMatchingEventHandler):
|
||||
Fired when a created file is closed.
|
||||
"""
|
||||
print("⚫ closed ~~~")
|
||||
self.files_to_process.remove(event.src_path)
|
||||
try:
|
||||
self.files_to_process.remove(event.src_path)
|
||||
except ValueError:
|
||||
"""
|
||||
The file was already removed from the list, or it was not in the list to begin with.
|
||||
"""
|
||||
pass
|
||||
|
||||
add_track(event.src_path)
|
||||
|
||||
|
||||
|
||||
+20
-1
@@ -32,8 +32,11 @@ class Track:
|
||||
albumhash: str
|
||||
|
||||
def __init__(self, tags):
|
||||
try:
|
||||
self.trackid = tags["_id"]["$oid"]
|
||||
except KeyError:
|
||||
print(tags)
|
||||
|
||||
self.trackid = tags["_id"]["$oid"]
|
||||
self.title = tags["title"]
|
||||
self.artists = tags["artists"].split(", ")
|
||||
self.albumartist = tags["albumartist"]
|
||||
@@ -49,6 +52,22 @@ class Track:
|
||||
self.albumhash = tags["albumhash"]
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class Artist:
|
||||
"""
|
||||
Artist class
|
||||
"""
|
||||
|
||||
artistid: str
|
||||
name: str
|
||||
image: str
|
||||
|
||||
def __init__(self, tags):
|
||||
self.artistid = tags["_id"]["$oid"]
|
||||
self.name = tags["name"]
|
||||
self.image = tags["image"]
|
||||
|
||||
|
||||
@dataclass
|
||||
class Album:
|
||||
"""
|
||||
|
||||
@@ -5,7 +5,7 @@
|
||||
type="search"
|
||||
name=""
|
||||
id=""
|
||||
placeholder="Search this playlist"
|
||||
placeholder="Search here"
|
||||
class="rounded"
|
||||
/>
|
||||
</form>
|
||||
|
||||
@@ -137,14 +137,14 @@ export default async (
|
||||
add_to_playlist,
|
||||
play_next,
|
||||
add_to_q,
|
||||
add_to_fav,
|
||||
// add_to_fav,
|
||||
separator,
|
||||
go_to_folder,
|
||||
go_to_artist,
|
||||
go_to_alb_artist,
|
||||
// go_to_artist,
|
||||
// go_to_alb_artist,
|
||||
go_to_album,
|
||||
separator,
|
||||
del_track,
|
||||
// separator,
|
||||
// del_track,
|
||||
];
|
||||
|
||||
return options;
|
||||
|
||||
Reference in New Issue
Block a user