Files
swingmusic-extended/app/lib/searchlib.py
T
2025-02-25 20:53:39 +03:00

325 lines
9.2 KiB
Python

"""
This library contains all the functions related to the search functionality.
"""
from typing import Any, Generator, List, TypeVar
from rapidfuzz import process, utils, fuzz
from unidecode import unidecode
from app import models
from app.config import UserConfig
# from app.db.libdata import AlbumTable, ArtistTable, TrackTable
# from app.db.sqlite.favorite import SQLiteFavoriteMethods as favdb
from app.models.album import Album
from app.models.artist import Artist
from app.models.enums import FavType
from app.models.playlist import Playlist
from app.models.track import Track
from app.serializers.album import serialize_for_card as serialize_album
from app.serializers.album import serialize_for_card_many as serialize_albums
from app.serializers.artist import serialize_for_card, serialize_for_cards
from app.serializers.track import serialize_track, serialize_tracks
from app.store.albums import AlbumStore
from app.store.artists import ArtistStore
from app.store.tracks import TrackStore
from app.utils.remove_duplicates import remove_duplicates
# ratio = fuzz.ratio
# wratio = fuzz.WRatio
class Cutoff:
    """
    Default minimum match scores, one per searchable entity.

    Each value is handed to rapidfuzz as ``score_cutoff`` by the
    corresponding ``Search*`` class in this module.
    """

    # Minimum score for a track title to count as a match.
    tracks: int = 50
    # Minimum score for an album title to count as a match.
    albums: int = 50
    # Minimum score for an artist name to count as a match.
    artists: int = 50
    # Minimum score for a playlist name to count as a match.
    playlists: int = 50
class Limit:
    """
    Default maximum number of results, one per searchable entity.

    Each value is the default ``limit`` argument of the corresponding
    ``Search*`` class in this module.
    """

    # Maximum number of track results.
    tracks: int = 150
    # Maximum number of album results.
    albums: int = 150
    # Maximum number of artist results.
    artists: int = 150
    # Maximum number of playlist results.
    playlists: int = 150
class SearchTracks:
    """Fuzzy-searches the track store by track title."""

    def __init__(self, query: str) -> None:
        # Snapshot the store once so result indexes map back to the same list.
        self.query = query
        self.tracks = TrackStore.get_flat_list()

    def __call__(self, limit: int = Limit.tracks) -> List[models.Track]:
        """
        Return the tracks whose title matches the query.

        Titles are transliterated with ``unidecode`` and lowercased before
        being scored with ``fuzz.WRatio``. Every returned track gets its
        match score stored on ``_score``; the final list is passed through
        ``remove_duplicates``.

        :param limit: maximum number of matches requested from rapidfuzz.
        """
        normalized_titles = []
        for entry in self.tracks:
            normalized_titles.append(unidecode(entry.title).lower())

        matches = process.extract(
            self.query,
            normalized_titles,
            scorer=fuzz.WRatio,
            processor=utils.default_process,
            limit=limit,
            score_cutoff=Cutoff.tracks,
        )

        scored: list[Track] = []
        # Each match is a (choice, score, index) tuple; the index points
        # back into self.tracks.
        for _, score, index in matches:
            hit = self.tracks[index]
            hit._score = score
            scored.append(hit)

        return remove_duplicates(scored)
class SearchArtists:
    """Fuzzy-searches the artist store by artist name."""

    def __init__(self, query: str) -> None:
        self.query = query
        self.artists = ArtistStore.get_flat_list()

    def __call__(self, limit: int = Limit.artists):
        """
        Return the artists whose name matches the query.

        Names are transliterated with ``unidecode`` and lowercased before
        being scored with ``fuzz.WRatio``. Every returned artist gets its
        match score stored on ``_score``.

        :param limit: maximum number of matches requested from rapidfuzz.
        """
        names = [unidecode(artist.name).lower() for artist in self.artists]

        matches = process.extract(
            self.query,
            names,
            scorer=fuzz.WRatio,
            processor=utils.default_process,
            score_cutoff=Cutoff.artists,
            limit=limit,
        )

        found: list[Artist] = []
        for match in matches:
            # match is (choice, score, index into self.artists)
            score, position = match[1], match[2]
            candidate = self.artists[position]
            candidate._score = score
            found.append(candidate)

        return found
