implement saving mixes

+ add: get mixes
+ handle mixes on recently played
+ move modules around to fix circular deps
This commit is contained in:
cwilvx
2024-12-26 17:31:55 +03:00
parent 77485dd0a7
commit 98720466aa
14 changed files with 860 additions and 477 deletions
+1 -1
View File
@@ -4,7 +4,7 @@ from pydantic import BaseModel, Field
from app.api.apischemas import GenericLimitSchema
from app.lib.home.recentlyadded import get_recently_added_items
from app.lib.home.recentlyplayed import get_recently_played
from app.lib.home.get_recently_played import get_recently_played
from app.store.homepage import HomepageStore
bp_tag = Tag(name="Home", description="Homepage items")
+70 -25
View File
@@ -1,7 +1,9 @@
from typing import Literal
from flask_openapi3 import Tag
from flask_openapi3 import APIBlueprint
from pydantic import BaseModel, Field
from app.db.userdata import MixTable
from app.plugins.mixes import MixesPlugin
from app.store.homepage import HomepageStore
from app.store.tracks import TrackStore
@@ -13,37 +15,43 @@ api = APIBlueprint(
)
@api.post("/track")
def get_track_mix():
"""
Get a track mix
"""
mixes = MixesPlugin()
track = TrackStore.trackhashmap["9eeee292264ad01b"].get_best()
tracks = mixes.get_track_mix([track])
return {
"total": len(tracks),
"tracks": tracks,
}
class GetMixesBody(BaseModel):
mixtype: Literal["artists", "tracks"] = Field(description="The type of mix")
@api.post("/artist")
def get_artist_mix():
mixes = MixesPlugin()
# return mixes.create_artist_mixes()
# tracks = mixes.get_artist_mix("09306be8039b98ad")
@api.get("/<mixtype>")
def get_artist_mixes(path: GetMixesBody):
srcmixes = MixTable.get_all(with_userid=True)
mixes = []
# return {
# "total": len(tracks),
# "tracks": tracks,
# }
if path.mixtype == "artists":
mixes = [mix.to_dict(convert_timestamp=True) for mix in srcmixes]
elif path.mixtype == "tracks":
plugin = MixesPlugin()
return "hi"
for mix in srcmixes:
custom_mix = plugin.get_track_mix(mix)
if custom_mix:
mixes.append(custom_mix.to_dict(convert_timestamp=True))
seen_mixids = set()
# filter duplicates by trackshash
final_mixes = []
for mix in mixes:
# INFO: Ignore duplicates for artist mixes
if mix["id"] in seen_mixids and path.mixtype == "tracks":
continue
final_mixes.append(mix)
seen_mixids.add(mix["id"])
return final_mixes
class MixQuery(BaseModel):
mixid: str = Field(description="The mix id")
sourcehash: str = Field(description="The sourcehash of the mix")
@api.get("/")
@@ -58,8 +66,45 @@ def get_mix(query: MixQuery):
case _:
return {"msg": "Invalid mix ID"}, 400
mix = HomepageStore.get_mix(mixtype, query.mixid[1:])
if mix:
# INFO: Check if the mix is already in the homepage store
mix = HomepageStore.get_mix(mixtype, query.mixid)
if mix and mix["sourcehash"] == query.sourcehash:
return mix, 200
# INF0: Get the mix from the db
mix = MixTable.get_by_sourcehash(query.sourcehash)
if not mix:
return {"msg": "Mix not found"}, 404
if mixtype == "custom_mixes":
plugin = MixesPlugin()
mix = plugin.get_track_mix(mix)
if not mix:
return {"msg": "Mix not found"}, 404
return mix.to_full_dict(), 200
class SaveMixRequest(BaseModel):
mixid: str = Field(description="The id of the mix")
type: str = Field(description="The type of mix")
sourcehash: str = Field(description="The sourcehash of the mix")
@api.post("/save")
def save_mix(body: SaveMixRequest):
mix_type = body.type
mix_sourcehash = body.sourcehash
if mix_type == "artist":
state = MixTable.save_artist_mix(mix_sourcehash)
elif mix_type == "track":
state = MixTable.save_track_mix(mix_sourcehash)
mix = HomepageStore.find_mix(body.mixid)
if mix:
mix.saved = state
return {"msg": "Mixes saved"}, 200
+75 -2
View File
@@ -486,15 +486,22 @@ class MixTable(Base):
description: Mapped[str] = mapped_column(String())
timestamp: Mapped[int] = mapped_column(Integer())
sourcehash: Mapped[str] = mapped_column(String(), unique=True, index=True)
tracks: Mapped[list[str]] = mapped_column(JSON(), default_factory=list)
userid: Mapped[int] = mapped_column(
Integer(), ForeignKey("user.id", ondelete="cascade"), index=True
)
saved: Mapped[bool] = mapped_column(Boolean(), default=False)
tracks: Mapped[list[str]] = mapped_column(JSON(), default_factory=list)
extra: Mapped[dict[str, Any]] = mapped_column(
JSON(), nullable=True, default_factory=dict
)
@classmethod
def get_all(cls):
def get_all(cls, with_userid: bool = False):
if with_userid:
result = cls.execute(select(cls).where(cls.userid == get_current_userid()))
else:
result = cls.execute(select(cls))
return Mix.mixes_to_dataclasses(result.fetchall())
@classmethod
@@ -505,6 +512,13 @@ class MixTable(Base):
if res:
return Mix.mix_to_dataclass(res)
@classmethod
def get_by_mixid(cls, mixid: str):
result = cls.execute(select(cls).where(cls.mixid == mixid))
res = result.fetchone()
if res:
return Mix.mix_to_dataclass(res)
@classmethod
def insert_one(cls, mix: Mix):
mixdict = asdict(mix)
@@ -512,3 +526,62 @@ class MixTable(Base):
del mixdict["id"]
return cls.execute(insert(cls).values(mixdict), commit=True)
@classmethod
def update_one(cls, mixid: str, mix: Mix):
mixdict = asdict(mix)
mixdict["mixid"] = mix.id
del mixdict["id"]
return cls.execute(
update(cls)
.where(
and_(
cls.mixid == mixid,
cls.sourcehash == mix.sourcehash,
cls.userid == get_current_userid(),
)
)
.values(mixdict),
commit=True,
)
@classmethod
def save_artist_mix(cls, sourcehash: str):
"""
Toggles the saved status of an artist mix.
"""
mix = cls.get_by_sourcehash(sourcehash)
if not mix:
return False
mix.saved = not mix.saved
cls.update_one(mix.id, mix)
return mix.saved
@classmethod
def get_saved_track_mixes(cls):
"""
Return all mixes that have the extra.trackmix_saved set to True.
"""
result = cls.execute(select(cls).where(cls.extra.c.trackmix_saved == True))
return Mix.mixes_to_dataclasses(result.fetchall())
@classmethod
def save_track_mix(cls, sourcehash: str):
"""
Toggles the property extra.trackmix_saved to True.
"""
mix = cls.get_by_sourcehash(sourcehash)
if not mix:
return False
mix.extra["trackmix_saved"] = not mix.extra["trackmix_saved"]
cls.update_one(mix.id, mix)
return mix.extra["trackmix_saved"]
+303
View File
@@ -0,0 +1,303 @@
from app.store.albums import AlbumStore
from app.store.artists import ArtistStore
from app.store.folder import FolderStore
from app.store.tracks import TrackStore
from app.models.logger import TrackLog
from app.lib.playlistlib import get_first_4_images
from app.utils.dates import timestamp_to_time_passed
from app.lib.home.recentlyadded import get_recently_added_playlist
from app.db.userdata import FavoritesTable, MixTable, PlaylistTable
from app.lib.home.recentlyplayed import get_recently_played_playlist
from app.serializers.track import serialize_track
from app.serializers.album import album_serializer
from app.serializers.artist import serialize_for_card
from app.serializers.playlist import serialize_for_card as serialize_playlist
def create_items(entries: list[TrackLog], limit: int):
custom_playlists = [
{"name": "recentlyadded", "handler": get_recently_added_playlist},
{"name": "recentlyplayed", "handler": get_recently_played_playlist},
]
items = []
added = set()
for entry in entries:
if len(items) >= limit:
break
if entry.source in added:
continue
added.add(entry.source)
if entry.type == "mix":
if not entry.type_src:
continue
# INFO: Get mix from homepage store
from app.store.homepage import HomepageStore
mix = HomepageStore.find_mix(entry.type_src)
if not mix and entry.type_src.startswith("t"):
# mix is a track mix (not saved in the db)
continue
if not mix:
# INFO: Get mix from db
mix = MixTable.get_by_mixid(entry.type_src)
if not mix:
continue
items.append(
{
"type": "mix",
"hash": entry.type_src,
"timestamp": entry.timestamp,
}
)
continue
if entry.type == "album":
album = AlbumStore.albummap.get(entry.type_src)
if album is None:
continue
item = {
"type": "album",
"hash": entry.type_src,
"timestamp": entry.timestamp,
}
items.append(item)
continue
if entry.type == "artist":
artist = ArtistStore.artistmap.get(entry.type_src)
if artist is None:
continue
items.append(
{
"type": "artist",
"hash": entry.type_src,
"timestamp": entry.timestamp,
}
)
continue
if entry.type == "folder":
folder = entry.type_src
if not folder:
continue
if not folder.endswith("/"):
folder += "/"
is_home_dir = entry.type_src == "$home"
if is_home_dir:
folder = os.path.expanduser("~")
item = {
"type": "folder",
"hash": entry.type_src,
"timestamp": entry.timestamp,
}
items.append(item)
continue
if entry.type == "playlist":
is_custom = entry.type_src in [i["name"] for i in custom_playlists]
if is_custom:
items.append(
{
"type": "playlist",
"hash": entry.type_src,
"timestamp": entry.timestamp,
"is_custom": True,
}
)
continue
playlist = PlaylistTable.get_by_id(entry.type_src)
if playlist is None:
continue
item = {
"type": "playlist",
"hash": entry.type_src,
"timestamp": entry.timestamp,
}
items.append(item)
continue
if entry.type == "favorite":
items.append(
{
"type": "favorite",
"timestamp": entry.timestamp,
}
)
continue
t = TrackStore.trackhashmap.get(entry.trackhash)
if t is None:
continue
item = {
"type": "track",
"hash": entry.trackhash,
"timestamp": entry.timestamp,
}
items.append(item)
return items
def recover_items(items: list[dict]):
custom_playlists = [
{"name": "recentlyadded", "handler": get_recently_added_playlist},
{"name": "recentlyplayed", "handler": get_recently_played_playlist},
]
recovered = []
for item in items:
recovered_item = None
if item["type"] == "album":
album = AlbumStore.get_album_by_hash(item["hash"])
if album is None:
continue
album = album_serializer(
album,
to_remove={
"genres",
"date",
"count",
"duration",
"albumartists_hashes",
"og_title",
},
)
recovered_item = {
"type": "album",
"item": album,
}
elif item["type"] == "artist":
artist = ArtistStore.get_artist_by_hash(item["hash"])
if artist is None:
continue
recovered_item = {
"type": "artist",
"item": serialize_for_card(artist),
}
elif item["type"] == "folder":
count = FolderStore.count_tracks_containing_paths([item["hash"]])
recovered_item = {
"type": "folder",
"item": {
"path": item["hash"],
"count": count[0]["trackcount"],
},
}
elif item["type"] == "playlist":
if item.get("is_custom"):
playlist, _ = next(
i["handler"]()
for i in custom_playlists
if i["name"] == item["hash"]
)
playlist.images = [i["image"] for i in playlist.images]
playlist = serialize_playlist(
playlist, to_remove={"settings", "duration"}
)
recovered_item = {
"type": "playlist",
"item": playlist,
}
else:
playlist = PlaylistTable.get_by_id(item["hash"])
if playlist is None:
continue
tracks = TrackStore.get_tracks_by_trackhashes(playlist.trackhashes)
playlist.clear_lists()
if not playlist.has_image:
images = get_first_4_images(tracks)
images = [i["image"] for i in images]
playlist.images = images
recovered_item = {
"type": "playlist",
"item": serialize_playlist(playlist),
}
elif item["type"] == "favorite":
recovered_item = {
"type": "favorite",
"item": {
"count": FavoritesTable.count(),
},
}
elif item["type"] == "track":
track = TrackStore.trackhashmap.get(item["hash"])
if track is None:
continue
recovered_item = {
"type": "track",
"item": serialize_track(track.get_best()),
}
elif item["type"] == "mix":
from app.store.homepage import HomepageStore
mix = HomepageStore.find_mix(item["hash"])
if mix is None:
mix = MixTable.get_by_mixid(item["hash"])
if mix is None:
continue
mix = mix.to_dict()
recovered_item = {
"type": "mix",
"item": mix,
}
if recovered_item is not None:
helptext = item.get("help_text") or item.get("type")
secondary_text = item.get("secondary_text")
if "secondary_text" in item:
secondary_text = item["secondary_text"]
elif "timestamp" in item:
secondary_text = timestamp_to_time_passed(item["timestamp"])
if helptext:
recovered_item["item"]["help_text"] = helptext
if secondary_text:
recovered_item["item"]["time"] = secondary_text
recovered.append(recovered_item)
return recovered
+158
View File
@@ -0,0 +1,158 @@
from app.db.userdata import MixTable, PlaylistTable
from app.lib.home.recentlyadded import get_recently_added_playlist
from app.lib.home.recentlyplayed import get_recently_played_playlist
from app.models.logger import TrackLog
from app.store.albums import AlbumStore
from app.store.artists import ArtistStore
from app.store.homepage import HomepageStore
from app.store.tracks import TrackStore
def create_items(entries: list[TrackLog], limit: int):
custom_playlists = [
{"name": "recentlyadded", "handler": get_recently_added_playlist},
{"name": "recentlyplayed", "handler": get_recently_played_playlist},
]
items = []
added = set()
for entry in entries:
if len(items) >= limit:
break
if entry.source in added:
continue
added.add(entry.source)
if entry.type == "mix":
if not entry.type_src:
continue
# INFO: Get mix from homepage store
mix = HomepageStore.find_mix(entry.type_src)
if not mix and entry.type_src.startswith("t"):
# mix is a track mix (not saved in the db)
continue
if not mix:
# INFO: Get mix from db
mix = MixTable.get_by_mixid(entry.type_src)
if not mix:
continue
items.append(
{
"type": "mix",
"hash": entry.type_src,
"timestamp": entry.timestamp,
}
)
continue
if entry.type == "album":
album = AlbumStore.albummap.get(entry.type_src)
if album is None:
continue
item = {
"type": "album",
"hash": entry.type_src,
"timestamp": entry.timestamp,
}
items.append(item)
continue
if entry.type == "artist":
artist = ArtistStore.artistmap.get(entry.type_src)
if artist is None:
continue
items.append(
{
"type": "artist",
"hash": entry.type_src,
"timestamp": entry.timestamp,
}
)
continue
if entry.type == "folder":
folder = entry.type_src
if not folder:
continue
if not folder.endswith("/"):
folder += "/"
is_home_dir = entry.type_src == "$home"
if is_home_dir:
folder = os.path.expanduser("~")
item = {
"type": "folder",
"hash": entry.type_src,
"timestamp": entry.timestamp,
}
items.append(item)
continue
if entry.type == "playlist":
is_custom = entry.type_src in [i["name"] for i in custom_playlists]
if is_custom:
items.append(
{
"type": "playlist",
"hash": entry.type_src,
"timestamp": entry.timestamp,
"is_custom": True,
}
)
continue
playlist = PlaylistTable.get_by_id(entry.type_src)
if playlist is None:
continue
item = {
"type": "playlist",
"hash": entry.type_src,
"timestamp": entry.timestamp,
}
items.append(item)
continue
if entry.type == "favorite":
items.append(
{
"type": "favorite",
"timestamp": entry.timestamp,
}
)
continue
t = TrackStore.trackhashmap.get(entry.trackhash)
if t is None:
continue
item = {
"type": "track",
"hash": entry.trackhash,
"timestamp": entry.timestamp,
}
items.append(item)
return items
+47
View File
@@ -0,0 +1,47 @@
from app.db.userdata import ScrobbleTable
from app.lib.home.create_items import create_items
from app.models.logger import TrackLog
def get_recently_played(
limit: int, userid: int | None = None, _entries: list[TrackLog] = []
):
"""
Get the recently played items for the homepage.
Pass a list of track log entries to use a subset of the scrobble table.
"""
# TODO: Paginate this
items = []
BATCH_SIZE = 200
current_index = 0
if len(_entries):
entries = _entries
limit = 1
else:
entries = ScrobbleTable.get_all(0, BATCH_SIZE, userid=userid)
max_iterations = 20
iterations = 0
while len(items) < limit and iterations < max_iterations:
items.extend(create_items(entries, limit))
current_index += BATCH_SIZE
if len(items) < limit:
entries = ScrobbleTable.get_all(
start=current_index + 1, limit=BATCH_SIZE, userid=userid
)
if not entries:
break
iterations += 1
if iterations == max_iterations:
print(
f"Warning: Reached maximum iterations ({max_iterations}) while fetching recently played items"
)
return items
-294
View File
@@ -1,307 +1,13 @@
from datetime import datetime
import os
from app.db.userdata import FavoritesTable, PlaylistTable, ScrobbleTable
from app.models.logger import TrackLog
from app.models.playlist import Playlist
from app.serializers.track import serialize_track
from app.serializers.album import album_serializer
from app.lib.playlistlib import get_first_4_images
from app.store.folder import FolderStore
from app.utils.dates import (
create_new_date,
date_string_to_time_passed,
timestamp_to_time_passed,
)
from app.serializers.artist import serialize_for_card
from app.serializers.playlist import serialize_for_card as serialize_playlist
from app.lib.home.recentlyadded import get_recently_added_playlist
from app.store.albums import AlbumStore
from app.store.tracks import TrackStore
from app.store.artists import ArtistStore
def get_recently_played(
limit: int, userid: int | None = None, _entries: list[TrackLog] = []
):
"""
Get the recently played items for the homepage.
Pass a list of track log entries to use a subset of the scrobble table.
"""
# TODO: Paginate this
items = []
added = set()
custom_playlists = [
{"name": "recentlyadded", "handler": get_recently_added_playlist},
{"name": "recentlyplayed", "handler": get_recently_played_playlist},
]
def create_items(entries: list[TrackLog]):
for entry in entries:
if len(items) >= limit:
break
if entry.source in added:
continue
added.add(entry.source)
if entry.type == "album":
album = AlbumStore.albummap.get(entry.type_src)
if album is None:
continue
item = {
"type": "album",
"hash": entry.type_src,
"timestamp": entry.timestamp,
}
items.append(item)
continue
if entry.type == "artist":
artist = ArtistStore.artistmap.get(entry.type_src)
if artist is None:
continue
items.append(
{
"type": "artist",
"hash": entry.type_src,
"timestamp": entry.timestamp,
}
)
continue
if entry.type == "folder":
folder = entry.type_src
if not folder:
continue
if not folder.endswith("/"):
folder += "/"
is_home_dir = entry.type_src == "$home"
if is_home_dir:
folder = os.path.expanduser("~")
item = {
"type": "folder",
"hash": entry.type_src,
"timestamp": entry.timestamp,
}
items.append(item)
continue
if entry.type == "playlist":
is_custom = entry.type_src in [i["name"] for i in custom_playlists]
if is_custom:
items.append(
{
"type": "playlist",
"hash": entry.type_src,
"timestamp": entry.timestamp,
"is_custom": True,
}
)
continue
playlist = PlaylistTable.get_by_id(entry.type_src)
if playlist is None:
continue
item = {
"type": "playlist",
"hash": entry.type_src,
"timestamp": entry.timestamp,
}
items.append(item)
continue
if entry.type == "favorite":
items.append(
{
"type": "favorite",
"timestamp": entry.timestamp,
}
)
continue
t = TrackStore.trackhashmap.get(entry.trackhash)
if t is None:
continue
item = {
"type": "track",
"hash": entry.trackhash,
"timestamp": entry.timestamp,
}
items.append(item)
BATCH_SIZE = 200
current_index = 0
if len(_entries):
entries = _entries
limit = 1
else:
entries = ScrobbleTable.get_all(0, BATCH_SIZE, userid=userid)
max_iterations = 20
iterations = 0
while len(items) < limit and iterations < max_iterations:
create_items(entries)
current_index += BATCH_SIZE
if len(items) < limit:
entries = ScrobbleTable.get_all(
start=current_index + 1, limit=BATCH_SIZE, userid=userid
)
if not entries:
break
iterations += 1
if iterations == max_iterations:
print(
f"Warning: Reached maximum iterations ({max_iterations}) while fetching recently played items"
)
return items
def recover_items(items: list[dict]):
custom_playlists = [
{"name": "recentlyadded", "handler": get_recently_added_playlist},
{"name": "recentlyplayed", "handler": get_recently_played_playlist},
]
recovered = []
for item in items:
recovered_item = None
if item["type"] == "album":
album = AlbumStore.get_album_by_hash(item["hash"])
if album is None:
continue
album = album_serializer(
album,
to_remove={
"genres",
"date",
"count",
"duration",
"albumartists_hashes",
"og_title",
},
)
recovered_item = {
"type": "album",
"item": album,
}
elif item["type"] == "artist":
artist = ArtistStore.get_artist_by_hash(item["hash"])
if artist is None:
continue
recovered_item = {
"type": "artist",
"item": serialize_for_card(artist),
}
elif item["type"] == "folder":
count = FolderStore.count_tracks_containing_paths([item["hash"]])
recovered_item = {
"type": "folder",
"item": {
"path": item["hash"],
"count": count[0]["trackcount"],
},
}
elif item["type"] == "playlist":
if item.get("is_custom"):
playlist, _ = next(
i["handler"]()
for i in custom_playlists
if i["name"] == item["hash"]
)
playlist.images = [i["image"] for i in playlist.images]
playlist = serialize_playlist(
playlist, to_remove={"settings", "duration"}
)
recovered_item = {
"type": "playlist",
"item": playlist,
}
else:
playlist = PlaylistTable.get_by_id(item["hash"])
if playlist is None:
continue
tracks = TrackStore.get_tracks_by_trackhashes(playlist.trackhashes)
playlist.clear_lists()
if not playlist.has_image:
images = get_first_4_images(tracks)
images = [i["image"] for i in images]
playlist.images = images
recovered_item = {
"type": "playlist",
"item": serialize_playlist(playlist),
}
elif item["type"] == "favorite":
recovered_item = {
"type": "favorite",
"item": {
"count": FavoritesTable.count(),
},
}
elif item["type"] == "track":
track = TrackStore.trackhashmap.get(item["hash"])
if track is None:
continue
recovered_item = {
"type": "track",
"item": serialize_track(track.get_best()),
}
if recovered_item is not None:
helptext = item.get("help_text") or item.get("type")
secondary_text = item.get("secondary_text")
if "secondary_text" in item:
secondary_text = item["secondary_text"]
elif "timestamp" in item:
secondary_text = timestamp_to_time_passed(item["timestamp"])
if helptext:
recovered_item["item"]["help_text"] = helptext
if secondary_text:
recovered_item["item"]["time"] = secondary_text
recovered.append(recovered_item)
return recovered
def get_recently_played_playlist(limit: int = 100):
+1 -4
View File
@@ -25,14 +25,11 @@ class ArtistMixes(HomepageRoutine):
custom_mixes = []
for _mix in mixes:
custom_mix = mix.get_custom_mix_items(_mix)
custom_mix = mix.get_track_mix(_mix)
if custom_mix:
custom_mixes.append(custom_mix)
for index, custom_mix in enumerate(custom_mixes):
custom_mix.title = f"Mix {index + 1}"
HomepageStore.set_mixes(
custom_mixes, entrykey="custom_mixes", userid=user.id
)
+1 -1
View File
@@ -1,7 +1,7 @@
import pprint
from app.db.userdata import ScrobbleTable, UserTable
from app.lib.home.recentlyadded import get_recently_added_items
from app.lib.home.recentlyplayed import get_recently_played
from app.lib.home.get_recently_played import get_recently_played
from app.lib.recipes import HomepageRoutine
from app.store.homepage import HomepageStore
+2 -1
View File
@@ -27,10 +27,11 @@ class TrackLog:
def __post_init__(self):
prefix_map = {
"mix:": "mix",
"al:": "album",
"ar:": "artist",
"pl:": "playlist",
"fo:": "folder",
"pl:": "playlist",
"favorite": "favorite",
}
+9 -2
View File
@@ -5,7 +5,8 @@ from typing import Any
from app.lib.playlistlib import get_first_4_images
from app.serializers.track import serialize_tracks
from app.store.tracks import TrackStore
from app.utils.dates import seconds_to_time_string
from app.utils.dates import seconds_to_time_string, timestamp_to_time_passed
from app.utils.hashing import create_hash
@dataclass
@@ -15,6 +16,7 @@ class Mix:
description: str
tracks: list[str]
sourcehash: str
userid: int
"""
A hash of the tracks used to generate the mix.
"""
@@ -41,8 +43,13 @@ class Mix:
return _dict
def to_dict(self):
def to_dict(self, convert_timestamp: bool = False):
item = asdict(self)
item["trackshash"] = create_hash(*self.tracks[:40])
if convert_timestamp:
item["time"] = timestamp_to_time_passed(item["timestamp"])
del item["tracks"]
del item["extra"]["albums"]
+52 -28
View File
@@ -32,7 +32,7 @@ class MixAlreadyExists(Exception):
class MixesPlugin(Plugin):
MAX_TRACKS_TO_FETCH = 5
MIN_TRACK_MIX_LENGTH = 15
MIX_TRACKS = 40
MIX_TRACKS_LENGTH = 40
MIN_DAY_LISTEN_DURATION = 3 * 60 # 3 minutes
MIN_WEEK_LISTEN_DURATION = 10 * 60 # 10 minutes
@@ -65,7 +65,7 @@ class MixesPlugin(Plugin):
return False
@plugin_method
def get_track_mix(self, tracks: list[Track], with_help: bool = False):
def get_track_mix_data(self, tracks: list[Track], with_help: bool = False):
"""
Given a list of tracks, creates a mix by fetching data from the
Swing Music Cloud recommendation server.
@@ -137,25 +137,25 @@ class MixesPlugin(Plugin):
return trackmatches, results["albums"], results["artists"]
@plugin_method
def get_artist_mix(self, artisthash: str):
"""
Given an artisthash, creates an artist mix using the
self.MAX_TRACKS_TO_FETCH most listened to tracks.
# def get_artist_mix(self, artisthash: str):
# """
# Given an artisthash, creates an artist mix using the
# self.MAX_TRACKS_TO_FETCH most listened to tracks.
Returns a tuple of the mix and the sourcehash.
"""
artist = ArtistStore.artistmap[artisthash]
tracks = TrackStore.get_tracks_by_trackhashes(artist.trackhashes)
# Returns a tuple of the mix and the sourcehash.
# """
# artist = ArtistStore.artistmap[artisthash]
# tracks = TrackStore.get_tracks_by_trackhashes(artist.trackhashes)
tracks = sorted(tracks, key=lambda x: x.playduration, reverse=True)
sourcetracks = tracks[: self.MAX_TRACKS_TO_FETCH]
sourcehash = create_hash(*[t.trackhash for t in sourcetracks])
# tracks = sorted(tracks, key=lambda x: x.playduration, reverse=True)
# sourcetracks = tracks[: self.MAX_TRACKS_TO_FETCH]
# sourcehash = create_hash(*[t.trackhash for t in sourcetracks])
if MixTable.get_by_sourcehash(sourcehash):
raise MixAlreadyExists()
# if MixTable.get_by_sourcehash(sourcehash):
# raise MixAlreadyExists()
tracks, albums, artists = self.get_track_mix(tracks[: self.MAX_TRACKS_TO_FETCH])
return (tracks, albums, artists, sourcehash)
# tracks, albums, artists = self.get_track_mix(tracks[: self.MAX_TRACKS_TO_FETCH])
# return (tracks, albums, artists, sourcehash)
@plugin_method
def create_artist_mixes(self, userid: int):
@@ -225,7 +225,7 @@ class MixesPlugin(Plugin):
)
mix = self.create_artist_mix(
artist, trackhashes[: self.MAX_TRACKS_TO_FETCH]
artist, trackhashes[: self.MAX_TRACKS_TO_FETCH], userid=userid
)
if mix:
@@ -262,7 +262,9 @@ class MixesPlugin(Plugin):
return f"Featuring {tracks[0].artists[0]['name']}"
def create_artist_mix(self, artist: dict[str, str], trackhashes: list[str]):
def create_artist_mix(
self, artist: dict[str, str], trackhashes: list[str], userid: int
):
"""
Given an artist dict, creates an artist mix.
"""
@@ -286,7 +288,7 @@ class MixesPlugin(Plugin):
print(db_mix.title)
return db_mix
mix_tracks, albums, artists = self.get_track_mix(tracks)
mix_tracks, albums, artists = self.get_track_mix_data(tracks)
if len(mix_tracks) < self.MIN_TRACK_MIX_LENGTH:
return None
@@ -300,11 +302,12 @@ class MixesPlugin(Plugin):
mix = Mix(
# the a prefix indicates that this is an artist mix
id=f"a{artist['artisthash']}",
id=f"a{userid}{artist['artisthash']}",
title=artist["artist"] + " Radio",
description=self.get_mix_description(mix_tracks, artist["artisthash"]),
tracks=[t.trackhash for t in mix_tracks],
sourcehash=sourcehash,
userid=userid,
extra={
"type": "artist",
"artisthash": artist["artisthash"],
@@ -435,31 +438,47 @@ class MixesPlugin(Plugin):
"""
pass
def get_custom_mix_items(self, mix: Mix):
def get_track_mix(self, mix: Mix):
"""
Given a mix, returns the excess tracks as a custom mix.
"""
# INFO: If the mix can't have more than 20 tracks, return None
if len(mix.tracks) <= self.MIX_TRACKS + 20:
if len(mix.tracks) <= self.MIX_TRACKS_LENGTH + 20:
return None
tracks = TrackStore.get_tracks_by_trackhashes(mix.tracks[self.MIX_TRACKS :])
og_track = TrackStore.trackhashmap.get(mix.tracks[0])
return Mix(
id=f"t{mix.extra['artisthash']}",
title="", # INFO: Will be filled after all mixes are created.
if not og_track:
return None
og_track = og_track.get_best()
tracks = [og_track] + TrackStore.get_tracks_by_trackhashes(
mix.tracks[self.MIX_TRACKS_LENGTH :]
)
trackmix = Mix(
id=f"t{mix.userid}{mix.extra['artisthash']}",
title=og_track.title,
description=self.get_mix_description(tracks, mix.extra["artisthash"]),
tracks=[t.trackhash for t in tracks],
sourcehash=create_hash(*[t.trackhash for t in tracks]),
userid=mix.userid,
extra={
"type": "track",
"og_sourcehash": mix.sourcehash,
"images": self.get_custom_mix_images(tracks),
"artists": None,
"albums": None,
},
)
# INFO: Write track mix save state
if mix.extra.get("trackmix_saved"):
trackmix.saved = True
return trackmix
def get_custom_mix_images(self, tracks: list[Track]):
first_album = tracks[0].albumhash
@@ -478,12 +497,17 @@ class MixesPlugin(Plugin):
if artisthash in seen:
continue
artist = ArtistStore.artistmap.get(artisthash)
if not artist:
continue
seen.add(artisthash)
image = {
"image": artisthash + ".webp",
"type": "artist",
"color": ArtistStore.get_artist_by_hash(artisthash).color,
"color": artist.artist.color,
}
images.append(image)
+20 -116
View File
@@ -1,121 +1,14 @@
from abc import ABC
from typing import Any
from app.lib.home.recentlyplayed import recover_items
from app.models.mix import Mix
from app.utils.auth import get_current_userid
class HomepageEntry(ABC):
"""
Base class for all homepage entries.
items is a dict of userid to a dict of stuff.
"""
title: str
description: str
items: dict[int, Any]
def __init__(self, title: str, description: str):
self.title = title
self.description = description
def get_items(self, userid: int, limit: int | None = None):
"""
Return usable items for the homepage.
"""
...
class MixHomepageEntry(HomepageEntry):
"""
A homepage entry for mixes.
self.items is a dict of userid to a dict of mixid to mix.
"""
items: dict[int, dict[str, Mix]]
def __init__(self, title: str, description: str):
super().__init__(title, description)
self.items = {}
def get_items(self, userid: int, limit: int | None = None):
items = []
for mix in self.items.get(userid, {}).values():
if limit and len(items) >= limit:
break
items.append(
{
"type": "mix",
"item": mix.to_dict(),
}
from app.store.homepageentries import (
BecauseYouListenedToArtistHomepageEntry,
GenericRecoverableEntry,
HomepageEntry,
MixHomepageEntry,
RecentlyAddedHomepageEntry,
RecentlyPlayedHomepageEntry,
)
return {
"title": self.title,
"description": self.description,
"items": items,
}
class RecentlyPlayedHomepageEntry(HomepageEntry):
"""
A homepage entry for recently played.
"""
items: dict[int, list[dict[str, Any]]]
def __init__(self, title: str, description: str = ""):
super().__init__(title, description)
self.items = {}
def get_items(self, userid: int, limit: int | None = None):
items = self.items.get(userid, [])[:limit]
return {
"title": self.title,
"description": self.description,
"items": recover_items(items),
}
class RecentlyAddedHomepageEntry(RecentlyPlayedHomepageEntry):
"""
A homepage entry for recently added.
"""
def get_items(self, userid: int, limit: int | None = None):
return super().get_items(0, limit)
class GenericRecoverableEntry(RecentlyPlayedHomepageEntry):
"""
A homepage entry for top streamed.
"""
# NOTE: This extends RecentlyPlayedHomepageEntry because
# the shape of the data is the same.
pass
class BecauseYouListenedToArtistHomepageEntry(RecentlyPlayedHomepageEntry):
"""
A homepage entry for because you listened to artist.
"""
# SHAPE: {userid: {title: str, items: list[RecoverableItem]}}
items: dict[int, dict[str, Any]]
def get_items(self, userid: int, limit: int | None = None):
title = self.items.get(userid, {}).get("title")
items = self.items.get(userid, {}).get("items", [])[:limit]
return {
"title": title,
"items": recover_items(items),
}
from app.utils.auth import get_current_userid
class HomepageStore:
@@ -159,7 +52,7 @@ class HomepageStore:
@classmethod
def set_mixes(cls, items: list[Any], entrykey: str, userid: int | None = None):
idmap = {item.id[1:]: item for item in items}
idmap = {item.id: item for item in items}
cls.entries[entrykey].items[userid or get_current_userid()] = idmap
@classmethod
@@ -175,3 +68,14 @@ class HomepageStore:
for entry in cls.entries.keys()
if len(cls.entries[entry].items)
]
@classmethod
def find_mix(cls, mixid: str):
mixentries = ["artist_mixes", "custom_mixes"]
for entry in mixentries:
mix = cls.entries[entry].items.get(get_current_userid(), {}).get(mixid)
if mix:
return mix
return None
+118
View File
@@ -0,0 +1,118 @@
from abc import ABC
from typing import Any
from app.lib.home import recover_items
from app.models.mix import Mix
class HomepageEntry(ABC):
"""
Base class for all homepage entries.
items is a dict of userid to a dict of stuff.
"""
title: str
description: str
items: dict[int, Any]
def __init__(self, title: str, description: str):
self.title = title
self.description = description
def get_items(self, userid: int, limit: int | None = None):
"""
Return usable items for the homepage.
"""
...
class MixHomepageEntry(HomepageEntry):
"""
A homepage entry for mixes.
self.items is a dict of userid to a dict of mixid to mix.
"""
items: dict[int, dict[str, Mix]]
def __init__(self, title: str, description: str):
super().__init__(title, description)
self.items = {}
def get_items(self, userid: int, limit: int | None = None):
items = []
for mix in self.items.get(userid, {}).values():
if limit and len(items) >= limit:
break
items.append(
{
"type": "mix",
"item": mix.to_dict(),
}
)
return {
"title": self.title,
"description": self.description,
"items": items,
}
class RecentlyPlayedHomepageEntry(HomepageEntry):
"""
A homepage entry for recently played.
"""
items: dict[int, list[dict[str, Any]]]
def __init__(self, title: str, description: str = ""):
super().__init__(title, description)
self.items = {}
def get_items(self, userid: int, limit: int | None = None):
items = self.items.get(userid, [])[:limit]
return {
"title": self.title,
"description": self.description,
"items": recover_items(items),
}
class RecentlyAddedHomepageEntry(RecentlyPlayedHomepageEntry):
"""
A homepage entry for recently added.
"""
def get_items(self, userid: int, limit: int | None = None):
return super().get_items(0, limit)
class GenericRecoverableEntry(RecentlyPlayedHomepageEntry):
"""
A homepage entry for top streamed.
"""
# NOTE: This extends RecentlyPlayedHomepageEntry because
# the shape of the data is the same.
pass
class BecauseYouListenedToArtistHomepageEntry(RecentlyPlayedHomepageEntry):
"""
A homepage entry for because you listened to artist.
"""
# SHAPE: {userid: {title: str, items: list[RecoverableItem]}}
items: dict[int, dict[str, Any]]
def get_items(self, userid: int, limit: int | None = None):
title = self.items.get(userid, {}).get("title")
items = self.items.get(userid, {}).get("items", [])[:limit]
return {
"title": title,
"items": recover_items(items),
}