mirror of
https://github.com/Dvorinka/swingmusic-extended.git
synced 2026-06-03 20:13:02 +00:00
move to xxh3 hashing algorithm
+ port: search
This commit is contained in:
@@ -166,12 +166,7 @@ def add_item_to_playlist(path: PlaylistIDPath, body: AddItemToPlaylistBody):
|
||||
else:
|
||||
trackhashes = []
|
||||
|
||||
# insert_count = PL.add_tracks_to_playlist(int(playlist_id), trackhashes)
|
||||
PlaylistTable.append_to_playlist(int(playlist_id), trackhashes)
|
||||
|
||||
# if insert_count == 0:
|
||||
# return {"error": "Item already exists in playlist"}, 409
|
||||
|
||||
return {"msg": "Done"}, 200
|
||||
|
||||
|
||||
@@ -211,15 +206,12 @@ def get_playlist(path: PlaylistIDPath, query: GetPlaylistQuery):
|
||||
playlist, tracks = handler()
|
||||
return format_custom_playlist(playlist, tracks)
|
||||
|
||||
# playlist = PL.get_playlist_by_id(int(playlistid))
|
||||
playlist = PlaylistTable.get_by_id(playlistid)
|
||||
|
||||
if playlist is None:
|
||||
return {"msg": "Playlist not found"}, 404
|
||||
|
||||
# tracks = TrackStore.get_tracks_by_trackhashes(list(playlist.trackhashes))
|
||||
tracks = TrackTable.get_tracks_by_trackhashes(playlist.trackhashes)
|
||||
tracks = remove_duplicates(tracks)
|
||||
|
||||
duration = sum(t.duration for t in tracks)
|
||||
playlist.last_updated = date_string_to_time_passed(playlist.last_updated)
|
||||
@@ -250,7 +242,6 @@ def update_playlist_info(path: PlaylistIDPath, form: UpdatePlaylistForm):
|
||||
Update playlist
|
||||
"""
|
||||
playlistid = path.playlistid
|
||||
# db_playlist = PL.get_playlist_by_id(playlistid)
|
||||
db_playlist = PlaylistTable.get_by_id(playlistid)
|
||||
|
||||
if db_playlist is None:
|
||||
@@ -270,7 +261,6 @@ def update_playlist_info(path: PlaylistIDPath, form: UpdatePlaylistForm):
|
||||
"last_updated": create_new_date(),
|
||||
"name": str(form.name).strip(),
|
||||
"settings": settings,
|
||||
"trackhashes": json.dumps([]),
|
||||
}
|
||||
|
||||
if image:
|
||||
@@ -290,7 +280,6 @@ def update_playlist_info(path: PlaylistIDPath, form: UpdatePlaylistForm):
|
||||
|
||||
p_tuple = (*playlist.values(),)
|
||||
|
||||
# PL.update_playlist(playlistid, playlist)
|
||||
PlaylistTable.update_one(playlistid, playlist)
|
||||
|
||||
playlist = models.Playlist(*p_tuple)
|
||||
@@ -306,8 +295,6 @@ def pin_unpin_playlist(path: PlaylistIDPath):
|
||||
"""
|
||||
Pin playlist.
|
||||
"""
|
||||
# playlist = PL.get_playlist_by_id(path.playlistid)
|
||||
|
||||
playlist = PlaylistTable.get_by_id(path.playlistid)
|
||||
|
||||
if playlist is None:
|
||||
@@ -320,7 +307,6 @@ def pin_unpin_playlist(path: PlaylistIDPath):
|
||||
except KeyError:
|
||||
settings["pinned"] = True
|
||||
|
||||
# PL.update_settings(path.playlistid, settings)
|
||||
PlaylistTable.update_settings(path.playlistid, settings)
|
||||
return {"msg": "Done"}, 200
|
||||
|
||||
@@ -330,13 +316,11 @@ def remove_playlist_image(path: PlaylistIDPath):
|
||||
"""
|
||||
Clear playlist image.
|
||||
"""
|
||||
# playlist = PL.get_playlist_by_id(path.playlistid)
|
||||
playlist = PlaylistTable.get_by_id(path.playlistid)
|
||||
|
||||
if playlist is None:
|
||||
return {"error": "Playlist not found"}, 404
|
||||
|
||||
# PL.remove_banner(path.playlistid)
|
||||
PlaylistTable.remove_image(path.playlistid)
|
||||
|
||||
playlist.image = None
|
||||
@@ -355,7 +339,6 @@ def remove_playlist(path: PlaylistIDPath):
|
||||
"""
|
||||
Delete playlist
|
||||
"""
|
||||
# PL.delete_playlist(path.playlistid)
|
||||
PlaylistTable.remove_one(path.playlistid)
|
||||
|
||||
return {"msg": "Done"}, 200
|
||||
@@ -378,7 +361,6 @@ def remove_tracks_from_playlist(
|
||||
# index: int;
|
||||
# }
|
||||
|
||||
# PL.remove_tracks_from_playlist(path.playlistid, body.tracks)
|
||||
PlaylistTable.remove_from_playlist(path.playlistid, body.tracks)
|
||||
|
||||
return {"msg": "Done"}, 200
|
||||
|
||||
+23
-51
@@ -2,16 +2,17 @@
|
||||
Contains all the search routes.
|
||||
"""
|
||||
|
||||
from flask import request
|
||||
from unidecode import unidecode
|
||||
from pydantic import BaseModel, Field
|
||||
from pydantic import Field
|
||||
from flask_openapi3 import Tag
|
||||
from flask_openapi3 import APIBlueprint
|
||||
|
||||
from app import models
|
||||
from app.api.apischemas import GenericLimitSchema
|
||||
from app.db.libdata import TrackTable
|
||||
from app.lib import searchlib
|
||||
from app.settings import Defaults
|
||||
from app.store.tracks import TrackStore
|
||||
|
||||
|
||||
tag = Tag(name="Search", description="Search for tracks, albums and artists")
|
||||
api = APIBlueprint("search", __name__, url_prefix="/search", abp_tags=[tag])
|
||||
@@ -20,30 +21,18 @@ SEARCH_COUNT = 30
|
||||
"""The max amount of items to return per request"""
|
||||
|
||||
|
||||
def query_in_quotes(query: str) -> bool:
|
||||
"""
|
||||
Returns True if the query is in quotes
|
||||
"""
|
||||
try:
|
||||
return query.startswith('"') and query.endswith('"')
|
||||
except AttributeError:
|
||||
return False
|
||||
|
||||
|
||||
class Search:
|
||||
def __init__(self, query: str) -> None:
|
||||
self.tracks: list[models.Track] = []
|
||||
self.query = unidecode(query)
|
||||
|
||||
def search_tracks(self, in_quotes=False):
|
||||
def search_tracks(self):
|
||||
"""
|
||||
Calls :class:`SearchTracks` which returns the tracks that fuzzily match
|
||||
the search terms. Then adds them to the `SearchResults` store.
|
||||
"""
|
||||
self.tracks = TrackStore.tracks
|
||||
return searchlib.TopResults().search(
|
||||
self.query, tracks_only=True, in_quotes=in_quotes
|
||||
)
|
||||
self.tracks = TrackTable.get_all()
|
||||
return searchlib.TopResults().search(self.query, tracks_only=True)
|
||||
|
||||
def search_artists(self):
|
||||
"""Calls :class:`SearchArtists` which returns the artists that fuzzily match
|
||||
@@ -51,25 +40,23 @@ class Search:
|
||||
"""
|
||||
return searchlib.SearchArtists(self.query)()
|
||||
|
||||
def search_albums(self, in_quotes=False):
|
||||
def search_albums(self):
|
||||
"""Calls :class:`SearchAlbums` which returns the albums that fuzzily match
|
||||
the search term. Then adds them to the `SearchResults` store.
|
||||
"""
|
||||
return searchlib.TopResults().search(
|
||||
self.query, albums_only=True, in_quotes=in_quotes
|
||||
)
|
||||
return searchlib.TopResults().search(self.query, albums_only=True)
|
||||
|
||||
def get_top_results(
|
||||
self,
|
||||
limit: int,
|
||||
in_quotes=False,
|
||||
):
|
||||
finder = searchlib.TopResults()
|
||||
return finder.search(self.query, in_quotes=in_quotes, limit=limit)
|
||||
return finder.search(self.query, limit=limit)
|
||||
|
||||
|
||||
class SearchQuery(BaseModel):
|
||||
class SearchQuery(GenericLimitSchema):
|
||||
q: str = Field(description="The search query", example=Defaults.API_ARTISTNAME)
|
||||
start: int = Field(description="The index to start from", default=0, example=0)
|
||||
|
||||
|
||||
@api.get("/tracks")
|
||||
@@ -77,11 +64,7 @@ def search_tracks(query: SearchQuery):
|
||||
"""
|
||||
Search tracks
|
||||
"""
|
||||
|
||||
query = query.q
|
||||
in_quotes = query_in_quotes(query)
|
||||
|
||||
tracks = Search(query).search_tracks(in_quotes)
|
||||
tracks = Search(query.q).search_tracks()
|
||||
|
||||
return {
|
||||
"tracks": tracks[:SEARCH_COUNT],
|
||||
@@ -90,15 +73,13 @@ def search_tracks(query: SearchQuery):
|
||||
|
||||
|
||||
@api.get("/albums")
|
||||
def search_albums(query: SearchQuery):
|
||||
def search_albums(
|
||||
query: SearchQuery,
|
||||
):
|
||||
"""
|
||||
Search albums.
|
||||
"""
|
||||
|
||||
query = query.q
|
||||
in_quotes = query_in_quotes(query)
|
||||
|
||||
albums = Search(query).search_albums(in_quotes)
|
||||
albums = Search(query.q).search_albums()
|
||||
|
||||
return {
|
||||
"albums": albums[:SEARCH_COUNT],
|
||||
@@ -111,13 +92,10 @@ def search_artists(query: SearchQuery):
|
||||
"""
|
||||
Search artists.
|
||||
"""
|
||||
|
||||
query = query.q
|
||||
|
||||
if not query:
|
||||
if not query.q:
|
||||
return {"error": "No query provided"}, 400
|
||||
|
||||
artists = Search(query).search_artists()
|
||||
artists = Search(query.q).search_artists()
|
||||
|
||||
return {
|
||||
"artists": artists[:SEARCH_COUNT],
|
||||
@@ -138,15 +116,10 @@ def get_top_results(query: TopResultsQuery):
|
||||
|
||||
Returns the top results for the given query.
|
||||
"""
|
||||
limit = query.limit
|
||||
query = query.q
|
||||
|
||||
in_quotes = query_in_quotes(query)
|
||||
|
||||
if not query:
|
||||
if not query.q:
|
||||
return {"error": "No query provided"}, 400
|
||||
|
||||
return Search(query).get_top_results(in_quotes=in_quotes, limit=limit)
|
||||
return Search(query.q).get_top_results(limit=query.limit)
|
||||
|
||||
|
||||
class SearchLoadMoreQuery(SearchQuery):
|
||||
@@ -166,17 +139,16 @@ def search_load_more(query: SearchLoadMoreQuery):
|
||||
query = query.q
|
||||
item_type = query.type
|
||||
index = query.index
|
||||
in_quotes = query_in_quotes(query)
|
||||
|
||||
if item_type == "tracks":
|
||||
t = Search(query).search_tracks(in_quotes)
|
||||
t = Search(query).search_tracks()
|
||||
return {
|
||||
"tracks": t[index : index + SEARCH_COUNT],
|
||||
"more": len(t) > index + SEARCH_COUNT,
|
||||
}
|
||||
|
||||
elif item_type == "albums":
|
||||
a = Search(query).search_albums(in_quotes)
|
||||
a = Search(query).search_albums()
|
||||
return {
|
||||
"albums": a[index : index + SEARCH_COUNT],
|
||||
"more": len(a) > index + SEARCH_COUNT,
|
||||
|
||||
+1
-1
@@ -73,7 +73,7 @@ def send_file_as_chunks(filepath: str, audio_type: str) -> Response:
|
||||
"""
|
||||
# NOTE: +1 makes sure the last byte is included in the range.
|
||||
# NOTE: -1 is used to convert the end index to a 0-based index.
|
||||
chunk_size = 1024 * 360 # 360 KB
|
||||
chunk_size = 1024 * 512 # 0.5MB
|
||||
|
||||
# Get file size
|
||||
file_size = os.path.getsize(filepath)
|
||||
|
||||
+50
-20
@@ -14,7 +14,7 @@ from app.models import Album as AlbumModel
|
||||
from app.utils.remove_duplicates import remove_duplicates
|
||||
from app.db import engine
|
||||
|
||||
from sqlalchemy import JSON, Boolean, Integer, String, and_, delete, select, update
|
||||
from sqlalchemy import JSON, Boolean, Integer, String, delete, select, update
|
||||
from sqlalchemy.orm import Mapped, mapped_column, DeclarativeBase
|
||||
|
||||
|
||||
@@ -32,16 +32,26 @@ class Base(MasterBase, DeclarativeBase):
|
||||
@classmethod
|
||||
def get_all_hashes(cls, create_date: int | None = None):
|
||||
with DbManager() as conn:
|
||||
if cls.__tablename__ == "track":
|
||||
stmt = select(TrackTable.trackhash).where(cls.last_mod < create_date)
|
||||
elif cls.__tablename__ == "album":
|
||||
stmt = select(AlbumTable.albumhash).where(
|
||||
cls.created_date < create_date
|
||||
)
|
||||
elif cls.__tablename__ == "artist":
|
||||
stmt = select(ArtistTable.artisthash).where(
|
||||
cls.created_date < create_date
|
||||
)
|
||||
if create_date:
|
||||
if cls.__tablename__ == "track":
|
||||
stmt = select(TrackTable.trackhash).where(
|
||||
cls.last_mod < create_date
|
||||
)
|
||||
elif cls.__tablename__ == "album":
|
||||
stmt = select(AlbumTable.albumhash).where(
|
||||
cls.created_date < create_date
|
||||
)
|
||||
elif cls.__tablename__ == "artist":
|
||||
stmt = select(ArtistTable.artisthash).where(
|
||||
cls.created_date < create_date
|
||||
)
|
||||
else:
|
||||
if cls.__tablename__ == "track":
|
||||
stmt = select(TrackTable.trackhash)
|
||||
elif cls.__tablename__ == "album":
|
||||
stmt = select(AlbumTable.albumhash)
|
||||
elif cls.__tablename__ == "artist":
|
||||
stmt = select(ArtistTable.artisthash)
|
||||
|
||||
result = conn.execute(stmt)
|
||||
return {row[0] for row in result.fetchall()}
|
||||
@@ -135,7 +145,9 @@ class TrackTable(Base):
|
||||
def get_tracks_by_filepaths(cls, filepaths: list[str]):
|
||||
with DbManager() as conn:
|
||||
result = conn.execute(
|
||||
select(TrackTable).where(TrackTable.filepath.in_(filepaths))
|
||||
select(TrackTable)
|
||||
.where(TrackTable.filepath.in_(filepaths))
|
||||
.order_by(TrackTable.last_mod)
|
||||
)
|
||||
return tracks_to_dataclasses(result.fetchall())
|
||||
|
||||
@@ -155,10 +167,8 @@ class TrackTable(Base):
|
||||
result = conn.execute(
|
||||
select(TrackTable)
|
||||
.where(
|
||||
and_(
|
||||
TrackTable.trackhash == hash,
|
||||
TrackTable.filepath == filepath,
|
||||
)
|
||||
(TrackTable.trackhash == hash)
|
||||
& (TrackTable.filepath == filepath),
|
||||
)
|
||||
.order_by(TrackTable.bitrate.desc())
|
||||
)
|
||||
@@ -194,9 +204,18 @@ class TrackTable(Base):
|
||||
def get_tracks_by_trackhashes(cls, hashes: Iterable[str], limit: int | None = None):
|
||||
with DbManager() as conn:
|
||||
result = conn.execute(
|
||||
select(TrackTable).where(TrackTable.trackhash.in_(hashes)).limit(limit)
|
||||
select(TrackTable)
|
||||
.where(TrackTable.trackhash.in_(hashes))
|
||||
.group_by(TrackTable.trackhash)
|
||||
.limit(limit)
|
||||
)
|
||||
return tracks_to_dataclasses(result.fetchall())
|
||||
tracks = tracks_to_dataclasses(result.fetchall())
|
||||
|
||||
# order the tracks in the same order as the hashes
|
||||
if type(hashes) == list:
|
||||
return sorted(tracks, key=lambda x: hashes.index(x.trackhash))
|
||||
|
||||
return tracks
|
||||
|
||||
@classmethod
|
||||
def get_recently_added(cls, start: int, limit: int):
|
||||
@@ -212,7 +231,12 @@ class TrackTable(Base):
|
||||
|
||||
@classmethod
|
||||
def get_recently_played(cls, limit: int):
|
||||
result = cls.execute(select(cls).order_by(cls.lastplayed.desc()).limit(limit))
|
||||
result = cls.execute(
|
||||
select(cls)
|
||||
.group_by(cls.trackhash)
|
||||
.order_by(cls.lastplayed.desc())
|
||||
.limit(limit)
|
||||
)
|
||||
return tracks_to_dataclasses(result.fetchall())
|
||||
|
||||
@classmethod
|
||||
@@ -276,7 +300,13 @@ class AlbumTable(Base):
|
||||
result = conn.execute(
|
||||
select(AlbumTable).where(AlbumTable.albumhash.in_(hashes)).limit(limit)
|
||||
)
|
||||
return albums_to_dataclasses(result.fetchall())
|
||||
albums = albums_to_dataclasses(result.fetchall())
|
||||
|
||||
# order the albums in the same order as the hashes
|
||||
if type(hashes) == list:
|
||||
return sorted(albums, key=lambda x: hashes.index(x.albumhash))
|
||||
|
||||
return albums
|
||||
|
||||
@classmethod
|
||||
def get_albums_by_artisthashes(cls, artisthashes: list[str]):
|
||||
|
||||
+50
-14
@@ -1,7 +1,7 @@
|
||||
import datetime
|
||||
import enum
|
||||
from shlex import join
|
||||
from typing import Any
|
||||
from flask_jwt_extended import current_user
|
||||
from sqlalchemy import (
|
||||
JSON,
|
||||
Boolean,
|
||||
@@ -313,36 +313,41 @@ class PlaylistTable(Base):
|
||||
|
||||
@classmethod
|
||||
def append_to_playlist(cls, id: int, trackhashes: list[str]):
|
||||
print("type(trackhashes):", type(trackhashes))
|
||||
dbtrackhashes = cls.get_trackhashes(id)
|
||||
if not dbtrackhashes:
|
||||
dbtrackhashes = []
|
||||
|
||||
return cls.execute(
|
||||
update(cls)
|
||||
.where((cls.id == id) & (cls.userid == get_current_userid()))
|
||||
.values(trackhashes=cls.trackhashes + trackhashes),
|
||||
.values(trackhashes=dbtrackhashes + trackhashes),
|
||||
commit=True,
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def remove_from_playlist(cls, id: int, trackhashes: list[dict[str, Any]]):
|
||||
# CHECKPOINT: Properly remove tracks from a playlist
|
||||
# Without messing up the order in case of duplicates
|
||||
tracks = cls.execute(
|
||||
def get_trackhashes(cls, id: int) -> list[str]:
|
||||
result = cls.execute(
|
||||
select(cls.trackhashes).where(
|
||||
(cls.id == id) & (cls.userid == get_current_userid())
|
||||
)
|
||||
)
|
||||
result = result.fetchone()
|
||||
if result:
|
||||
return result[0]
|
||||
|
||||
results = tracks.fetchone()
|
||||
if results:
|
||||
dbhashes: list[str] = results[0]
|
||||
|
||||
@classmethod
|
||||
def remove_from_playlist(cls, id: int, trackhashes: list[dict[str, Any]]):
|
||||
# INFO: Get db trackhashes
|
||||
dbtrackhashes = cls.get_trackhashes(id)
|
||||
if dbtrackhashes:
|
||||
for item in trackhashes:
|
||||
if dbhashes.index(item["trackhash"]) == item["index"]:
|
||||
dbhashes.remove(item["trackhash"])
|
||||
if dbtrackhashes.index(item["trackhash"]) == item["index"]:
|
||||
dbtrackhashes.remove(item["trackhash"])
|
||||
|
||||
return cls.execute(
|
||||
update(cls)
|
||||
.where((cls.id == id) & (cls.userid == get_current_userid()))
|
||||
.values(trackhashes=dbhashes),
|
||||
.values(trackhashes=dbtrackhashes),
|
||||
commit=True,
|
||||
)
|
||||
|
||||
@@ -381,3 +386,34 @@ class PlaylistTable(Base):
|
||||
.values(image=None),
|
||||
commit=True,
|
||||
)
|
||||
|
||||
|
||||
# class PlaylistTrackTable(Base):
|
||||
# __tablename__ = "playlisttrack"
|
||||
|
||||
# id: Mapped[int] = mapped_column(primary_key=True)
|
||||
# trackhash: Mapped[str] = mapped_column(String(), index=True)
|
||||
# playlistid: Mapped[int] = mapped_column(
|
||||
# Integer(), ForeignKey("playlist.id", ondelete="cascade")
|
||||
# )
|
||||
# index: Mapped[int] = mapped_column(Integer())
|
||||
# userid: Mapped[int] = mapped_column(
|
||||
# Integer(), ForeignKey("user.id", ondelete="cascade")
|
||||
# )
|
||||
|
||||
# @classmethod
|
||||
# def count_by_playlist()
|
||||
|
||||
# @classmethod
|
||||
# def insert_many(cls, playlistid: int, trackhashes: list[str]):
|
||||
# userid = get_current_userid()
|
||||
# items = [
|
||||
# {
|
||||
# "index": index,
|
||||
# "userid": userid,
|
||||
# "trackhash": trackhash,
|
||||
# "playlistid": playlistid,
|
||||
# }
|
||||
# for index, trackhash in enumerate(trackhashes)
|
||||
# ]
|
||||
# return cls.execute(insert(cls).values(items), commit=True)
|
||||
|
||||
@@ -215,14 +215,6 @@ def get_recently_played(limit=7):
|
||||
return items
|
||||
|
||||
|
||||
def get_recently_played_tracks(limit: int):
|
||||
# records = db.get_recently_played(start=0, limit=limit)
|
||||
# last_updated = records[0].timestamp
|
||||
# tracks = TrackStore.get_tracks_by_trackhashes([r.trackhash for r in records])
|
||||
# return tracks, last_updated
|
||||
return TrackTable.get_recently_played(limit)
|
||||
|
||||
|
||||
def get_recently_played_playlist(limit: int = 100):
|
||||
playlist = Playlist(
|
||||
id="recentlyplayed",
|
||||
@@ -233,12 +225,12 @@ def get_recently_played_playlist(limit: int = 100):
|
||||
trackhashes=[],
|
||||
)
|
||||
|
||||
tracks = get_recently_played_tracks(limit)
|
||||
tracks = TrackTable.get_recently_played(limit)
|
||||
date = datetime.fromtimestamp(tracks[0].lastplayed)
|
||||
playlist.last_updated = date_string_to_time_passed(create_new_date(date))
|
||||
|
||||
images = get_first_4_images(tracks=tracks)
|
||||
playlist.images = images
|
||||
playlist.set_count(len(tracks))
|
||||
playlist.count = len(tracks)
|
||||
|
||||
return playlist, tracks
|
||||
|
||||
+38
-39
@@ -8,16 +8,21 @@ from rapidfuzz import process, utils
|
||||
from unidecode import unidecode
|
||||
|
||||
from app import models
|
||||
from app.db.sqlite.favorite import SQLiteFavoriteMethods as favdb
|
||||
from app.config import UserConfig
|
||||
from app.db.libdata import AlbumTable, ArtistTable, TrackTable
|
||||
|
||||
# from app.db.sqlite.favorite import SQLiteFavoriteMethods as favdb
|
||||
from app.models.enums import FavType
|
||||
from app.models.track import Track
|
||||
from app.serializers.album import serialize_for_card as serialize_album
|
||||
from app.serializers.album import serialize_for_card_many as serialize_albums
|
||||
from app.serializers.artist import serialize_for_cards
|
||||
from app.serializers.track import serialize_track, serialize_tracks
|
||||
from app.store.albums import AlbumStore
|
||||
from app.store.artists import ArtistStore
|
||||
from app.store.tracks import TrackStore
|
||||
|
||||
# from app.store.albums import AlbumStore
|
||||
# from app.store.artists import ArtistStore
|
||||
# from app.store.tracks import TrackStore
|
||||
|
||||
from app.utils.remove_duplicates import remove_duplicates
|
||||
|
||||
# ratio = fuzz.ratio
|
||||
@@ -49,7 +54,8 @@ class Limit:
|
||||
class SearchTracks:
|
||||
def __init__(self, query: str) -> None:
|
||||
self.query = query
|
||||
self.tracks = TrackStore.tracks
|
||||
# self.tracks = TrackStore.tracks
|
||||
self.tracks = TrackTable.get_all()
|
||||
|
||||
def __call__(self) -> List[models.Track]:
|
||||
"""
|
||||
@@ -72,7 +78,8 @@ class SearchTracks:
|
||||
class SearchArtists:
|
||||
def __init__(self, query: str) -> None:
|
||||
self.query = query
|
||||
self.artists = ArtistStore.artists
|
||||
# self.artists = ArtistStore.artists
|
||||
self.artists = ArtistTable.get_all()
|
||||
|
||||
def __call__(self):
|
||||
"""
|
||||
@@ -94,7 +101,8 @@ class SearchArtists:
|
||||
class SearchAlbums:
|
||||
def __init__(self, query: str) -> None:
|
||||
self.query = query
|
||||
self.albums = AlbumStore.albums
|
||||
# self.albums = AlbumStore.albums
|
||||
self.albums = AlbumTable.get_all()
|
||||
|
||||
def __call__(self) -> List[models.Album]:
|
||||
"""
|
||||
@@ -137,7 +145,7 @@ _S2 = TypeVar("_S2")
|
||||
_ResultType = int | float
|
||||
|
||||
|
||||
def get_titles(items: _type):
|
||||
def get_titles(items: list[_type]):
|
||||
for item in items:
|
||||
if isinstance(item, models.Track):
|
||||
text = item.og_title
|
||||
@@ -161,9 +169,9 @@ class TopResults:
|
||||
def collect_all():
|
||||
all_items: list[_type] = []
|
||||
|
||||
all_items.extend(ArtistStore.artists)
|
||||
all_items.extend(TrackStore.tracks)
|
||||
all_items.extend(AlbumStore.albums)
|
||||
all_items.extend(ArtistTable.get_all())
|
||||
all_items.extend(TrackTable.get_all())
|
||||
all_items.extend(AlbumTable.get_all())
|
||||
|
||||
return all_items, get_titles(all_items)
|
||||
|
||||
@@ -186,22 +194,16 @@ class TopResults:
|
||||
return {"type": "track", "item": item}
|
||||
|
||||
if isinstance(item, models.Album):
|
||||
tracks = TrackStore.get_tracks_by_albumhash(item.albumhash)
|
||||
tracks = TrackTable.get_tracks_by_albumhash(item.albumhash)
|
||||
tracks = remove_duplicates(tracks)
|
||||
|
||||
item.get_date_from_tracks(tracks)
|
||||
try:
|
||||
item.duration = sum((t.duration for t in tracks))
|
||||
except AttributeError:
|
||||
item.duration = 0
|
||||
|
||||
item.is_single(tracks)
|
||||
|
||||
if not item.is_single:
|
||||
item.check_type()
|
||||
|
||||
item.is_favorite = favdb.check_is_favorite(
|
||||
item.albumhash, fav_type=FavType.album
|
||||
item.check_type(
|
||||
tracks, singleTrackAsSingle=UserConfig().showAlbumsAsSingles
|
||||
)
|
||||
|
||||
return {"type": "album", "item": item}
|
||||
@@ -210,15 +212,18 @@ class TopResults:
|
||||
track_count = 0
|
||||
duration = 0
|
||||
|
||||
for track in TrackStore.get_tracks_by_artisthash(item.artisthash):
|
||||
tracks = TrackTable.get_tracks_by_artisthash(item.artisthash)
|
||||
tracks = remove_duplicates(tracks)
|
||||
|
||||
for track in tracks:
|
||||
track_count += 1
|
||||
duration += track.duration
|
||||
|
||||
album_count = AlbumStore.count_albums_by_artisthash(item.artisthash)
|
||||
# album_count = AlbumStore.count_albums_by_artisthash(item.artisthash)
|
||||
|
||||
item.set_trackcount(track_count)
|
||||
item.set_albumcount(album_count)
|
||||
item.set_duration(duration)
|
||||
# item.set_trackcount(track_count)
|
||||
# item.set_albumcount(album_count)
|
||||
# item.set_duration(duration)
|
||||
|
||||
return {"type": "artist", "item": item}
|
||||
|
||||
@@ -230,7 +235,8 @@ class TopResults:
|
||||
tracks.extend(SearchTracks(query)())
|
||||
|
||||
if item["type"] == "album":
|
||||
t = TrackStore.get_tracks_by_albumhash(item["item"].albumhash)
|
||||
t = TrackTable.get_tracks_by_albumhash(item["item"].albumhash)
|
||||
# t = TrackStore.get_tracks_by_albumhash(item["item"].albumhash)
|
||||
t.sort(key=lambda x: x.last_mod)
|
||||
|
||||
# if there are less than the limit, get more tracks
|
||||
@@ -242,7 +248,8 @@ class TopResults:
|
||||
tracks.extend(t)
|
||||
|
||||
if item["type"] == "artist":
|
||||
t = TrackStore.get_tracks_by_artisthash(item["item"].artisthash)
|
||||
# t = TrackStore.get_tracks_by_artisthash(item["item"].artisthash)
|
||||
t = TrackTable.get_tracks_by_artisthash(item["item"].artisthash)
|
||||
|
||||
# if there are less than the limit, get more tracks
|
||||
if len(t) < limit:
|
||||
@@ -263,7 +270,8 @@ class TopResults:
|
||||
return SearchAlbums(query)()[:limit]
|
||||
|
||||
if item["type"] == "artist":
|
||||
albums = AlbumStore.get_albums_by_artisthash(item["item"].artisthash)
|
||||
# albums = AlbumStore.get_albums_by_artisthash(item["item"].artisthash)
|
||||
albums = AlbumTable.get_albums_by_artisthash(item["item"].artisthash)
|
||||
|
||||
# if there are less than the limit, get more albums
|
||||
if len(albums) < limit:
|
||||
@@ -279,7 +287,6 @@ class TopResults:
|
||||
limit: int = None,
|
||||
albums_only=False,
|
||||
tracks_only=False,
|
||||
in_quotes=False,
|
||||
):
|
||||
items, titles = TopResults.collect_all()
|
||||
results = TopResults.get_results(titles, query)
|
||||
@@ -307,21 +314,13 @@ class TopResults:
|
||||
|
||||
result = TopResults.map_with_type(result)
|
||||
|
||||
if in_quotes:
|
||||
top_tracks = SearchTracks(query)()[:tracks_limit]
|
||||
else:
|
||||
top_tracks = TopResults.get_track_items(result, query, limit=tracks_limit)
|
||||
|
||||
top_tracks = TopResults.get_track_items(result, query, limit=tracks_limit)
|
||||
top_tracks = serialize_tracks(top_tracks)
|
||||
|
||||
if tracks_only:
|
||||
return top_tracks
|
||||
|
||||
if in_quotes:
|
||||
albums = SearchAlbums(query)()[:albums_limit]
|
||||
else:
|
||||
albums = TopResults.get_album_items(result, query, limit=albums_limit)
|
||||
|
||||
albums = TopResults.get_album_items(result, query, limit=albums_limit)
|
||||
albums = serialize_albums(albums)
|
||||
|
||||
if albums_only:
|
||||
|
||||
+11
-6
@@ -112,21 +112,26 @@ class Album:
|
||||
Runs all the checks to determine the type of album.
|
||||
"""
|
||||
if self.is_single(tracks, singleTrackAsSingle):
|
||||
return "single"
|
||||
self.type = "single"
|
||||
return
|
||||
|
||||
if self.is_soundtrack():
|
||||
return "soundtrack"
|
||||
self.type = "soundtrack"
|
||||
return
|
||||
|
||||
if self.is_live_album():
|
||||
return "live album"
|
||||
self.type = "live album"
|
||||
return
|
||||
|
||||
if self.is_compilation():
|
||||
return "compilation"
|
||||
self.type = "compilation"
|
||||
return
|
||||
|
||||
if self.is_ep():
|
||||
return "ep"
|
||||
self.type = "ep"
|
||||
return
|
||||
|
||||
return "album"
|
||||
self.type = "album"
|
||||
|
||||
def is_soundtrack(self) -> bool:
|
||||
"""
|
||||
|
||||
+6
-12
@@ -5,6 +5,7 @@ from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
from app import settings
|
||||
from app.utils.auth import get_current_userid
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
@@ -16,10 +17,10 @@ class Playlist:
|
||||
last_updated: str
|
||||
name: str
|
||||
settings: dict
|
||||
userid: int
|
||||
trackhashes: list[str]
|
||||
trackhashes: list[str] = dataclasses.field(default_factory=list)
|
||||
extra: dict[str, Any] = dataclasses.field(default_factory=dict)
|
||||
|
||||
userid: int | None = None
|
||||
thumb: str = ""
|
||||
count: int = 0
|
||||
duration: int = 0
|
||||
@@ -28,11 +29,10 @@ class Playlist:
|
||||
pinned: bool = False
|
||||
|
||||
def __post_init__(self):
|
||||
# self.trackhashes = json.loads(str(self.trackhashes))
|
||||
# self.count = len(self.trackhashes)
|
||||
self.count = len(self.trackhashes)
|
||||
|
||||
# if isinstance(self.settings, str):
|
||||
# self.settings = dict(json.loads(self.settings))
|
||||
if self.userid is None:
|
||||
self.userid = get_current_userid()
|
||||
|
||||
self.pinned = self.settings.get("pinned", False)
|
||||
self.has_image = (
|
||||
@@ -45,12 +45,6 @@ class Playlist:
|
||||
self.image = "None"
|
||||
self.thumb = "None"
|
||||
|
||||
# def set_duration(self, duration: int):
|
||||
# self.duration = duration
|
||||
|
||||
# def set_count(self, count: int):
|
||||
# self.count = count
|
||||
|
||||
def clear_lists(self):
|
||||
"""
|
||||
Removes data from lists to make it lighter for sending
|
||||
|
||||
+1
-1
@@ -126,7 +126,7 @@ class Defaults:
|
||||
SM_ARTIST_IMG_SIZE = 128
|
||||
MD_ARTIST_IMG_SIZE = 256
|
||||
|
||||
HASH_LENGTH = 10
|
||||
HASH_LENGTH = 16
|
||||
API_ALBUMHASH = "bfe300e966"
|
||||
API_ARTISTHASH = "cae59f1fc5"
|
||||
API_TRACKHASH = "0853280a12"
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
import locale
|
||||
from typing import TypeVar
|
||||
from typing import Iterable, TypeVar
|
||||
|
||||
T = TypeVar("T")
|
||||
|
||||
@@ -16,5 +16,5 @@ def format_number(number: float) -> str:
|
||||
|
||||
|
||||
|
||||
def flatten(list_: list[list[T]]) -> list[T]:
|
||||
def flatten(list_: Iterable[list[T]]) -> list[T]:
|
||||
return [item for sublist in list_ for item in sublist]
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import hashlib
|
||||
import xxhash
|
||||
|
||||
from unidecode import unidecode
|
||||
|
||||
@@ -32,11 +33,12 @@ def create_hash(*args: str, decode=False, limit=10) -> str:
|
||||
str_ = unidecode(str_)
|
||||
|
||||
str_ = str_.encode("utf-8")
|
||||
str_ = hashlib.sha1(str_).hexdigest()
|
||||
return xxhash.xxh3_64(str_).hexdigest()
|
||||
# str_ = hashlib.sha1(str_).hexdigest()
|
||||
|
||||
# INFO: Return first 5 + last 5 characters
|
||||
return (
|
||||
str_[: limit // 2] + str_[-limit // 2 :]
|
||||
if limit % 2 == 0
|
||||
else str_[: limit // 2] + str_[-limit // 2 - 1 :]
|
||||
)
|
||||
# return (
|
||||
# str_[: limit // 2] + str_[-limit // 2 :]
|
||||
# if limit % 2 == 0
|
||||
# else str_[: limit // 2] + str_[-limit // 2 - 1 :]
|
||||
# )
|
||||
|
||||
Reference in New Issue
Block a user