class SearchAlbums:
    """Fuzzy-searches the album store by album title."""

    def __init__(self, query: str) -> None:
        self.query = query
        self.albums = AlbumStore.get_flat_list()

    def __call__(self, limit: int = Limit.albums):
        """
        Return the albums whose title matches the query.

        Titles are transliterated with ``unidecode`` and lowercased before
        scoring. Unlike the track and artist searches, albums are scored
        with ``fuzz.token_sort_ratio``. Every returned album gets its match
        score stored on ``_score``.

        :param limit: maximum number of matches requested from rapidfuzz.
        """
        titles = []
        for record in self.albums:
            titles.append(unidecode(record.title).lower())

        hits = process.extract(
            self.query,
            titles,
            scorer=fuzz.token_sort_ratio,
            processor=utils.default_process,
            score_cutoff=Cutoff.albums,
            limit=limit,
        )

        matched: list[Album] = []
        # Each hit is (choice, score, index into self.albums).
        for _, score, idx in hits:
            record = self.albums[idx]
            record._score = score
            matched.append(record)

        return matched
class SearchPlaylists:
    """Fuzzy-searches a caller-supplied list of playlists by name."""

    def __init__(self, playlists: List[models.Playlist], query: str) -> None:
        self.query = query
        self.playlists = playlists

    def __call__(self, limit: int = Limit.playlists):
        """
        Return the playlists whose name matches the query.

        Playlist names are passed to rapidfuzz as they are (only the
        ``utils.default_process`` processor is applied) and scored with
        ``fuzz.WRatio``. Every returned playlist gets its match score
        stored on ``_score``.

        :param limit: maximum number of matches requested from rapidfuzz.
        """
        names = []
        for entry in self.playlists:
            names.append(entry.name)

        matches = process.extract(
            self.query,
            names,
            scorer=fuzz.WRatio,
            processor=utils.default_process,
            score_cutoff=Cutoff.playlists,
            limit=limit,
        )

        selected: list[Playlist] = []
        # Each match is (choice, score, index into self.playlists).
        for _, score, index in matches:
            entry = self.playlists[index]
            entry._score = score
            selected.append(entry)

        return selected
# Union of the model types handled by get_titles() and TopResults.collect_all().
_type = models.Track | models.Album | models.Artist
def get_titles(items: list[_type]):
    """
    Lazily yield the display text of each item, in input order.

    Tracks yield ``og_title``, albums yield ``title`` and artists yield
    ``name``. Any other object yields ``None`` so the output stays aligned
    with the input positions.
    """
    for entry in items:
        if isinstance(entry, models.Track):
            yield entry.og_title
            continue

        if isinstance(entry, models.Album):
            yield entry.title
            continue

        if isinstance(entry, models.Artist):
            yield entry.name
            continue

        # Unknown type: keep the slot, but without any text.
        yield None
class TopResults:
    """
    Joins all tracks, albums and artists
    then fuzzy searches them as a single unit.
    """

    @staticmethod
    def collect_all():
        """
        Gather every artist, track and album from the stores.

        :return: a tuple of (all items, generator over their titles as
            produced by ``get_titles``).
        """
        all_items: list[_type] = []

        all_items.extend(ArtistStore.get_flat_list())
        all_items.extend(TrackStore.get_flat_list())
        all_items.extend(AlbumStore.get_flat_list())

        return all_items, get_titles(all_items)

    @staticmethod
    def get_track_items(item: Track | Album | Artist, limit=5):
        """
        Return up to ``limit`` tracks related to the top result.

        :param item: the top search result.
        :param limit: maximum number of tracks to return.
        :return: an empty list for a track (the caller fills it from the
            track search results); for an album or artist, its tracks with
            the highest ``playduration`` first.
        """
        # INFO: If the item is a track, return empty list
        # to be filled by the results from the top search
        if isinstance(item, Track):
            return []

        if isinstance(item, Album):
            # INFO: If the item is an album, get the tracks from the album
            candidates = TrackStore.get_tracks_by_albumhash(item.albumhash)
        elif isinstance(item, Artist):
            # INFO: If the item is an artist, get the tracks from the artist
            candidates = TrackStore.get_tracks_by_artisthash(item.artisthash)
        else:
            # Unknown type: nothing to show, but keep the list contract.
            return []

        # Sort BEFORE slicing so the most played tracks are kept; sorted()
        # also leaves the list returned by the store untouched.
        ranked = sorted(candidates, key=lambda t: t.playduration, reverse=True)
        return ranked[:limit]

    @staticmethod
    def get_album_items(item: Track | Album | Artist, limit=6):
        """
        Return up to ``limit`` albums related to the top result.

        :param item: the top search result.
        :param limit: maximum number of albums to return (``None`` = all).
        :return: an empty list for a track or album (the caller fills it
            from the album search results); for an artist, the first
            ``limit`` albums returned by the album store.
        """
        # INFO: If the item is an artist, get the albums from the artist
        if isinstance(item, Artist) and not isinstance(item, (Track, Album)):
            return AlbumStore.get_albums_by_artisthash(item.artisthash)[:limit]

        # INFO: If the item is a track or album, search for albums
        return []

    @staticmethod
    def _fill_up(items: list, candidates: list, hash_attr: str, limit: int | None):
        """
        Append candidates not already in ``items`` until ``limit`` is reached.

        Items are compared by the attribute named ``hash_attr``. A ``limit``
        of ``None`` means "no limit". ``items`` is extended in place and
        returned.
        """
        if limit is not None and len(items) >= limit:
            return items

        known = {getattr(entry, hash_attr) for entry in items}

        for candidate in candidates:
            if getattr(candidate, hash_attr) not in known:
                items.append(candidate)

            if limit is not None and len(items) >= limit:
                break

        return items

    @staticmethod
    def search(
        query: str,
        limit: int | None = None,
        albums_only=False,
        tracks_only=False,
    ):
        """
        Search tracks, albums and artists and build the top-results payload.

        :param query: the search text.
        :param limit: maximum number of albums and artists; ``None`` is
            forwarded unchanged to the searchers and means "no limit" when
            topping up albums.
        :param albums_only: return only the serialized album list.
        :param tracks_only: return only the serialized track list.
        :return: a list of serialized tracks (``tracks_only``), a list of
            serialized albums (``albums_only``), or a dict with the keys
            ``top_result``, ``tracks``, ``artists`` and ``albums``. When
            nothing matches, the lists are empty and ``top_result`` is
            ``None``.
        """
        tracks_limit = Limit.tracks if tracks_only else 4
        albums_limit = Limit.albums if albums_only else limit
        artists_limit = limit

        # INFO: Individually search all stores as each type has a different scorer
        tracks = SearchTracks(query)(limit=tracks_limit) if not albums_only else []
        albums = SearchAlbums(query)(limit=albums_limit)
        artists = SearchArtists(query)(limit=artists_limit)

        # INFO: Combine all results
        all_results = artists + tracks + albums

        # Nothing matched at all: return empty payloads instead of
        # failing on the missing top result.
        if not all_results:
            if tracks_only or albums_only:
                return []

            return {
                "top_result": None,
                "tracks": [],
                "artists": [],
                "albums": [],
            }

        # INFO: Get the top result. max() returns the first item with the
        # highest score, which is the same item a stable descending sort
        # would put first.
        top_result = max(all_results, key=lambda result: int(result._score))

        top_tracks = []
        if not albums_only:
            top_tracks = TopResults.get_track_items(top_result, limit=tracks_limit)

            # INFO: If there are not enough tracks, fill with search results
            top_tracks = TopResults._fill_up(
                top_tracks, tracks, "trackhash", tracks_limit
            )

        top_tracks = serialize_tracks(top_tracks)

        if tracks_only:
            return top_tracks

        top_albums = TopResults.get_album_items(top_result, limit=albums_limit)

        # INFO: If there are not enough albums, fill with search results
        top_albums = TopResults._fill_up(
            top_albums, albums, "albumhash", albums_limit
        )

        top_albums = serialize_albums(top_albums)

        if albums_only:
            return top_albums

        artists = serialize_for_cards(artists)

        # Serialize the top result and tag it with its kind for the client.
        if isinstance(top_result, Track):
            top_result = serialize_track(top_result)
            top_result["type"] = "track"
        elif isinstance(top_result, Album):
            top_result = serialize_album(top_result)
            top_result["type"] = "album"
        elif isinstance(top_result, Artist):
            top_result = serialize_for_card(top_result)
            top_result["type"] = "artist"

        return {
            "top_result": top_result,
            "tracks": top_tracks,
            "artists": artists,
            "albums": top_albums,
        }