mirror of
https://github.com/Dvorinka/swingmusic-extended.git
synced 2026-06-03 20:13:02 +00:00
merge mixes pr
Mixes
This commit is contained in:
@@ -31,3 +31,4 @@ logs.txt
|
||||
testdata.py
|
||||
test.py
|
||||
nohup.out
|
||||
*s.json
|
||||
|
||||
@@ -97,3 +97,72 @@ Swing Music is looking for contributors. If you're interested, please join us at
|
||||
This software is provided to you with terms stated in the MIT License. Read the full text in the `LICENSE` file located at the root of this repository.
|
||||
|
||||
**[MIT License](https://opensource.org/licenses/MIT) | Copyright (c) 2023 Mungai Njoroge**
|
||||
|
||||
### Contributors
|
||||
|
||||
Shout out to the following code contributors who have helped maintain and improve Swing Music:
|
||||
|
||||
<div align="left">
|
||||
<table>
|
||||
<tr>
|
||||
<td align="center">
|
||||
<a href="https://github.com/cwilvx">
|
||||
<img src="https://github.com/cwilvx.png" width="80px;"/>
|
||||
<br />
|
||||
<sub><b>@cwilvx</b></sub>
|
||||
</a>
|
||||
</td>
|
||||
<td align="center">
|
||||
<a href="https://github.com/Ericgacoki">
|
||||
<img src="https://github.com/Ericgacoki.png" width="80px;" alt=""/>
|
||||
<br />
|
||||
<sub><b>@Ericgacoki</b></sub>
|
||||
</a>
|
||||
</td>
|
||||
<td align="center">
|
||||
<a href="https://github.com/Simonh2o">
|
||||
<img src="https://github.com/Simonh2o.png" width="80px;"/>
|
||||
<br />
|
||||
<sub><b>@Simonh2o</b></sub>
|
||||
</a>
|
||||
</td>
|
||||
<td align="center">
|
||||
<a href="https://github.com/tcsenpai">
|
||||
<img src="https://github.com/tcsenpai.png" width="80px;"/>
|
||||
<br />
|
||||
<sub><b>@tcsenpai</b></sub>
|
||||
</a>
|
||||
</td>
|
||||
<td align="center">
|
||||
<a href="https://github.com/jensgrunzer1">
|
||||
<img src="https://github.com/jensgrunzer1.png" width="80px;"/>
|
||||
<br />
|
||||
<sub><b>@jensgrunzer1</b></sub>
|
||||
</a>
|
||||
</td>
|
||||
<td align="center">
|
||||
<a href="https://github.com/Type-Delta">
|
||||
<img src="https://github.com/Type-Delta.png" width="80px;" alt=""/>
|
||||
<br />
|
||||
<sub><b>@Type-Delta</b></sub>
|
||||
</a>
|
||||
</td>
|
||||
<td align="center">
|
||||
<a href="https://github.com/MarcOrfilaCarreras">
|
||||
<img src="https://github.com/MarcOrfilaCarreras.png" width="80px;" alt=""/>
|
||||
<br />
|
||||
<sub><b>@MarcOrfilaCarreras</b></sub>
|
||||
</a>
|
||||
</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td align="center">
|
||||
<a href="https://github.com/tralph3">
|
||||
<img src="https://github.com/tralph3.png" width="80px;" alt=""/>
|
||||
<br />
|
||||
<sub><b>@tralph3</b></sub>
|
||||
</a>
|
||||
</td>
|
||||
</tr>
|
||||
</table>
|
||||
</div>
|
||||
|
||||
@@ -21,7 +21,7 @@
|
||||
<!-- CHECKPOINT -->
|
||||
<!-- ALBUM PAGE! -->
|
||||
|
||||
# DONE
|
||||
# DONE
|
||||
|
||||
- Support auth headers
|
||||
- Add recently played playlist
|
||||
|
||||
@@ -14,6 +14,7 @@ from app.config import UserConfig
|
||||
from app.db.userdata import UserTable
|
||||
from app.settings import Info as AppInfo
|
||||
from .plugins import lyrics as lyrics_plugin
|
||||
from .plugins import mixes as mixes_plugin
|
||||
from app.api import (
|
||||
album,
|
||||
artist,
|
||||
@@ -113,6 +114,7 @@ def create_api():
|
||||
# Plugins
|
||||
app.register_api(plugins.api)
|
||||
app.register_api(lyrics_plugin.api)
|
||||
app.register_api(mixes_plugin.api)
|
||||
|
||||
# Logger
|
||||
app.register_api(scrobble.api)
|
||||
|
||||
@@ -1,10 +1,11 @@
|
||||
from flask_jwt_extended import current_user
|
||||
from flask_openapi3 import Tag
|
||||
from flask_openapi3 import APIBlueprint
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
from app.api.apischemas import GenericLimitSchema
|
||||
from app.lib.home.recentlyadded import get_recently_added_items
|
||||
from app.lib.home.recentlyplayed import get_recently_played
|
||||
from app.lib.home.get_recently_played import get_recently_played
|
||||
from app.store.homepage import HomepageStore
|
||||
|
||||
bp_tag = Tag(name="Home", description="Homepage items")
|
||||
api = APIBlueprint("home", __name__, url_prefix="/home", abp_tags=[bp_tag])
|
||||
@@ -24,3 +25,14 @@ def get_recent_plays(query: GenericLimitSchema):
|
||||
Get recently played
|
||||
"""
|
||||
return {"items": get_recently_played(query.limit)}
|
||||
|
||||
|
||||
class HomepageItem(BaseModel):
|
||||
limit: int = Field(
|
||||
default=9, description="The max number of items per group to return"
|
||||
)
|
||||
|
||||
|
||||
@api.get("/")
|
||||
def homepage_items(query: HomepageItem):
|
||||
return HomepageStore.get_homepage_items(limit=query.limit)
|
||||
@@ -140,3 +140,21 @@ def send_playlist_image(path: PlaylistImagePath):
|
||||
"""
|
||||
folder = Paths.get_playlist_img_path()
|
||||
return send_file_or_fallback(folder, path.imgpath, "playlist.svg")
|
||||
|
||||
# MIXES
|
||||
@api.get("/mix/medium/<imgpath>")
|
||||
def send_md_mix_image(path: ImagePath):
|
||||
"""
|
||||
Get medium mix image
|
||||
"""
|
||||
folder = Paths.get_md_mixes_img_path()
|
||||
return send_file_or_fallback(folder, path.imgpath, "playlist.svg")
|
||||
|
||||
|
||||
@api.get("/mix/small/<imgpath>")
|
||||
def send_sm_mix_image(path: ImagePath):
|
||||
"""
|
||||
Get small mix image
|
||||
"""
|
||||
folder = Paths.get_sm_mixes_img_path()
|
||||
return send_file_or_fallback(folder, path.imgpath, "playlist.svg")
|
||||
|
||||
@@ -0,0 +1,109 @@
|
||||
from typing import Literal
|
||||
from flask_openapi3 import Tag
|
||||
from flask_openapi3 import APIBlueprint
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
from app.db.userdata import MixTable
|
||||
from app.plugins.mixes import MixesPlugin
|
||||
from app.store.homepage import HomepageStore
|
||||
from app.store.tracks import TrackStore
|
||||
|
||||
|
||||
bp_tag = Tag(name="Mixes Plugin", description="Mixes plugin hehe")
|
||||
api = APIBlueprint(
|
||||
"mixesplugin", __name__, url_prefix="/plugins/mixes", abp_tags=[bp_tag]
|
||||
)
|
||||
|
||||
|
||||
class GetMixesBody(BaseModel):
|
||||
mixtype: Literal["artists", "tracks"] = Field(description="The type of mix")
|
||||
|
||||
|
||||
@api.get("/<mixtype>")
|
||||
def get_artist_mixes(path: GetMixesBody):
|
||||
srcmixes = MixTable.get_all(with_userid=True)
|
||||
mixes = []
|
||||
|
||||
if path.mixtype == "artists":
|
||||
mixes = [mix.to_dict(convert_timestamp=True) for mix in srcmixes]
|
||||
elif path.mixtype == "tracks":
|
||||
plugin = MixesPlugin()
|
||||
|
||||
for mix in srcmixes:
|
||||
custom_mix = plugin.get_track_mix(mix)
|
||||
if custom_mix:
|
||||
mixes.append(custom_mix.to_dict(convert_timestamp=True))
|
||||
|
||||
seen_mixids = set()
|
||||
|
||||
# filter duplicates by trackshash
|
||||
final_mixes = []
|
||||
for mix in mixes:
|
||||
# INFO: Ignore duplicates for artist mixes
|
||||
if mix["id"] in seen_mixids and path.mixtype == "tracks":
|
||||
continue
|
||||
|
||||
final_mixes.append(mix)
|
||||
seen_mixids.add(mix["id"])
|
||||
|
||||
return final_mixes
|
||||
|
||||
|
||||
class MixQuery(BaseModel):
|
||||
mixid: str = Field(description="The mix id")
|
||||
sourcehash: str = Field(description="The sourcehash of the mix")
|
||||
|
||||
|
||||
@api.get("/")
|
||||
def get_mix(query: MixQuery):
|
||||
mixtype = ""
|
||||
|
||||
match query.mixid[0]:
|
||||
case "a":
|
||||
mixtype = "artist_mixes"
|
||||
case "t":
|
||||
mixtype = "custom_mixes"
|
||||
case _:
|
||||
return {"msg": "Invalid mix ID"}, 400
|
||||
|
||||
# INFO: Check if the mix is already in the homepage store
|
||||
mix = HomepageStore.get_mix(mixtype, query.mixid)
|
||||
if mix and mix["sourcehash"] == query.sourcehash:
|
||||
return mix, 200
|
||||
|
||||
# INF0: Get the mix from the db
|
||||
mix = MixTable.get_by_sourcehash(query.sourcehash)
|
||||
|
||||
if not mix:
|
||||
return {"msg": "Mix not found"}, 404
|
||||
|
||||
if mixtype == "custom_mixes":
|
||||
mix = MixesPlugin.get_track_mix(mix)
|
||||
|
||||
if not mix:
|
||||
return {"msg": "Mix not found"}, 404
|
||||
|
||||
return mix.to_full_dict(), 200
|
||||
|
||||
|
||||
class SaveMixRequest(BaseModel):
|
||||
mixid: str = Field(description="The id of the mix")
|
||||
type: str = Field(description="The type of mix")
|
||||
sourcehash: str = Field(description="The sourcehash of the mix")
|
||||
|
||||
|
||||
@api.post("/save")
|
||||
def save_mix(body: SaveMixRequest):
|
||||
mix_type = body.type
|
||||
mix_sourcehash = body.sourcehash
|
||||
|
||||
if mix_type == "artist":
|
||||
state = MixTable.save_artist_mix(mix_sourcehash)
|
||||
elif mix_type == "track":
|
||||
state = MixTable.save_track_mix(mix_sourcehash)
|
||||
|
||||
mix = HomepageStore.find_mix(body.mixid)
|
||||
|
||||
if mix:
|
||||
mix.saved = state
|
||||
return {"msg": "Mixes saved"}, 200
|
||||
@@ -1,20 +1,15 @@
|
||||
from dataclasses import dataclass
|
||||
from gettext import ngettext
|
||||
from itertools import groupby
|
||||
from math import e
|
||||
from pprint import pprint
|
||||
from flask_openapi3 import Tag
|
||||
from flask_openapi3 import APIBlueprint
|
||||
import pendulum
|
||||
from pydantic import Field, BaseModel
|
||||
from app.api.apischemas import TrackHashSchema
|
||||
from typing import Literal
|
||||
from datetime import datetime, timedelta
|
||||
from collections import defaultdict
|
||||
import locale
|
||||
|
||||
from app.db.userdata import FavoritesTable, ScrobbleTable
|
||||
from app.lib.extras import get_extra_info
|
||||
from app.lib.recipes.recents import RecentlyPlayed
|
||||
from app.models.album import Album
|
||||
from app.models.stats import StatItem
|
||||
from app.models.track import Track
|
||||
@@ -77,9 +72,14 @@ def log_track(body: LogTrackBody):
|
||||
return {"msg": "Track not found."}, 404
|
||||
|
||||
scrobble_data = dict(body)
|
||||
# REVIEW: Do we need to store the extra info in the database?
|
||||
# OR .... can we just write it to the backup file on demand?
|
||||
scrobble_data["extra"] = get_extra_info(body.trackhash, "track")
|
||||
ScrobbleTable.add(scrobble_data)
|
||||
|
||||
# NOTE: Update the recently played homepage for this userid
|
||||
RecentlyPlayed(userid=scrobble_data["userid"])
|
||||
|
||||
# Update play data on the in-memory stores
|
||||
track = trackentry.tracks[0]
|
||||
album = AlbumStore.albummap.get(track.albumhash)
|
||||
|
||||
@@ -0,0 +1,31 @@
|
||||
import time
|
||||
import schedule
|
||||
|
||||
from app.crons.mixes import Mixes
|
||||
from app.lib.recipes.recents import RecentlyAdded, RecentlyPlayed
|
||||
from app.lib.recipes.topstreamed import TopArtists
|
||||
from app.utils.threading import background
|
||||
|
||||
|
||||
@background
|
||||
def start_cron_jobs():
|
||||
"""
|
||||
This is the function that triggers the cron jobs.
|
||||
"""
|
||||
# NOTE: RecentlyPlayed is not a CRON job, it's triggered here to
|
||||
# populate the values for the very first time.
|
||||
RecentlyPlayed()
|
||||
RecentlyAdded()
|
||||
|
||||
# Initialized CRON jobs
|
||||
TopArtists()
|
||||
TopArtists(duration="week")
|
||||
Mixes()
|
||||
|
||||
# Trigger all CRON jobs when the app is started.
|
||||
schedule.run_all()
|
||||
|
||||
# Run all CRON jobs on a loop.
|
||||
while True:
|
||||
schedule.run_pending()
|
||||
time.sleep(1)
|
||||
@@ -0,0 +1,23 @@
|
||||
import schedule
|
||||
|
||||
|
||||
from abc import ABC, abstractmethod
|
||||
|
||||
|
||||
class CronJob(ABC):
|
||||
"""
|
||||
A cron job that will be run on a regular interval.
|
||||
"""
|
||||
|
||||
name: str
|
||||
hours: int = 1
|
||||
|
||||
def __init__(self):
|
||||
schedule.every(self.hours).hours.do(self.run)
|
||||
|
||||
@abstractmethod
|
||||
def run(self):
|
||||
"""
|
||||
The function that will be called by the cron job.
|
||||
"""
|
||||
...
|
||||
@@ -0,0 +1,26 @@
|
||||
from app.crons.cron import CronJob
|
||||
from app.lib.recipes.artistmixes import ArtistMixes
|
||||
from app.lib.recipes.because import BecauseYouListened
|
||||
|
||||
|
||||
class Mixes(CronJob):
|
||||
"""
|
||||
This cron job creates mixes displayed on the homepage.
|
||||
"""
|
||||
|
||||
name: str = "mixes"
|
||||
hours: int = 6
|
||||
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
|
||||
def run(self):
|
||||
"""
|
||||
Creates the artist mixes
|
||||
"""
|
||||
print("⭐⭐⭐⭐ Mixes cron job running")
|
||||
ArtistMixes()
|
||||
|
||||
# INFO: Because you listened to artist items are generated using
|
||||
# the artist mixes, so run them after the artist mixes are created.
|
||||
BecauseYouListened()
|
||||
@@ -8,92 +8,6 @@ from sqlalchemy.orm import Mapped, mapped_column
|
||||
from typing import Any, Optional
|
||||
|
||||
|
||||
# def create_all():
|
||||
# """
|
||||
# Create all the tables defined in this file.
|
||||
|
||||
# NOTE: We need this function because the MasterBase does not collect
|
||||
# the tables defined here (as they are grand-children of the MasterBase)
|
||||
# """
|
||||
# Base.metadata.create_all(DbEngine.engine)
|
||||
|
||||
|
||||
# class Base(MasterBase, DeclarativeBase):
|
||||
# pass
|
||||
# @classmethod
|
||||
# def get_all_hashes(cls, create_date: int | None = None):
|
||||
# with DbEngine.manager() as conn:
|
||||
# if create_date:
|
||||
# if cls.__tablename__ == "track":
|
||||
# stmt = select(TrackTable.trackhash).where(
|
||||
# cls.last_mod < create_date
|
||||
# )
|
||||
# elif cls.__tablename__ == "album":
|
||||
# stmt = select(AlbumTable.albumhash).where(
|
||||
# cls.created_date < create_date
|
||||
# )
|
||||
# elif cls.__tablename__ == "artist":
|
||||
# stmt = select(ArtistTable.artisthash).where(
|
||||
# cls.created_date < create_date
|
||||
# )
|
||||
# else:
|
||||
# if cls.__tablename__ == "track":
|
||||
# stmt = select(TrackTable.trackhash)
|
||||
# elif cls.__tablename__ == "album":
|
||||
# stmt = select(AlbumTable.albumhash)
|
||||
# elif cls.__tablename__ == "artist":
|
||||
# stmt = select(ArtistTable.artisthash)
|
||||
|
||||
# result = conn.execute(stmt)
|
||||
# return {row[0] for row in result.fetchall()}
|
||||
|
||||
# @classmethod
|
||||
# def set_is_favorite(cls, hash: str, is_favorite: bool):
|
||||
# """
|
||||
# Set the 'is_favorite' flag for a specific hash.
|
||||
|
||||
# Args:
|
||||
# hash (str): The hash value.
|
||||
# is_favorite (bool): The value of the 'is_favorite' flag.
|
||||
# """
|
||||
# with DbEngine.manager(commit=True) as conn:
|
||||
# if cls.__tablename__ == "track":
|
||||
# stmt = (
|
||||
# update(cls)
|
||||
# .where(TrackTable.trackhash == hash)
|
||||
# .values(is_favorite=is_favorite)
|
||||
# )
|
||||
# elif cls.__tablename__ == "album":
|
||||
# stmt = (
|
||||
# update(cls)
|
||||
# .where(AlbumTable.albumhash == hash)
|
||||
# .values(is_favorite=is_favorite)
|
||||
# )
|
||||
# elif cls.__tablename__ == "artist":
|
||||
# stmt = (
|
||||
# update(cls)
|
||||
# .where(ArtistTable.artisthash == hash)
|
||||
# .values(is_favorite=is_favorite)
|
||||
# )
|
||||
|
||||
# conn.execute(stmt)
|
||||
|
||||
# @classmethod
|
||||
# def increment_scrobblecount(
|
||||
# cls, table: Any, field: Any, hash: str, duration: int, timestamp: int
|
||||
# ):
|
||||
# cls.execute(
|
||||
# update(table)
|
||||
# .where(field == hash)
|
||||
# .values(
|
||||
# playcount=table.playcount + 1,
|
||||
# playduration=table.playduration + duration,
|
||||
# lastplayed=timestamp,
|
||||
# ),
|
||||
# commit=True,
|
||||
# )
|
||||
|
||||
|
||||
class TrackTable(Base):
|
||||
__tablename__ = "track"
|
||||
|
||||
@@ -101,7 +15,6 @@ class TrackTable(Base):
|
||||
album: Mapped[str] = mapped_column(String())
|
||||
albumartists: Mapped[str] = mapped_column(String())
|
||||
albumhash: Mapped[str] = mapped_column(String(), index=True)
|
||||
# artisthashes: Mapped[list[str]] = mapped_column(JSON(), index=True)
|
||||
artists: Mapped[str] = mapped_column(String())
|
||||
bitrate: Mapped[int] = mapped_column(Integer())
|
||||
copyright: Mapped[Optional[str]] = mapped_column(String())
|
||||
@@ -110,11 +23,8 @@ class TrackTable(Base):
|
||||
duration: Mapped[int] = mapped_column(Integer())
|
||||
filepath: Mapped[str] = mapped_column(String(), index=True, unique=True)
|
||||
folder: Mapped[str] = mapped_column(String(), index=True)
|
||||
# genrehashes: Mapped[list[str]] = mapped_column(JSON(), index=True)
|
||||
genres: Mapped[Optional[str]] = mapped_column(String())
|
||||
last_mod: Mapped[float] = mapped_column(Integer())
|
||||
# og_album: Mapped[str] = mapped_column(String())
|
||||
# og_title: Mapped[str] = mapped_column(String())
|
||||
title: Mapped[str] = mapped_column(String())
|
||||
track: Mapped[int] = mapped_column(Integer())
|
||||
trackhash: Mapped[str] = mapped_column(String(), index=True)
|
||||
@@ -141,45 +51,6 @@ class TrackTable(Base):
|
||||
)
|
||||
return tracks_to_dataclasses(result.fetchall())
|
||||
|
||||
# @classmethod
|
||||
# def get_tracks_by_albumhash(cls, albumhash: str):
|
||||
# with DbEngine.manager() as conn:
|
||||
# result = conn.execute(
|
||||
# select(TrackTable).where(TrackTable.albumhash == albumhash)
|
||||
# )
|
||||
# tracks = tracks_to_dataclasses(result.fetchall())
|
||||
# return remove_duplicates(tracks, is_album_tracks=True)
|
||||
|
||||
# @classmethod
|
||||
# def get_track_by_trackhash(cls, hash: str, filepath: str = ""):
|
||||
# with DbEngine.manager() as conn:
|
||||
# if filepath:
|
||||
# result = conn.execute(
|
||||
# select(TrackTable)
|
||||
# .where(
|
||||
# (TrackTable.trackhash == hash)
|
||||
# & (TrackTable.filepath == filepath),
|
||||
# )
|
||||
# .order_by(TrackTable.bitrate.desc())
|
||||
# )
|
||||
# else:
|
||||
# result = conn.execute(
|
||||
# select(TrackTable).where(TrackTable.trackhash == hash)
|
||||
# )
|
||||
|
||||
# track = result.fetchone()
|
||||
|
||||
# if track:
|
||||
# return track_to_dataclass(track)
|
||||
|
||||
# @classmethod
|
||||
# def get_tracks_by_artisthash(cls, artisthash: str):
|
||||
# with DbEngine.manager() as conn:
|
||||
# result = conn.execute(
|
||||
# select(TrackTable).where(TrackTable.artists.contains(artisthash))
|
||||
# )
|
||||
# return tracks_to_dataclasses(result.fetchall())
|
||||
|
||||
@classmethod
|
||||
def get_tracks_in_path(cls, path: str):
|
||||
with DbEngine.manager() as conn:
|
||||
@@ -190,230 +61,7 @@ class TrackTable(Base):
|
||||
)
|
||||
return tracks_to_dataclasses(result.fetchall())
|
||||
|
||||
# @classmethod
|
||||
# def get_tracks_by_trackhashes(cls, hashes: Iterable[str], limit: int | None = None):
|
||||
# with DbEngine.manager() as conn:
|
||||
# result = conn.execute(
|
||||
# select(TrackTable)
|
||||
# .where(TrackTable.trackhash.in_(hashes))
|
||||
# .group_by(TrackTable.trackhash)
|
||||
# .limit(limit)
|
||||
# )
|
||||
# tracks = tracks_to_dataclasses(result.fetchall())
|
||||
|
||||
# # order the tracks in the same order as the hashes
|
||||
# if type(hashes) == list:
|
||||
# return sorted(tracks, key=lambda x: hashes.index(x.trackhash))
|
||||
|
||||
# return tracks
|
||||
|
||||
# @classmethod
|
||||
# def get_recently_added(cls, start: int, limit: int):
|
||||
# with DbEngine.manager() as conn:
|
||||
# result = conn.execute(
|
||||
# select(TrackTable)
|
||||
# .order_by(TrackTable.last_mod.desc())
|
||||
# .offset(start)
|
||||
# .limit(limit)
|
||||
# )
|
||||
|
||||
# return tracks_to_dataclasses(result.fetchall())
|
||||
|
||||
@classmethod
|
||||
# def get_recently_played(cls, limit: int):
|
||||
# result = cls.execute(
|
||||
# select(cls)
|
||||
# .group_by(cls.trackhash)
|
||||
# .order_by(cls.lastplayed.desc())
|
||||
# .limit(limit)
|
||||
# )
|
||||
# return tracks_to_dataclasses(result.fetchall())
|
||||
|
||||
@classmethod
|
||||
def remove_tracks_by_filepaths(cls, filepaths: set[str]):
|
||||
with DbEngine.manager(commit=True) as conn:
|
||||
conn.execute(delete(TrackTable).where(TrackTable.filepath.in_(filepaths)))
|
||||
|
||||
# @classmethod
|
||||
# def increment_playcount(cls, trackhash: str, duration: int, timestamp: int):
|
||||
# cls.increment_scrobblecount(
|
||||
# TrackTable, TrackTable.trackhash, trackhash, duration, timestamp
|
||||
# )
|
||||
|
||||
# @classmethod
|
||||
# def update_artist_separators(cls, separators: set[str]):
|
||||
# tracks = cls.get_all()
|
||||
|
||||
# with DbEngine.manager(commit=True) as conn:
|
||||
# for track in tracks:
|
||||
# track.split_artists(separators)
|
||||
# conn.execute(
|
||||
# update(cls)
|
||||
# .where(cls.trackhash == track.trackhash)
|
||||
# .values(artists=track.artists, artisthashes=track.artisthashes)
|
||||
# )
|
||||
|
||||
|
||||
# class AlbumTable(Base):
|
||||
# __tablename__ = "album"
|
||||
|
||||
# id: Mapped[int] = mapped_column(primary_key=True)
|
||||
# albumartists: Mapped[list[dict[str, str]]] = mapped_column(JSON(), index=True)
|
||||
# artisthashes: Mapped[list[str]] = mapped_column(JSON(), index=True)
|
||||
# albumhash: Mapped[str] = mapped_column(String(), unique=True, index=True)
|
||||
# base_title: Mapped[str] = mapped_column(String())
|
||||
# color: Mapped[Optional[str]] = mapped_column(String())
|
||||
# created_date: Mapped[int] = mapped_column(Integer())
|
||||
# date: Mapped[int] = mapped_column(Integer())
|
||||
# duration: Mapped[int] = mapped_column(Integer())
|
||||
# genrehashes: Mapped[list[str]] = mapped_column(JSON(), nullable=True, index=True)
|
||||
# genres: Mapped[str] = mapped_column(JSON())
|
||||
# og_title: Mapped[str] = mapped_column(String())
|
||||
# title: Mapped[str] = mapped_column(String())
|
||||
# trackcount: Mapped[int] = mapped_column(Integer())
|
||||
# lastplayed: Mapped[int] = mapped_column(Integer(), default=0)
|
||||
# playcount: Mapped[int] = mapped_column(Integer(), default=0)
|
||||
# playduration: Mapped[int] = mapped_column(Integer(), default=0)
|
||||
# extra: Mapped[Optional[dict[str, Any]]] = mapped_column(
|
||||
# JSON(), default_factory=dict
|
||||
# )
|
||||
|
||||
# @classmethod
|
||||
# def get_all(cls):
|
||||
# with DbEngine.manager() as conn:
|
||||
# result = conn.execute(select(AlbumTable))
|
||||
# all = result.fetchall()
|
||||
# return albums_to_dataclasses(all)
|
||||
|
||||
# @classmethod
|
||||
# def get_album_by_albumhash(cls, hash: str):
|
||||
# with DbEngine.manager() as conn:
|
||||
# result = conn.execute(
|
||||
# select(AlbumTable).where(AlbumTable.albumhash == hash)
|
||||
# )
|
||||
# album = result.fetchone()
|
||||
|
||||
# if album:
|
||||
# return album_to_dataclass(album)
|
||||
|
||||
# @classmethod
|
||||
# def get_albums_by_albumhashes(cls, hashes: Iterable[str], limit: int | None = None):
|
||||
# with DbEngine.manager() as conn:
|
||||
# result = conn.execute(
|
||||
# select(AlbumTable).where(AlbumTable.albumhash.in_(hashes)).limit(limit)
|
||||
# )
|
||||
# albums = albums_to_dataclasses(result.fetchall())
|
||||
|
||||
# # order the albums in the same order as the hashes
|
||||
# if type(hashes) == list:
|
||||
# return sorted(albums, key=lambda x: hashes.index(x.albumhash))
|
||||
|
||||
# return albums
|
||||
|
||||
# @classmethod
|
||||
# def get_albums_by_artisthashes(cls, artisthashes: list[str]):
|
||||
# with DbEngine.manager() as conn:
|
||||
# albums: dict[str, list[AlbumModel]] = {}
|
||||
|
||||
# for artist in artisthashes:
|
||||
# result = conn.execute(
|
||||
# select(AlbumTable).where(AlbumTable.artisthashes.contains(artist))
|
||||
# )
|
||||
# albums[artist] = albums_to_dataclasses(result.fetchall())
|
||||
|
||||
# return albums
|
||||
|
||||
# @classmethod
|
||||
# def get_albums_by_base_title(cls, base_title: str):
|
||||
# with DbEngine.manager() as conn:
|
||||
# result = conn.execute(
|
||||
# select(AlbumTable).where(AlbumTable.base_title == base_title)
|
||||
# )
|
||||
# return albums_to_dataclasses(result.fetchall())
|
||||
|
||||
# @classmethod
|
||||
# def get_albums_by_artisthash(cls, artisthash: str):
|
||||
# with DbEngine.manager() as conn:
|
||||
# result = conn.execute(
|
||||
# select(AlbumTable).where(AlbumTable.artisthashes.contains(artisthash))
|
||||
# )
|
||||
# return albums_to_dataclasses(result.all())
|
||||
|
||||
# @classmethod
|
||||
# def increment_playcount(cls, albumhash: str, duration: int, timestamp: int):
|
||||
# return cls.increment_scrobblecount(
|
||||
# AlbumTable, AlbumTable.albumhash, albumhash, duration, timestamp
|
||||
# )
|
||||
|
||||
|
||||
# class ArtistTable(Base):
|
||||
# __tablename__ = "artist"
|
||||
|
||||
# id: Mapped[int] = mapped_column(primary_key=True)
|
||||
# albumcount: Mapped[int] = mapped_column(Integer())
|
||||
# artisthash: Mapped[str] = mapped_column(String(), unique=True, index=True)
|
||||
# created_date: Mapped[int] = mapped_column(Integer())
|
||||
# date: Mapped[int] = mapped_column(Integer())
|
||||
# duration: Mapped[int] = mapped_column(Integer())
|
||||
# genrehashes: Mapped[list[str]] = mapped_column(JSON(), nullable=True, index=True)
|
||||
# genres: Mapped[str] = mapped_column(JSON())
|
||||
# name: Mapped[str] = mapped_column(String(), index=True)
|
||||
# trackcount: Mapped[int] = mapped_column(Integer())
|
||||
# lastplayed: Mapped[int] = mapped_column(Integer(), default=0)
|
||||
# playcount: Mapped[int] = mapped_column(Integer(), default=0)
|
||||
# playduration: Mapped[int] = mapped_column(Integer(), default=0)
|
||||
# extra: Mapped[Optional[dict[str, Any]]] = mapped_column(
|
||||
# JSON(), default_factory=dict
|
||||
# )
|
||||
|
||||
# @classmethod
|
||||
# def get_all(cls):
|
||||
# with DbEngine.manager() as conn:
|
||||
# result = conn.execute(select(cls))
|
||||
# all = result.fetchall()
|
||||
# return artists_to_dataclasses(all)
|
||||
|
||||
# @classmethod
|
||||
# def get_artist_by_hash(cls, artisthash: str):
|
||||
# with DbEngine.manager() as conn:
|
||||
# result = conn.execute(
|
||||
# select(ArtistTable).where(ArtistTable.artisthash == artisthash)
|
||||
# )
|
||||
# return artist_to_dataclass(result.fetchone())
|
||||
|
||||
# @classmethod
|
||||
# def get_artisthashes_not_in(cls, artisthashes: list[str]):
|
||||
# with DbEngine.manager() as conn:
|
||||
# result = conn.execute(
|
||||
# select(ArtistTable.artisthash, ArtistTable.name).where(
|
||||
# ~ArtistTable.artisthash.in_(artisthashes)
|
||||
# )
|
||||
# )
|
||||
# return [{"artisthash": row[0], "name": row[1]} for row in result.fetchall()]
|
||||
|
||||
# @classmethod
|
||||
# def get_artists_by_artisthashes(
|
||||
# cls, hashes: Iterable[str], limit: int | None = None
|
||||
# ):
|
||||
# with DbEngine.manager() as conn:
|
||||
# result = conn.execute(
|
||||
# select(ArtistTable)
|
||||
# .where(ArtistTable.artisthash.in_(hashes))
|
||||
# .limit(limit)
|
||||
# )
|
||||
# return artists_to_dataclasses(result.fetchall())
|
||||
|
||||
# @classmethod
|
||||
# def increment_playcount(
|
||||
# cls, artisthashes: list[str], duration: int, timestamp: int
|
||||
# ):
|
||||
# cls.execute(
|
||||
# update(cls)
|
||||
# .where(ArtistTable.artisthash.in_(artisthashes))
|
||||
# .values(
|
||||
# playcount=ArtistTable.playcount + 1,
|
||||
# playduration=ArtistTable.playduration + duration,
|
||||
# lastplayed=timestamp,
|
||||
# ),
|
||||
# commit=True,
|
||||
# )
|
||||
|
||||
+140
-9
@@ -1,3 +1,4 @@
|
||||
from dataclasses import asdict
|
||||
import datetime
|
||||
from typing import Any, Literal
|
||||
from sqlalchemy import (
|
||||
@@ -25,12 +26,14 @@ from app.db.utils import (
|
||||
plugin_to_dataclasses,
|
||||
similar_artist_to_dataclass,
|
||||
similar_artists_to_dataclass,
|
||||
tracklog_to_dataclass,
|
||||
tracklog_to_dataclasses,
|
||||
user_to_dataclass,
|
||||
user_to_dataclasses,
|
||||
)
|
||||
|
||||
from app.db import Base
|
||||
from app.models.mix import Mix
|
||||
from app.utils.auth import get_current_userid, hash_password
|
||||
|
||||
|
||||
@@ -223,9 +226,7 @@ class FavoritesTable(Base):
|
||||
# .select_from(join(table, cls, field == cls.hash))
|
||||
.where(and_(cls.type == type, cls.userid == get_current_userid()))
|
||||
.order_by(cls.timestamp.desc())
|
||||
.offset(
|
||||
start
|
||||
)
|
||||
.offset(start)
|
||||
# INFO: If start is 0, fetch all so we can get the total count
|
||||
.limit(limit if start != 0 else None)
|
||||
)
|
||||
@@ -293,10 +294,10 @@ class ScrobbleTable(Base):
|
||||
return cls.insert_one(item)
|
||||
|
||||
@classmethod
|
||||
def get_all(cls, start: int, limit: int | None = None):
|
||||
def get_all(cls, start: int, limit: int | None = None, userid: int | None = None):
|
||||
result = cls.execute(
|
||||
select(cls)
|
||||
.where(cls.userid == get_current_userid())
|
||||
.where(cls.userid == (userid if userid else get_current_userid()))
|
||||
.order_by(cls.timestamp.desc())
|
||||
.offset(start)
|
||||
.limit(limit)
|
||||
@@ -305,15 +306,27 @@ class ScrobbleTable(Base):
|
||||
return tracklog_to_dataclasses(result.fetchall())
|
||||
|
||||
@classmethod
|
||||
def get_all_in_period(cls, start_time: int, end_time: int):
|
||||
def get_all_in_period(cls, start_time: int, end_time: int, userid: int | None):
|
||||
# UserId will be None if function is called from the API
|
||||
# In that case, we use the request userid
|
||||
if userid is None:
|
||||
userid = get_current_userid()
|
||||
|
||||
result = cls.execute(
|
||||
select(cls)
|
||||
.where(cls.userid == get_current_userid())
|
||||
.where(cls.userid == userid)
|
||||
.where(and_(cls.timestamp >= start_time, cls.timestamp <= end_time))
|
||||
.order_by(cls.timestamp.desc())
|
||||
)
|
||||
return tracklog_to_dataclasses(result.fetchall())
|
||||
|
||||
@classmethod
|
||||
def get_last_entry(cls, userid: int):
|
||||
result = cls.execute(
|
||||
select(cls).where(cls.userid == userid).order_by(cls.timestamp.desc())
|
||||
)
|
||||
return tracklog_to_dataclass(result.fetchone())
|
||||
|
||||
|
||||
class PlaylistTable(Base):
|
||||
__tablename__ = "playlist"
|
||||
@@ -332,8 +345,12 @@ class PlaylistTable(Base):
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def get_all(cls):
|
||||
result = cls.all()
|
||||
def get_all(cls, current_user: bool = True):
|
||||
if current_user:
|
||||
result = cls.execute(select(cls).where(cls.userid == get_current_userid()))
|
||||
else:
|
||||
result = cls.execute(select(cls))
|
||||
|
||||
return playlists_to_dataclasses(result)
|
||||
|
||||
@classmethod
|
||||
@@ -458,3 +475,117 @@ class LibDataTable(Base):
|
||||
select(cls.itemhash, cls.color).where(cls.itemtype == type)
|
||||
)
|
||||
return [{"itemhash": r[0], "color": r[1]} for r in result.fetchall()]
|
||||
|
||||
|
||||
class MixTable(Base):
|
||||
__tablename__ = "mix"
|
||||
|
||||
id: Mapped[int] = mapped_column(primary_key=True)
|
||||
mixid: Mapped[str] = mapped_column(String(), index=True)
|
||||
title: Mapped[str] = mapped_column(String())
|
||||
description: Mapped[str] = mapped_column(String())
|
||||
timestamp: Mapped[int] = mapped_column(Integer())
|
||||
sourcehash: Mapped[str] = mapped_column(String(), unique=True, index=True)
|
||||
userid: Mapped[int] = mapped_column(
|
||||
Integer(), ForeignKey("user.id", ondelete="cascade"), index=True
|
||||
)
|
||||
saved: Mapped[bool] = mapped_column(Boolean(), default=False)
|
||||
tracks: Mapped[list[str]] = mapped_column(JSON(), default_factory=list)
|
||||
extra: Mapped[dict[str, Any]] = mapped_column(
|
||||
JSON(), nullable=True, default_factory=dict
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def get_all(cls, with_userid: bool = False):
|
||||
if with_userid:
|
||||
result = cls.execute(
|
||||
select(cls)
|
||||
.where(cls.userid == get_current_userid())
|
||||
.order_by(cls.timestamp.desc())
|
||||
)
|
||||
else:
|
||||
result = cls.execute(select(cls).order_by(cls.timestamp.desc()))
|
||||
|
||||
return Mix.mixes_to_dataclasses(result.fetchall())
|
||||
|
||||
@classmethod
|
||||
def get_by_sourcehash(cls, sourcehash: str):
|
||||
result = cls.execute(select(cls).where(cls.sourcehash == sourcehash))
|
||||
|
||||
res = result.fetchone()
|
||||
if res:
|
||||
return Mix.mix_to_dataclass(res)
|
||||
|
||||
@classmethod
|
||||
def get_by_mixid(cls, mixid: str):
|
||||
result = cls.execute(select(cls).where(cls.mixid == mixid))
|
||||
res = result.fetchone()
|
||||
if res:
|
||||
return Mix.mix_to_dataclass(res)
|
||||
|
||||
@classmethod
|
||||
def insert_one(cls, mix: Mix):
|
||||
mixdict = asdict(mix)
|
||||
mixdict["mixid"] = mix.id
|
||||
del mixdict["id"]
|
||||
|
||||
return cls.execute(insert(cls).values(mixdict), commit=True)
|
||||
|
||||
@classmethod
|
||||
def update_one(cls, mixid: str, mix: Mix):
|
||||
mixdict = asdict(mix)
|
||||
mixdict["mixid"] = mix.id
|
||||
del mixdict["id"]
|
||||
|
||||
return cls.execute(
|
||||
update(cls)
|
||||
.where(
|
||||
and_(
|
||||
cls.mixid == mixid,
|
||||
cls.sourcehash == mix.sourcehash,
|
||||
cls.userid == get_current_userid(),
|
||||
)
|
||||
)
|
||||
.values(mixdict),
|
||||
commit=True,
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def save_artist_mix(cls, sourcehash: str):
|
||||
"""
|
||||
Toggles the saved status of an artist mix.
|
||||
"""
|
||||
|
||||
mix = cls.get_by_sourcehash(sourcehash)
|
||||
|
||||
if not mix:
|
||||
return False
|
||||
|
||||
mix.saved = not mix.saved
|
||||
cls.update_one(mix.id, mix)
|
||||
|
||||
return mix.saved
|
||||
|
||||
@classmethod
|
||||
def get_saved_track_mixes(cls):
|
||||
"""
|
||||
Return all mixes that have the extra.trackmix_saved set to True.
|
||||
"""
|
||||
|
||||
result = cls.execute(select(cls).where(cls.extra.c.trackmix_saved == True))
|
||||
return Mix.mixes_to_dataclasses(result.fetchall())
|
||||
|
||||
@classmethod
|
||||
def save_track_mix(cls, sourcehash: str):
|
||||
"""
|
||||
Toggles the property extra.trackmix_saved to True.
|
||||
"""
|
||||
|
||||
mix = cls.get_by_sourcehash(sourcehash)
|
||||
if not mix:
|
||||
return False
|
||||
|
||||
mix.extra["trackmix_saved"] = not mix.extra.get("trackmix_saved", False)
|
||||
cls.update_one(mix.id, mix)
|
||||
|
||||
return mix.extra["trackmix_saved"]
|
||||
|
||||
@@ -0,0 +1,30 @@
|
||||
from app.db.userdata import MixTable
|
||||
from app.plugins.mixes import MixesPlugin
|
||||
|
||||
|
||||
def find_mix(mixid: str, sourcehash: str):
|
||||
"""
|
||||
Find a mix in the homepage store or the db.
|
||||
"""
|
||||
from app.store.homepage import HomepageStore
|
||||
|
||||
mixtype = "custom_mixes" if mixid[0] == "t" else "artist_mixes"
|
||||
|
||||
# INFO: Try getting the mix from the homepage store
|
||||
mix = HomepageStore.get_mix(mixtype, mixid)
|
||||
if mix and mix["sourcehash"] == sourcehash:
|
||||
return mix
|
||||
|
||||
# INFO: Get the mix from the db
|
||||
mix = MixTable.get_by_sourcehash(sourcehash)
|
||||
|
||||
if not mix:
|
||||
return None
|
||||
|
||||
if mixtype == "custom_mixes":
|
||||
mix = MixesPlugin.get_track_mix(mix)
|
||||
|
||||
if not mix:
|
||||
return None
|
||||
|
||||
return mix.to_dict()
|
||||
@@ -0,0 +1,158 @@
|
||||
import os
|
||||
from app.db.userdata import PlaylistTable
|
||||
from app.lib.home import find_mix
|
||||
from app.lib.home.recentlyadded import get_recently_added_playlist
|
||||
from app.lib.home.recentlyplayed import get_recently_played_playlist
|
||||
from app.models.logger import TrackLog
|
||||
from app.store.albums import AlbumStore
|
||||
from app.store.artists import ArtistStore
|
||||
from app.store.tracks import TrackStore
|
||||
|
||||
|
||||
def create_items(entries: list[TrackLog], limit: int):
|
||||
custom_playlists = [
|
||||
{"name": "recentlyadded", "handler": get_recently_added_playlist},
|
||||
{"name": "recentlyplayed", "handler": get_recently_played_playlist},
|
||||
]
|
||||
|
||||
items = []
|
||||
added = set()
|
||||
|
||||
for entry in entries:
|
||||
if len(items) >= limit:
|
||||
break
|
||||
|
||||
if entry.source in added:
|
||||
continue
|
||||
|
||||
added.add(entry.source)
|
||||
|
||||
if entry.type == "mix":
|
||||
if not entry.type_src:
|
||||
continue
|
||||
|
||||
splits = entry.type_src.split(".")
|
||||
|
||||
try:
|
||||
mixid = splits[0]
|
||||
sourcehash = splits[1]
|
||||
except IndexError:
|
||||
continue
|
||||
|
||||
# INFO: Get mix from homepage store
|
||||
mix = find_mix(mixid, sourcehash)
|
||||
if not mix:
|
||||
continue
|
||||
|
||||
items.append(
|
||||
{
|
||||
"type": "mix",
|
||||
"hash": entry.type_src,
|
||||
"timestamp": entry.timestamp,
|
||||
}
|
||||
)
|
||||
continue
|
||||
|
||||
if entry.type == "album":
|
||||
album = AlbumStore.albummap.get(entry.type_src)
|
||||
|
||||
if album is None:
|
||||
continue
|
||||
|
||||
item = {
|
||||
"type": "album",
|
||||
"hash": entry.type_src,
|
||||
"timestamp": entry.timestamp,
|
||||
}
|
||||
|
||||
items.append(item)
|
||||
continue
|
||||
|
||||
if entry.type == "artist":
|
||||
artist = ArtistStore.artistmap.get(entry.type_src)
|
||||
|
||||
if artist is None:
|
||||
continue
|
||||
|
||||
items.append(
|
||||
{
|
||||
"type": "artist",
|
||||
"hash": entry.type_src,
|
||||
"timestamp": entry.timestamp,
|
||||
}
|
||||
)
|
||||
|
||||
continue
|
||||
|
||||
if entry.type == "folder":
|
||||
folder = entry.type_src
|
||||
|
||||
if not folder:
|
||||
continue
|
||||
|
||||
if not folder.endswith("/"):
|
||||
folder += "/"
|
||||
|
||||
is_home_dir = entry.type_src == "$home"
|
||||
|
||||
if is_home_dir:
|
||||
folder = os.path.expanduser("~")
|
||||
|
||||
item = {
|
||||
"type": "folder",
|
||||
"hash": entry.type_src,
|
||||
"timestamp": entry.timestamp,
|
||||
}
|
||||
|
||||
items.append(item)
|
||||
continue
|
||||
|
||||
if entry.type == "playlist":
|
||||
is_custom = entry.type_src in [i["name"] for i in custom_playlists]
|
||||
|
||||
if is_custom:
|
||||
items.append(
|
||||
{
|
||||
"type": "playlist",
|
||||
"hash": entry.type_src,
|
||||
"timestamp": entry.timestamp,
|
||||
"is_custom": True,
|
||||
}
|
||||
)
|
||||
continue
|
||||
|
||||
playlist = PlaylistTable.get_by_id(entry.type_src)
|
||||
if playlist is None:
|
||||
continue
|
||||
|
||||
item = {
|
||||
"type": "playlist",
|
||||
"hash": entry.type_src,
|
||||
"timestamp": entry.timestamp,
|
||||
}
|
||||
|
||||
items.append(item)
|
||||
continue
|
||||
|
||||
if entry.type == "favorite":
|
||||
items.append(
|
||||
{
|
||||
"type": "favorite",
|
||||
"timestamp": entry.timestamp,
|
||||
}
|
||||
)
|
||||
continue
|
||||
|
||||
t = TrackStore.trackhashmap.get(entry.trackhash)
|
||||
|
||||
if t is None:
|
||||
continue
|
||||
|
||||
item = {
|
||||
"type": "track",
|
||||
"hash": entry.trackhash,
|
||||
"timestamp": entry.timestamp,
|
||||
}
|
||||
items.append(item)
|
||||
|
||||
return items
|
||||
@@ -0,0 +1,47 @@
|
||||
from app.db.userdata import ScrobbleTable
|
||||
from app.lib.home.create_items import create_items
|
||||
from app.models.logger import TrackLog
|
||||
|
||||
|
||||
def get_recently_played(
|
||||
limit: int, userid: int | None = None, _entries: list[TrackLog] = []
|
||||
):
|
||||
"""
|
||||
Get the recently played items for the homepage.
|
||||
|
||||
Pass a list of track log entries to use a subset of the scrobble table.
|
||||
"""
|
||||
# TODO: Paginate this
|
||||
items = []
|
||||
|
||||
BATCH_SIZE = 200
|
||||
current_index = 0
|
||||
|
||||
if len(_entries):
|
||||
entries = _entries
|
||||
limit = 1
|
||||
else:
|
||||
entries = ScrobbleTable.get_all(0, BATCH_SIZE, userid=userid)
|
||||
|
||||
max_iterations = 20
|
||||
iterations = 0
|
||||
|
||||
while len(items) < limit and iterations < max_iterations:
|
||||
items.extend(create_items(entries, limit))
|
||||
current_index += BATCH_SIZE
|
||||
|
||||
if len(items) < limit:
|
||||
entries = ScrobbleTable.get_all(
|
||||
start=current_index + 1, limit=BATCH_SIZE, userid=userid
|
||||
)
|
||||
if not entries:
|
||||
break
|
||||
|
||||
iterations += 1
|
||||
|
||||
if iterations == max_iterations:
|
||||
print(
|
||||
f"Warning: Reached maximum iterations ({max_iterations}) while fetching recently played items"
|
||||
)
|
||||
|
||||
return items
|
||||
@@ -1,5 +1,4 @@
|
||||
from datetime import datetime
|
||||
from time import time
|
||||
|
||||
from app.lib.playlistlib import get_first_4_images
|
||||
from app.models.playlist import Playlist
|
||||
@@ -9,17 +8,12 @@ from app.store.tracks import TrackStore
|
||||
from app.store.albums import AlbumStore
|
||||
from app.store.artists import ArtistStore
|
||||
|
||||
from app.serializers.track import serialize_track
|
||||
from app.serializers.album import album_serializer
|
||||
from app.serializers.artist import serialize_for_card
|
||||
|
||||
from itertools import groupby
|
||||
|
||||
from app.utils import flatten
|
||||
from app.utils.dates import (
|
||||
create_new_date,
|
||||
date_string_to_time_passed,
|
||||
timestamp_to_time_passed,
|
||||
)
|
||||
|
||||
older_albums = set()
|
||||
@@ -59,15 +53,13 @@ def create_track(t: Track):
|
||||
"""
|
||||
Creates a recently added track entry.
|
||||
"""
|
||||
track = serialize_track(t, to_remove={"created_date"})
|
||||
track["help_text"] = "NEW TRACK"
|
||||
|
||||
return {
|
||||
"type": "track",
|
||||
"item": track,
|
||||
"hash": t.trackhash,
|
||||
"timestamp": t.last_mod,
|
||||
"help_text": "NEW TRACK",
|
||||
}
|
||||
|
||||
|
||||
# INFO: Keys: folder, tracks, time (timestamp)
|
||||
# group_type = dict[str, str | list[Track] | float]
|
||||
|
||||
@@ -83,7 +75,7 @@ def check_folder_type(group_: dict):
|
||||
|
||||
if len(tracks) == 1:
|
||||
entry = create_track(tracks[0])
|
||||
entry["item"]["time"] = timestamp_to_time_passed(time)
|
||||
entry["timestamp"] = time
|
||||
return entry
|
||||
|
||||
is_album, albumhash, _ = check_is_album_folder(tracks)
|
||||
@@ -94,26 +86,13 @@ def check_folder_type(group_: dict):
|
||||
if entry is None:
|
||||
return None
|
||||
|
||||
album = album_serializer(
|
||||
entry.album,
|
||||
to_remove={
|
||||
"genres",
|
||||
"og_title",
|
||||
"date",
|
||||
"duration",
|
||||
"count",
|
||||
"albumartists_hashes",
|
||||
"base_title",
|
||||
},
|
||||
)
|
||||
album["help_text"] = (
|
||||
"NEW ALBUM" if albumhash in existing_album_hashes else "NEW TRACKS"
|
||||
)
|
||||
album["time"] = timestamp_to_time_passed(time)
|
||||
|
||||
return {
|
||||
"type": "album",
|
||||
"item": album,
|
||||
"hash": albumhash,
|
||||
"timestamp": time,
|
||||
"help_text": (
|
||||
"NEW ALBUM" if albumhash in existing_album_hashes else "NEW TRACKS"
|
||||
),
|
||||
}
|
||||
|
||||
is_artist, artisthash, trackcount = check_is_artist_folder(tracks)
|
||||
@@ -123,16 +102,13 @@ def check_folder_type(group_: dict):
|
||||
if entry is None:
|
||||
return None
|
||||
|
||||
artist = serialize_for_card(entry.artist)
|
||||
artist["trackcount"] = trackcount
|
||||
artist["help_text"] = (
|
||||
"NEW ARTIST" if artisthash not in existing_artist_hashes else "NEW MUSIC"
|
||||
)
|
||||
artist["time"] = timestamp_to_time_passed(time)
|
||||
|
||||
return {
|
||||
"type": "artist",
|
||||
"item": artist,
|
||||
"hash": artisthash,
|
||||
"timestamp": time,
|
||||
"help_text": (
|
||||
"NEW ARTIST" if artisthash not in existing_artist_hashes else "NEW MUSIC"
|
||||
),
|
||||
}
|
||||
|
||||
is_track_folder = check_is_track_folder(tracks)
|
||||
@@ -142,12 +118,9 @@ def check_folder_type(group_: dict):
|
||||
if is_track_folder
|
||||
else {
|
||||
"type": "folder",
|
||||
"item": {
|
||||
"path": key,
|
||||
"count": len(tracks),
|
||||
"hash": key,
|
||||
"timestamp": time,
|
||||
"help_text": "NEW MUSIC",
|
||||
"time": timestamp_to_time_passed(time),
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
@@ -1,229 +1,13 @@
|
||||
from datetime import datetime
|
||||
import os
|
||||
from app.db.userdata import FavoritesTable, PlaylistTable, ScrobbleTable
|
||||
|
||||
from app.models.logger import TrackLog
|
||||
from app.models.playlist import Playlist
|
||||
from app.serializers.track import serialize_track
|
||||
from app.serializers.album import album_serializer
|
||||
from app.lib.playlistlib import get_first_4_images
|
||||
from app.store.folder import FolderStore
|
||||
from app.utils.dates import (
|
||||
create_new_date,
|
||||
date_string_to_time_passed,
|
||||
timestamp_to_time_passed,
|
||||
)
|
||||
from app.serializers.artist import serialize_for_card
|
||||
from app.serializers.playlist import serialize_for_card as serialize_playlist
|
||||
from app.lib.home.recentlyadded import get_recently_added_playlist
|
||||
|
||||
from app.store.albums import AlbumStore
|
||||
from app.store.tracks import TrackStore
|
||||
from app.store.artists import ArtistStore
|
||||
|
||||
|
||||
def get_recently_played(limit=7):
|
||||
# TODO: Paginate this
|
||||
items = []
|
||||
added = set()
|
||||
|
||||
custom_playlists = [
|
||||
{"name": "recentlyadded", "handler": get_recently_added_playlist},
|
||||
{"name": "recentlyplayed", "handler": get_recently_played_playlist},
|
||||
]
|
||||
|
||||
def create_items(entries: list[TrackLog]):
|
||||
for entry in entries:
|
||||
if len(items) >= limit:
|
||||
break
|
||||
|
||||
if entry.source in added:
|
||||
continue
|
||||
|
||||
added.add(entry.source)
|
||||
|
||||
if entry.type == "album":
|
||||
album = AlbumStore.get_album_by_hash(entry.type_src)
|
||||
|
||||
if album is None:
|
||||
continue
|
||||
|
||||
album = album_serializer(
|
||||
album,
|
||||
to_remove={
|
||||
"genres",
|
||||
"date",
|
||||
"count",
|
||||
"duration",
|
||||
"albumartists_hashes",
|
||||
"og_title",
|
||||
},
|
||||
)
|
||||
album["help_text"] = "album"
|
||||
album["time"] = timestamp_to_time_passed(entry.timestamp)
|
||||
|
||||
items.append(
|
||||
{
|
||||
"type": "album",
|
||||
"item": album,
|
||||
}
|
||||
)
|
||||
continue
|
||||
|
||||
if entry.type == "artist":
|
||||
artist = ArtistStore.get_artist_by_hash(entry.type_src)
|
||||
|
||||
if artist is None:
|
||||
continue
|
||||
|
||||
artist = serialize_for_card(artist)
|
||||
artist["help_text"] = "artist"
|
||||
artist["time"] = timestamp_to_time_passed(entry.timestamp)
|
||||
|
||||
items.append(
|
||||
{
|
||||
"type": "artist",
|
||||
"item": artist,
|
||||
}
|
||||
)
|
||||
|
||||
continue
|
||||
|
||||
if entry.type == "folder":
|
||||
folder = entry.type_src
|
||||
|
||||
if not folder:
|
||||
continue
|
||||
|
||||
if not folder.endswith("/"):
|
||||
folder += "/"
|
||||
|
||||
is_home_dir = entry.type_src == "$home"
|
||||
|
||||
if is_home_dir:
|
||||
folder = os.path.expanduser("~")
|
||||
|
||||
# print(folder)
|
||||
# folder = os.path.join("/", folder, "")
|
||||
# print(folder)
|
||||
# count = len([t for t in TrackStore.tracks if t.folder == folder])
|
||||
count = FolderStore.count_tracks_containing_paths([folder])
|
||||
items.append(
|
||||
{
|
||||
"type": "folder",
|
||||
"item": {
|
||||
"path": folder,
|
||||
"count": count[0]["trackcount"],
|
||||
"help_text": "folder",
|
||||
"time": timestamp_to_time_passed(entry.timestamp),
|
||||
},
|
||||
}
|
||||
)
|
||||
continue
|
||||
|
||||
if entry.type == "playlist":
|
||||
is_custom = entry.type_src in [i["name"] for i in custom_playlists]
|
||||
# is_recently_added = entry.type_src == "recentlyadded"
|
||||
|
||||
if is_custom:
|
||||
playlist, _ = next(
|
||||
i["handler"]()
|
||||
for i in custom_playlists
|
||||
if i["name"] == entry.type_src
|
||||
)
|
||||
playlist.images = [i["image"] for i in playlist.images]
|
||||
|
||||
playlist = serialize_playlist(
|
||||
playlist, to_remove={"settings", "duration"}
|
||||
)
|
||||
|
||||
playlist["help_text"] = "playlist"
|
||||
playlist["time"] = timestamp_to_time_passed(entry.timestamp)
|
||||
|
||||
items.append(
|
||||
{
|
||||
"type": "playlist",
|
||||
"item": playlist,
|
||||
}
|
||||
)
|
||||
continue
|
||||
|
||||
playlist = PlaylistTable.get_by_id(entry.type_src)
|
||||
if playlist is None:
|
||||
continue
|
||||
|
||||
tracks = TrackStore.get_tracks_by_trackhashes(playlist.trackhashes)
|
||||
playlist.clear_lists()
|
||||
|
||||
if not playlist.has_image:
|
||||
images = get_first_4_images(tracks)
|
||||
images = [i["image"] for i in images]
|
||||
playlist.images = images
|
||||
|
||||
items.append(
|
||||
{
|
||||
"type": "playlist",
|
||||
"item": {
|
||||
"help_text": "playlist",
|
||||
"time": timestamp_to_time_passed(entry.timestamp),
|
||||
**serialize_playlist(playlist),
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
if entry.type == "favorite":
|
||||
items.append(
|
||||
{
|
||||
"type": "favorite_tracks",
|
||||
"item": {
|
||||
"help_text": "playlist",
|
||||
"count": FavoritesTable.count(),
|
||||
"time": timestamp_to_time_passed(entry.timestamp),
|
||||
},
|
||||
}
|
||||
)
|
||||
continue
|
||||
|
||||
t = TrackStore.trackhashmap.get(entry.trackhash)
|
||||
|
||||
if t is None:
|
||||
continue
|
||||
|
||||
track = serialize_track(t.get_best())
|
||||
track["help_text"] = "track"
|
||||
track["time"] = timestamp_to_time_passed(entry.timestamp)
|
||||
|
||||
items.append(
|
||||
{
|
||||
"type": "track",
|
||||
"item": track,
|
||||
}
|
||||
)
|
||||
|
||||
BATCH_SIZE = 200
|
||||
current_index = 0
|
||||
|
||||
entries = ScrobbleTable.get_all(0, BATCH_SIZE)
|
||||
max_iterations = 20 # Safeguard against unexpected infinite loops
|
||||
iterations = 0
|
||||
|
||||
while len(items) < limit and iterations < max_iterations:
|
||||
create_items(entries)
|
||||
current_index += BATCH_SIZE
|
||||
|
||||
if len(items) < limit:
|
||||
entries = ScrobbleTable.get_all(current_index + 1, BATCH_SIZE)
|
||||
if not entries:
|
||||
break
|
||||
|
||||
iterations += 1
|
||||
|
||||
if iterations == max_iterations:
|
||||
print(
|
||||
f"Warning: Reached maximum iterations ({max_iterations}) while fetching recently played items"
|
||||
)
|
||||
|
||||
return items
|
||||
|
||||
|
||||
def get_recently_played_playlist(limit: int = 100):
|
||||
|
||||
@@ -0,0 +1,151 @@
|
||||
from app.db.userdata import FavoritesTable, MixTable, PlaylistTable
|
||||
from app.lib.home import find_mix
|
||||
from app.lib.home.recentlyadded import get_recently_added_playlist
|
||||
from app.lib.home.recentlyplayed import get_recently_played_playlist
|
||||
from app.lib.playlistlib import get_first_4_images
|
||||
from app.serializers.album import album_serializer
|
||||
from app.serializers.artist import serialize_for_card
|
||||
from app.serializers.playlist import serialize_for_card as serialize_playlist
|
||||
from app.serializers.track import serialize_track
|
||||
from app.store.albums import AlbumStore
|
||||
from app.store.artists import ArtistStore
|
||||
from app.store.folder import FolderStore
|
||||
from app.store.tracks import TrackStore
|
||||
from app.utils.dates import timestamp_to_time_passed
|
||||
|
||||
|
||||
def recover_items(items: list[dict]):
|
||||
custom_playlists = [
|
||||
{"name": "recentlyadded", "handler": get_recently_added_playlist},
|
||||
{"name": "recentlyplayed", "handler": get_recently_played_playlist},
|
||||
]
|
||||
recovered = []
|
||||
|
||||
for item in items:
|
||||
recovered_item = None
|
||||
|
||||
if item["type"] == "album":
|
||||
album = AlbumStore.get_album_by_hash(item["hash"])
|
||||
if album is None:
|
||||
continue
|
||||
|
||||
album = album_serializer(
|
||||
album,
|
||||
to_remove={
|
||||
"genres",
|
||||
"date",
|
||||
"count",
|
||||
"duration",
|
||||
"albumartists_hashes",
|
||||
"og_title",
|
||||
},
|
||||
)
|
||||
|
||||
recovered_item = {
|
||||
"type": "album",
|
||||
"item": album,
|
||||
}
|
||||
elif item["type"] == "artist":
|
||||
artist = ArtistStore.get_artist_by_hash(item["hash"])
|
||||
if artist is None:
|
||||
continue
|
||||
|
||||
recovered_item = {
|
||||
"type": "artist",
|
||||
"item": serialize_for_card(artist),
|
||||
}
|
||||
elif item["type"] == "folder":
|
||||
count = FolderStore.count_tracks_containing_paths([item["hash"]])
|
||||
|
||||
recovered_item = {
|
||||
"type": "folder",
|
||||
"item": {
|
||||
"path": item["hash"],
|
||||
"count": count[0]["trackcount"],
|
||||
},
|
||||
}
|
||||
elif item["type"] == "playlist":
|
||||
if item.get("is_custom"):
|
||||
playlist, _ = next(
|
||||
i["handler"]()
|
||||
for i in custom_playlists
|
||||
if i["name"] == item["hash"]
|
||||
)
|
||||
playlist.images = [i["image"] for i in playlist.images]
|
||||
|
||||
playlist = serialize_playlist(
|
||||
playlist, to_remove={"settings", "duration"}
|
||||
)
|
||||
recovered_item = {
|
||||
"type": "playlist",
|
||||
"item": playlist,
|
||||
}
|
||||
else:
|
||||
playlist = PlaylistTable.get_by_id(item["hash"])
|
||||
if playlist is None:
|
||||
continue
|
||||
|
||||
tracks = TrackStore.get_tracks_by_trackhashes(playlist.trackhashes)
|
||||
playlist.clear_lists()
|
||||
|
||||
if not playlist.has_image:
|
||||
images = get_first_4_images(tracks)
|
||||
images = [i["image"] for i in images]
|
||||
playlist.images = images
|
||||
|
||||
recovered_item = {
|
||||
"type": "playlist",
|
||||
"item": serialize_playlist(playlist),
|
||||
}
|
||||
elif item["type"] == "favorite":
|
||||
recovered_item = {
|
||||
"type": "favorite",
|
||||
"item": {
|
||||
"count": FavoritesTable.count(),
|
||||
},
|
||||
}
|
||||
elif item["type"] == "track":
|
||||
track = TrackStore.trackhashmap.get(item["hash"])
|
||||
if track is None:
|
||||
continue
|
||||
|
||||
recovered_item = {
|
||||
"type": "track",
|
||||
"item": serialize_track(track.get_best()),
|
||||
}
|
||||
|
||||
elif item["type"] == "mix":
|
||||
try:
|
||||
splits = item["hash"].split(".")
|
||||
mixid = splits[0]
|
||||
sourcehash = splits[1]
|
||||
except IndexError:
|
||||
continue
|
||||
|
||||
mix = find_mix(mixid, sourcehash)
|
||||
if mix is None:
|
||||
continue
|
||||
|
||||
recovered_item = {
|
||||
"type": "mix",
|
||||
"item": mix,
|
||||
}
|
||||
|
||||
if recovered_item is not None:
|
||||
helptext = item.get("help_text") or item.get("type")
|
||||
secondary_text = item.get("secondary_text")
|
||||
|
||||
if "secondary_text" in item:
|
||||
secondary_text = item["secondary_text"]
|
||||
elif "timestamp" in item:
|
||||
secondary_text = timestamp_to_time_passed(item["timestamp"])
|
||||
|
||||
if helptext:
|
||||
recovered_item["item"]["help_text"] = helptext
|
||||
|
||||
if secondary_text:
|
||||
recovered_item["item"]["time"] = secondary_text
|
||||
|
||||
recovered.append(recovered_item)
|
||||
|
||||
return recovered
|
||||
@@ -7,6 +7,7 @@ from app.lib.mapstuff import (
|
||||
map_scrobble_data,
|
||||
)
|
||||
from app.lib.populate import CordinateMedia
|
||||
from app.lib.recipes.recents import RecentlyAdded
|
||||
from app.lib.tagger import IndexTracks
|
||||
from app.store.albums import AlbumStore
|
||||
from app.store.artists import ArtistStore
|
||||
@@ -25,6 +26,9 @@ class IndexEverything:
|
||||
ArtistStore.load_artists(key)
|
||||
FolderStore.load_filepaths()
|
||||
|
||||
# NOTE: Rebuild recently added items on the homepage store
|
||||
RecentlyAdded()
|
||||
|
||||
# map colors
|
||||
map_album_colors()
|
||||
map_artist_colors()
|
||||
|
||||
@@ -10,7 +10,6 @@ from typing import Any
|
||||
from PIL import Image, ImageSequence
|
||||
|
||||
from app import settings
|
||||
from app.db.libdata import TrackTable
|
||||
from app.models.track import Track
|
||||
from app.store.albums import AlbumStore
|
||||
from app.store.tracks import TrackStore
|
||||
|
||||
@@ -0,0 +1,29 @@
|
||||
"""
|
||||
Recipes are a way to create mixes.
|
||||
"""
|
||||
|
||||
from abc import ABC, abstractmethod
|
||||
from typing import Any, List
|
||||
|
||||
class HomepageRoutine(ABC):
|
||||
"""
|
||||
A routine creates a row of homepage items.
|
||||
"""
|
||||
|
||||
@property
|
||||
@abstractmethod
|
||||
def is_valid(self) -> bool: ...
|
||||
|
||||
def __init__(self) -> None:
|
||||
if not self.is_valid:
|
||||
return
|
||||
|
||||
self.run()
|
||||
|
||||
@abstractmethod
|
||||
def run(self) -> List[Any]:
|
||||
"""
|
||||
Creates the homepage items and saves them to the
|
||||
homepage store if self.is_valid is true.
|
||||
"""
|
||||
...
|
||||
@@ -0,0 +1,38 @@
|
||||
from app.db.userdata import UserTable
|
||||
from app.lib.recipes import HomepageRoutine
|
||||
from app.plugins.mixes import MixesPlugin
|
||||
from app.store.homepage import HomepageStore
|
||||
|
||||
|
||||
class ArtistMixes(HomepageRoutine):
|
||||
store_key = "artist_mixes"
|
||||
|
||||
@property
|
||||
def is_valid(self):
|
||||
return MixesPlugin().enabled
|
||||
|
||||
def run(self):
|
||||
users = UserTable.get_all()
|
||||
|
||||
for user in users:
|
||||
mix = MixesPlugin()
|
||||
mixes = mix.create_artist_mixes(user.id)
|
||||
|
||||
if not mixes:
|
||||
continue
|
||||
|
||||
HomepageStore.set_mixes(mixes, entrykey=self.store_key, userid=user.id)
|
||||
|
||||
custom_mixes = []
|
||||
for _mix in mixes:
|
||||
custom_mix = MixesPlugin.get_track_mix(_mix)
|
||||
|
||||
if custom_mix:
|
||||
custom_mixes.append(custom_mix)
|
||||
|
||||
HomepageStore.set_mixes(
|
||||
custom_mixes, entrykey="custom_mixes", userid=user.id
|
||||
)
|
||||
|
||||
def __init__(self) -> None:
|
||||
super().__init__()
|
||||
@@ -0,0 +1,37 @@
|
||||
from pprint import pprint
|
||||
from app.db.userdata import UserTable
|
||||
from app.lib.recipes import HomepageRoutine
|
||||
from app.lib.recipes.artistmixes import ArtistMixes
|
||||
from app.models.mix import Mix
|
||||
from app.plugins.mixes import MixesPlugin
|
||||
from app.store.homepage import HomepageStore
|
||||
|
||||
|
||||
class BecauseYouListened(HomepageRoutine):
|
||||
store_keys = ["because_you_listened_to_artist", "artists_you_might_like"]
|
||||
|
||||
@property
|
||||
def is_valid(self):
|
||||
return MixesPlugin().enabled
|
||||
|
||||
def run(self):
|
||||
users = UserTable.get_all()
|
||||
|
||||
for user in users:
|
||||
entry: dict[str, Mix] = HomepageStore.entries.get(
|
||||
ArtistMixes.store_key
|
||||
).items.get(user.id) # type: ignore
|
||||
|
||||
if not entry:
|
||||
continue
|
||||
|
||||
because_you_listened_to_artist, artists_you_might_like = (
|
||||
MixesPlugin().get_because_items(list(entry.values()))
|
||||
)
|
||||
|
||||
HomepageStore.entries[self.store_keys[0]].items[
|
||||
user.id
|
||||
] = because_you_listened_to_artist
|
||||
HomepageStore.entries[self.store_keys[1]].items[
|
||||
user.id
|
||||
] = artists_you_might_like
|
||||
@@ -0,0 +1,91 @@
|
||||
import pprint
|
||||
from app.db.userdata import ScrobbleTable, UserTable
|
||||
from app.lib.home.recentlyadded import get_recently_added_items
|
||||
from app.lib.home.get_recently_played import get_recently_played
|
||||
from app.lib.recipes import HomepageRoutine
|
||||
from app.store.homepage import HomepageStore
|
||||
|
||||
|
||||
class RecentlyPlayed(HomepageRoutine):
|
||||
ITEM_LIMIT = 15
|
||||
store_key = "recently_played"
|
||||
|
||||
def __init__(self, userid: int | None = None) -> None:
|
||||
"""
|
||||
The userid is provided when we are running this routine
|
||||
outside a cron job. ie. when a user records a new scrobble.
|
||||
"""
|
||||
self.userids = [userid] if userid else [user.id for user in UserTable.get_all()]
|
||||
|
||||
# NOTE: When the userid is provided
|
||||
# we need to update the store for that userid only
|
||||
# using the last scrobble entry.
|
||||
self.update_only = userid is not None
|
||||
super().__init__()
|
||||
|
||||
@property
|
||||
def is_valid(self):
|
||||
return True
|
||||
|
||||
def run(self):
|
||||
if self.update_only:
|
||||
last_entry = ScrobbleTable.get_last_entry(self.userids[0])
|
||||
|
||||
if last_entry:
|
||||
items = get_recently_played(
|
||||
limit=self.ITEM_LIMIT, userid=self.userids[0], _entries=[last_entry]
|
||||
)
|
||||
|
||||
item = items[0]
|
||||
store_entry = HomepageStore.entries[self.store_key].items[
|
||||
self.userids[0]
|
||||
][0]
|
||||
|
||||
if (
|
||||
item["type"] + item["hash"]
|
||||
== store_entry["type"] + store_entry["hash"]
|
||||
):
|
||||
# If the item is the same as the one in the store
|
||||
# only update the timestamp
|
||||
HomepageStore.entries[self.store_key].items[self.userids[0]][0][
|
||||
"timestamp"
|
||||
] = item["timestamp"]
|
||||
else:
|
||||
# Otherwise, insert the new item
|
||||
# and remove the oldest item if there are more than 15 items
|
||||
HomepageStore.entries[self.store_key].items[self.userids[0]].insert(
|
||||
0, item
|
||||
)
|
||||
|
||||
if (
|
||||
len(
|
||||
HomepageStore.entries[self.store_key].items[self.userids[0]]
|
||||
)
|
||||
> self.ITEM_LIMIT
|
||||
):
|
||||
HomepageStore.entries[self.store_key].items[
|
||||
self.userids[0]
|
||||
].pop()
|
||||
|
||||
for userid in self.userids:
|
||||
items = get_recently_played(limit=self.ITEM_LIMIT, userid=userid)
|
||||
HomepageStore.entries[self.store_key].items[userid] = items
|
||||
|
||||
|
||||
class RecentlyAdded(HomepageRoutine):
|
||||
ITEM_LIMIT = 15
|
||||
store_key = "recently_added"
|
||||
|
||||
@property
|
||||
def is_valid(self):
|
||||
return True
|
||||
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
|
||||
def run(self):
|
||||
items = get_recently_added_items(limit=self.ITEM_LIMIT)
|
||||
|
||||
# NOTE: Recently added is a global entry
|
||||
# So we don't need a userid
|
||||
HomepageStore.entries[self.store_key].items[0] = items
|
||||
@@ -0,0 +1,83 @@
|
||||
from gettext import ngettext
|
||||
import pendulum
|
||||
|
||||
from app.crons.cron import CronJob
|
||||
from app.db.userdata import UserTable
|
||||
from app.lib.recipes import HomepageRoutine
|
||||
from app.store.homepage import HomepageStore
|
||||
from app.utils.dates import get_date_range, seconds_to_time_string
|
||||
from app.utils.stats import get_artists_in_period
|
||||
|
||||
|
||||
class TopArtists(CronJob, HomepageRoutine):
|
||||
"""
|
||||
A routine to populate the top streamed artists/albums in the last week or month
|
||||
"""
|
||||
|
||||
hours = 1
|
||||
ITEM_LIMIT = 15
|
||||
|
||||
@property
|
||||
def is_valid(self):
|
||||
"""
|
||||
Only valid if it's the middle or last 2 days of this month.
|
||||
|
||||
When the duration is "week", it's valid on the weekend.
|
||||
"""
|
||||
if self.duration == "month":
|
||||
now = pendulum.now()
|
||||
middle_day = now.days_in_month // 2
|
||||
|
||||
return (
|
||||
now.day in range(middle_day, middle_day + 2)
|
||||
or now.day > now.days_in_month - 2
|
||||
)
|
||||
if self.duration == "week":
|
||||
return pendulum.now().isoweekday() in (5, 6, 7)
|
||||
|
||||
return False
|
||||
|
||||
def __init__(self, duration: str = "month") -> None:
|
||||
super().__init__()
|
||||
self.duration = duration
|
||||
|
||||
if not self.is_valid:
|
||||
return
|
||||
|
||||
def run(self):
|
||||
if not self.is_valid:
|
||||
self.destroy()
|
||||
return
|
||||
|
||||
self.userids = [user.id for user in UserTable.get_all()]
|
||||
|
||||
for userid in self.userids:
|
||||
date_range = get_date_range(self.duration)
|
||||
artists = get_artists_in_period(date_range[0], date_range[1], userid)[
|
||||
: self.ITEM_LIMIT
|
||||
]
|
||||
|
||||
artists = [
|
||||
{
|
||||
"type": "artist",
|
||||
"hash": artist["artisthash"],
|
||||
"help_text": seconds_to_time_string(artist["playduration"]),
|
||||
"secondary_text": str(artist["playcount"])
|
||||
+ " "
|
||||
+ ngettext("play", "plays", artist["playcount"]),
|
||||
}
|
||||
for artist in artists
|
||||
]
|
||||
|
||||
HomepageStore.entries[f"top_streamed_{self.duration}ly_artists"].items[
|
||||
userid
|
||||
] = artists
|
||||
|
||||
def destroy(self):
|
||||
"""
|
||||
Clear the top streamed entry from the homepage store.
|
||||
"""
|
||||
keys = [f"top_streamed_{self.duration}ly_artists"]
|
||||
|
||||
for key in keys:
|
||||
HomepageStore.entries[key].items = {}
|
||||
@@ -36,6 +36,7 @@ class Album:
|
||||
image: str = ""
|
||||
versions: list[str] = dataclasses.field(default_factory=list)
|
||||
fav_userids: list[int] = dataclasses.field(default_factory=list)
|
||||
weakhash: str = ""
|
||||
|
||||
@property
|
||||
def is_favorite(self):
|
||||
@@ -54,6 +55,9 @@ class Album:
|
||||
def __post_init__(self):
|
||||
self.image = self.albumhash + ".webp"
|
||||
self.populate_versions()
|
||||
self.weakhash = create_hash(
|
||||
self.og_title, ",".join(a["name"] for a in self.albumartists)
|
||||
)
|
||||
|
||||
def populate_versions(self):
|
||||
_, self.versions = get_base_title_and_versions(self.og_title, get_versions=True)
|
||||
|
||||
@@ -13,18 +13,25 @@ class TrackLog:
|
||||
duration: int
|
||||
timestamp: int
|
||||
source: str
|
||||
"""
|
||||
The full source string, eg. "al:123456"
|
||||
"""
|
||||
userid: int
|
||||
extra: dict[str, Any]
|
||||
|
||||
type = "track"
|
||||
type_src = None
|
||||
"""
|
||||
The source identifier string, eg. albumhash, artisthash, etc.
|
||||
"""
|
||||
|
||||
def __post_init__(self):
|
||||
prefix_map = {
|
||||
"mix:": "mix",
|
||||
"al:": "album",
|
||||
"ar:": "artist",
|
||||
"pl:": "playlist",
|
||||
"fo:": "folder",
|
||||
"pl:": "playlist",
|
||||
"favorite": "favorite",
|
||||
}
|
||||
|
||||
|
||||
@@ -0,0 +1,70 @@
|
||||
import time
|
||||
from dataclasses import asdict, dataclass, field
|
||||
from typing import Any
|
||||
|
||||
from app.lib.playlistlib import get_first_4_images
|
||||
from app.serializers.track import serialize_tracks
|
||||
from app.store.tracks import TrackStore
|
||||
from app.utils.dates import seconds_to_time_string, timestamp_to_time_passed
|
||||
from app.utils.hashing import create_hash
|
||||
|
||||
|
||||
@dataclass
|
||||
class Mix:
|
||||
id: str
|
||||
title: str
|
||||
description: str
|
||||
tracks: list[str]
|
||||
sourcehash: str
|
||||
userid: int
|
||||
"""
|
||||
A hash of the tracks used to generate the mix.
|
||||
"""
|
||||
|
||||
timestamp: int = field(default_factory=lambda: int(time.time()))
|
||||
extra: dict = field(default_factory=dict)
|
||||
saved: bool = False
|
||||
|
||||
def to_full_dict(self):
|
||||
tracks = TrackStore.get_tracks_by_trackhashes(self.tracks)[:40]
|
||||
serialized_tracks = serialize_tracks(tracks)
|
||||
|
||||
_dict = asdict(self)
|
||||
_dict["tracks"] = serialized_tracks
|
||||
|
||||
# if not self.extra.get("image"):
|
||||
# _dict["images"] = get_first_4_images(tracks)
|
||||
|
||||
_dict["duration"] = seconds_to_time_string(sum(t.duration for t in tracks))
|
||||
_dict["trackcount"] = len(tracks)
|
||||
|
||||
del _dict["extra"]["albums"]
|
||||
del _dict["extra"]["artists"]
|
||||
|
||||
return _dict
|
||||
|
||||
def to_dict(self, convert_timestamp: bool = False):
|
||||
item = asdict(self)
|
||||
item["trackshash"] = create_hash(*self.tracks[:40])
|
||||
|
||||
if convert_timestamp:
|
||||
item["time"] = timestamp_to_time_passed(item["timestamp"])
|
||||
|
||||
del item["tracks"]
|
||||
|
||||
del item["extra"]["albums"]
|
||||
del item["extra"]["artists"]
|
||||
|
||||
return item
|
||||
|
||||
@classmethod
|
||||
def mix_to_dataclass(cls, entry: Any):
|
||||
entry_dict = entry._asdict()
|
||||
entry_dict["id"] = entry_dict["mixid"]
|
||||
del entry_dict["mixid"]
|
||||
|
||||
return Mix(**entry_dict)
|
||||
|
||||
@classmethod
|
||||
def mixes_to_dataclasses(cls, entries: Any):
|
||||
return [cls.mix_to_dataclass(entry) for entry in entries]
|
||||
@@ -45,6 +45,7 @@ class Track:
|
||||
og_title: str = ""
|
||||
artisthashes: list[str] = field(default_factory=list)
|
||||
genrehashes: list[str] = field(default_factory=list)
|
||||
weakhash: str = ""
|
||||
|
||||
_pos: int = 0
|
||||
_ati: str = ""
|
||||
@@ -76,6 +77,7 @@ class Track:
|
||||
self.og_title = self.title
|
||||
self.og_album = self.album
|
||||
self.folder = self.folder + "/"
|
||||
self.weakhash = create_hash(self.title, self.artists)
|
||||
|
||||
self.image = self.albumhash + ".webp"
|
||||
self.extra = {
|
||||
|
||||
@@ -0,0 +1,606 @@
|
||||
from gettext import ngettext
|
||||
from io import BytesIO
|
||||
import json
|
||||
import random
|
||||
import time
|
||||
from urllib.parse import quote
|
||||
import requests
|
||||
from PIL import Image
|
||||
|
||||
from app.db.userdata import MixTable
|
||||
from app.models.artist import Artist
|
||||
from app.models.mix import Mix
|
||||
from app.models.track import Track
|
||||
from app.plugins import Plugin, plugin_method
|
||||
from app.settings import Paths
|
||||
from app.store.albums import AlbumStore
|
||||
from app.store.artists import ArtistStore
|
||||
from app.store.tracks import TrackStore
|
||||
from app.utils.dates import get_date_range, get_duration_ago
|
||||
from app.utils.hashing import create_hash
|
||||
from app.utils.mixes import balance_mix
|
||||
from app.utils.stats import get_artists_in_period
|
||||
|
||||
|
||||
class MixAlreadyExists(Exception):
|
||||
"""
|
||||
Raised when a mix with the same sourcehash already exists.
|
||||
"""
|
||||
|
||||
pass
|
||||
|
||||
|
||||
class MixesPlugin(Plugin):
|
||||
MAX_TRACKS_TO_FETCH = 5
|
||||
MIN_TRACK_MIX_LENGTH = 15
|
||||
MIX_TRACKS_LENGTH = 40
|
||||
|
||||
MIN_DAY_LISTEN_DURATION = 3 * 60 # 3 minutes
|
||||
MIN_WEEK_LISTEN_DURATION = 10 * 60 # 10 minutes
|
||||
MIN_MONTH_LISTEN_DURATION = 20 * 60 # 20 minutes
|
||||
|
||||
def __init__(self):
|
||||
super().__init__("mixes", "Mixes")
|
||||
self.server = "https://smcloud.mungaist.com"
|
||||
# self.server = "http://localhost:1956"
|
||||
|
||||
# server_online = self.ping_server()
|
||||
self.set_active(True)
|
||||
|
||||
def ping_server(self):
|
||||
max_retries = 3
|
||||
retry_delay = 2 # seconds
|
||||
|
||||
for attempt in range(max_retries):
|
||||
try:
|
||||
requests.get(self.server, timeout=10)
|
||||
return True
|
||||
except Exception as e:
|
||||
print(
|
||||
f"Failed to connect to the recommendation server (attempt {attempt + 1}/{max_retries})"
|
||||
)
|
||||
if attempt < max_retries - 1:
|
||||
time.sleep(retry_delay)
|
||||
continue
|
||||
|
||||
return False
|
||||
|
||||
@plugin_method
|
||||
def get_track_mix_data(self, tracks: list[Track], with_help: bool = False):
|
||||
"""
|
||||
Given a list of tracks, creates a mix by fetching data from the
|
||||
Swing Music Cloud recommendation server.
|
||||
|
||||
The server returns a list of weak trackhashes. We use these to fetch
|
||||
the matching track data from our library database. Found tracks are
|
||||
then balanced and returned as the final mix tracklist.
|
||||
|
||||
:param with_help: Whether to include the help flag in the query.
|
||||
The flag tells the server to find more data using other tracks from the same album.
|
||||
"""
|
||||
queries = [
|
||||
{
|
||||
"title": track.title,
|
||||
"artists": [a["name"] for a in track.artists],
|
||||
"album": track.og_album,
|
||||
"with_help": with_help,
|
||||
}
|
||||
for track in tracks
|
||||
]
|
||||
|
||||
try:
|
||||
response = requests.post(f"{self.server}/radio", json=queries, timeout=30)
|
||||
except (requests.exceptions.ConnectionError, requests.exceptions.ReadTimeout):
|
||||
print("Failed to connect to recommendation server")
|
||||
return [], [], []
|
||||
|
||||
try:
|
||||
results = response.json()
|
||||
except json.JSONDecodeError:
|
||||
print("Failed to decode JSON response from recommendation server")
|
||||
return [], [], []
|
||||
|
||||
trackhashes: list[str] = results["tracks"]
|
||||
|
||||
trackmatches = TrackStore.get_flat_list()
|
||||
trackmatches = [t for t in trackmatches if t.weakhash in trackhashes]
|
||||
|
||||
# filter out duplicates of the same weakhash
|
||||
# group by weakhash and pick the one with the highest bitrate
|
||||
grouped: dict[str, list[Track]] = {}
|
||||
for track in trackmatches:
|
||||
grouped.setdefault(track.weakhash, []).append(track)
|
||||
|
||||
trackmatches = [
|
||||
max(group, key=lambda x: x.bitrate) for group in grouped.values()
|
||||
]
|
||||
|
||||
# sort by trackhash order
|
||||
trackmatches = sorted(trackmatches, key=lambda x: trackhashes.index(x.weakhash))
|
||||
|
||||
# if the mix is short, try to fill it up with tracks
|
||||
# from album and artist data from the cloud!
|
||||
|
||||
# Create as many filler tracks as possible
|
||||
# Then the mix length will be controlled in the Mix model
|
||||
# if len(trackmatches) < self.TRACK_MIX_LENGTH:
|
||||
if True:
|
||||
filler_tracks = self.fallback_create_artist_mix(
|
||||
similar_artists=results["artists"],
|
||||
similar_albums=results["albums"],
|
||||
omit_trackhashes={t.weakhash for t in trackmatches},
|
||||
# limit=self.TRACK_MIX_LENGTH - len(trackmatches),
|
||||
)
|
||||
trackmatches.extend(filler_tracks)
|
||||
|
||||
# try to balance the mix
|
||||
trackmatches = balance_mix(trackmatches)
|
||||
return trackmatches, results["albums"], results["artists"]
|
||||
|
||||
@plugin_method
|
||||
# def get_artist_mix(self, artisthash: str):
|
||||
# """
|
||||
# Given an artisthash, creates an artist mix using the
|
||||
# self.MAX_TRACKS_TO_FETCH most listened to tracks.
|
||||
|
||||
# Returns a tuple of the mix and the sourcehash.
|
||||
# """
|
||||
# artist = ArtistStore.artistmap[artisthash]
|
||||
# tracks = TrackStore.get_tracks_by_trackhashes(artist.trackhashes)
|
||||
|
||||
# tracks = sorted(tracks, key=lambda x: x.playduration, reverse=True)
|
||||
# sourcetracks = tracks[: self.MAX_TRACKS_TO_FETCH]
|
||||
# sourcehash = create_hash(*[t.trackhash for t in sourcetracks])
|
||||
|
||||
# if MixTable.get_by_sourcehash(sourcehash):
|
||||
# raise MixAlreadyExists()
|
||||
|
||||
# tracks, albums, artists = self.get_track_mix(tracks[: self.MAX_TRACKS_TO_FETCH])
|
||||
# return (tracks, albums, artists, sourcehash)
|
||||
|
||||
@plugin_method
|
||||
def create_artist_mixes(self, userid: int):
|
||||
"""
|
||||
Creates artist mixes for a given userid.
|
||||
"""
|
||||
mixes: list[Mix] = []
|
||||
indexed = set()
|
||||
|
||||
today_start, today_end = get_date_range(duration="day")
|
||||
last_2_days_start = get_duration_ago("day", 2)
|
||||
last_7_days_start = get_duration_ago("week")
|
||||
last_1_month_start = get_duration_ago("month")
|
||||
|
||||
artists = {
|
||||
"today": {
|
||||
"max": 4,
|
||||
"artists": get_artists_in_period(today_start, today_end, userid),
|
||||
"created": 0,
|
||||
},
|
||||
"last_2_days": {
|
||||
"max": 3,
|
||||
"artists": get_artists_in_period(
|
||||
last_2_days_start, time.time(), userid
|
||||
),
|
||||
"created": 0,
|
||||
},
|
||||
"last_7_days": {
|
||||
"max": 4,
|
||||
"artists": get_artists_in_period(
|
||||
last_7_days_start, time.time(), userid
|
||||
),
|
||||
"created": 0,
|
||||
},
|
||||
"last_1_month": {
|
||||
"max": 4,
|
||||
"artists": get_artists_in_period(
|
||||
last_1_month_start, time.time(), userid
|
||||
),
|
||||
"created": 0,
|
||||
},
|
||||
}
|
||||
|
||||
# FIXME: Make sure that different artists don't generate the same mix
|
||||
|
||||
for i, period in enumerate(artists.values()):
|
||||
# if previous period has less than its max
|
||||
# add the difference to this period's limit
|
||||
limit = period["max"]
|
||||
|
||||
if i > 0:
|
||||
previous_period = artists[list(artists.keys())[i - 1]]
|
||||
if previous_period["created"] < previous_period["max"]:
|
||||
limit += previous_period["max"] - previous_period["created"]
|
||||
|
||||
for artist in period["artists"]:
|
||||
if period["created"] >= limit:
|
||||
break
|
||||
|
||||
if artist["artisthash"] in indexed:
|
||||
continue
|
||||
|
||||
# INFO: track['tracks'] is a dict of trackhashes and their counts
|
||||
# get the trackhashes sorted by count
|
||||
trackhashes = sorted(
|
||||
artist["tracks"], key=lambda x: artist["tracks"][x], reverse=True
|
||||
)
|
||||
|
||||
mix = self.create_artist_mix(
|
||||
artist, trackhashes[: self.MAX_TRACKS_TO_FETCH], userid=userid
|
||||
)
|
||||
|
||||
if mix:
|
||||
mixes.append(mix)
|
||||
indexed.add(artist["artisthash"])
|
||||
period["created"] += 1
|
||||
|
||||
print(f"⭐⭐⭐⭐ Created {len(mixes)} mixes")
|
||||
print([m.title for m in mixes])
|
||||
return mixes
|
||||
|
||||
@classmethod
|
||||
def get_mix_description(cls, tracks: list[Track], artishash: str):
|
||||
"""
|
||||
Constructs a description for a mix by putting together the first n=4
|
||||
artists in the mix tracklist.
|
||||
"""
|
||||
first_4_artists = []
|
||||
indexed = set()
|
||||
|
||||
for track in tracks:
|
||||
if len(first_4_artists) < 4:
|
||||
if (
|
||||
track.artists[0]["artisthash"] != artishash
|
||||
and track.artists[0]["artisthash"] not in indexed
|
||||
):
|
||||
first_4_artists.append(track.artists[0])
|
||||
indexed.add(track.artists[0]["artisthash"])
|
||||
|
||||
if len(first_4_artists) == 4:
|
||||
return f"Featuring {', '.join(a['name'] for a in first_4_artists)} and more"
|
||||
|
||||
if len(first_4_artists) > 0:
|
||||
return f"Featuring {', '.join(a['name'] for a in first_4_artists)}"
|
||||
|
||||
return f"Featuring {tracks[0].artists[0]['name']}"
|
||||
|
||||
def create_artist_mix(
|
||||
self, artist: dict[str, str], trackhashes: list[str], userid: int
|
||||
):
|
||||
"""
|
||||
Given an artist dict, creates an artist mix.
|
||||
"""
|
||||
_artist = ArtistStore.artistmap.get(artist["artisthash"])
|
||||
|
||||
if not _artist:
|
||||
return None
|
||||
|
||||
tracks = TrackStore.get_tracks_by_trackhashes(trackhashes)
|
||||
# tracks = sorted(tracks, key=lambda x: x.playduration, reverse=True)
|
||||
# sourcetracks = tracks[: self.MAX_TRACKS_TO_FETCH]
|
||||
|
||||
# INFO: Sort the trackhashes when creating the sourcehash
|
||||
sourcehash = create_hash(
|
||||
*sorted(trackhashes, key=lambda x: trackhashes.index(x))
|
||||
)
|
||||
|
||||
db_mix = MixTable.get_by_sourcehash(sourcehash)
|
||||
if db_mix:
|
||||
print(f"🔍 Found existing mix for {_artist.artist.name}")
|
||||
print(db_mix.title)
|
||||
return db_mix
|
||||
|
||||
mix_tracks, albums, artists = self.get_track_mix_data(tracks)
|
||||
|
||||
if len(mix_tracks) < self.MIN_TRACK_MIX_LENGTH:
|
||||
return None
|
||||
|
||||
# try downloading artist image
|
||||
mix_image = {"image": _artist.artist.image, "color": _artist.artist.color}
|
||||
image = self.download_artist_image(_artist.artist)
|
||||
|
||||
if image:
|
||||
mix_image["image"] = image
|
||||
|
||||
mix = Mix(
|
||||
# the a prefix indicates that this is an artist mix
|
||||
id=f"a{userid}{artist['artisthash']}",
|
||||
title=artist["artist"] + " Radio",
|
||||
description=self.get_mix_description(mix_tracks, artist["artisthash"]),
|
||||
tracks=[t.trackhash for t in mix_tracks],
|
||||
sourcehash=sourcehash,
|
||||
userid=userid,
|
||||
extra={
|
||||
"type": "artist",
|
||||
"artisthash": artist["artisthash"],
|
||||
"sourcetracks": trackhashes,
|
||||
"image": mix_image,
|
||||
# NOTE: Save the similar albums and artists
|
||||
# Related to the source tracks that were used to create the mix
|
||||
# Will be useful when generating other homepage entries
|
||||
"albums": albums,
|
||||
"artists": artists,
|
||||
},
|
||||
timestamp=int(time.time()),
|
||||
)
|
||||
|
||||
MixTable.insert_one(mix)
|
||||
return mix
|
||||
|
||||
def download_artist_image(self, artist: Artist):
|
||||
try:
|
||||
res = requests.get(
|
||||
f"{self.server}/mix/image?artist={quote(artist.name)}&type=Artist"
|
||||
)
|
||||
except requests.exceptions.ConnectionError:
|
||||
return None
|
||||
|
||||
if res.status_code == 200:
|
||||
filename = f"{artist.artisthash}_{int(time.time())}.webp"
|
||||
path = Paths.get_md_mixes_img_path() + "/" + filename
|
||||
|
||||
image = Image.open(BytesIO(res.content))
|
||||
aspect_ratio = image.width / image.height
|
||||
|
||||
# resize to 512px
|
||||
md_width = 512
|
||||
md_height = int(md_width / aspect_ratio)
|
||||
|
||||
image = image.resize((md_width, md_height), Image.LANCZOS)
|
||||
image.save(path, "webp")
|
||||
|
||||
# resize to 256px
|
||||
sm_width = 256
|
||||
sm_height = int(sm_width / aspect_ratio)
|
||||
|
||||
image = image.resize((sm_width, sm_height), Image.LANCZOS)
|
||||
small_path = Paths.get_sm_mixes_img_path() + "/" + filename
|
||||
image.save(small_path, "webp")
|
||||
|
||||
return filename
|
||||
|
||||
return None
|
||||
|
||||
def fallback_create_artist_mix(
|
||||
self,
|
||||
# artist: dict[str, str],
|
||||
similar_albums: list[str],
|
||||
similar_artists: list[str],
|
||||
omit_trackhashes: set[str],
|
||||
limit: int = 99,
|
||||
):
|
||||
"""
|
||||
Creates an artist mix by selecting random tracks from similar albums and artists.
|
||||
|
||||
This is used when:
|
||||
- The Swing Music recommendation server is down.
|
||||
- The artist has less than self.MIN_TRACK_MIX_LENGTH tracks from the cloud mix.
|
||||
- When we need to dilute the mix to balance the artist distribution.
|
||||
|
||||
:param similar_albums: A list of similar album weakhashes to select tracks from.
|
||||
:param similar_artists: A list of similar artist hashes to select tracks from.
|
||||
:param omit_trackhashes: A set of trackhashes to omit from the new tracklist.
|
||||
:param limit: The maximum number of tracks to select.
|
||||
"""
|
||||
|
||||
mixtracks = []
|
||||
albummatches = (
|
||||
a
|
||||
for a in AlbumStore.albummap.values()
|
||||
if a.album.weakhash in similar_albums
|
||||
)
|
||||
|
||||
for match in albummatches:
|
||||
if len(mixtracks) >= limit:
|
||||
return mixtracks
|
||||
|
||||
albumtracks = [
|
||||
t
|
||||
for t in TrackStore.get_tracks_by_trackhashes(match.trackhashes)
|
||||
if t.weakhash not in omit_trackhashes
|
||||
]
|
||||
|
||||
if len(albumtracks) == 0:
|
||||
continue
|
||||
|
||||
sample = random.sample(albumtracks, k=1)
|
||||
mixtracks.extend(sample)
|
||||
|
||||
artistmatches = (
|
||||
a
|
||||
for a in ArtistStore.artistmap.values()
|
||||
if a.artist.artisthash in similar_artists
|
||||
)
|
||||
|
||||
for match in artistmatches:
|
||||
if len(mixtracks) >= limit:
|
||||
return mixtracks
|
||||
|
||||
artisttracks = [
|
||||
t
|
||||
for t in TrackStore.get_tracks_by_trackhashes(match.trackhashes)
|
||||
if t.weakhash not in omit_trackhashes
|
||||
]
|
||||
|
||||
if len(artisttracks) == 0:
|
||||
continue
|
||||
|
||||
sample = random.sample(artisttracks, k=1)
|
||||
mixtracks.extend(sample)
|
||||
|
||||
return mixtracks
|
||||
|
||||
def get_mix_from_lastfm_data(self, artisthash: str, limit: int):
|
||||
"""
|
||||
Creates a mix from the locally available lastfm similar artists data.
|
||||
|
||||
The resulting mix is definitely expected to be of low quality.
|
||||
|
||||
TODO: Maybe implement this!
|
||||
"""
|
||||
pass
|
||||
|
||||
@classmethod
|
||||
def get_track_mix(cls, mix: Mix):
|
||||
"""
|
||||
Given a mix, returns the excess tracks as a custom mix.
|
||||
"""
|
||||
|
||||
# INFO: If the mix can't have more than 20 tracks, return None
|
||||
if len(mix.tracks) <= cls.MIX_TRACKS_LENGTH + 20:
|
||||
return None
|
||||
|
||||
og_track = TrackStore.trackhashmap.get(mix.tracks[0])
|
||||
|
||||
if not og_track:
|
||||
return None
|
||||
|
||||
og_track = og_track.get_best()
|
||||
tracks = [og_track] + TrackStore.get_tracks_by_trackhashes(
|
||||
mix.tracks[cls.MIX_TRACKS_LENGTH :]
|
||||
)
|
||||
|
||||
trackmix = Mix(
|
||||
id=f"t{mix.userid}{mix.extra['artisthash']}",
|
||||
title=og_track.title,
|
||||
description=cls.get_mix_description(tracks, mix.extra["artisthash"]),
|
||||
tracks=[t.trackhash for t in tracks],
|
||||
sourcehash=create_hash(*[t.trackhash for t in tracks]),
|
||||
userid=mix.userid,
|
||||
extra={
|
||||
"type": "track",
|
||||
"og_sourcehash": mix.sourcehash,
|
||||
"images": cls.get_custom_mix_images(tracks),
|
||||
"artists": None,
|
||||
"albums": None,
|
||||
},
|
||||
)
|
||||
trackmix.timestamp = mix.timestamp
|
||||
|
||||
# INFO: Write track mix save state
|
||||
if mix.extra.get("trackmix_saved"):
|
||||
trackmix.saved = True
|
||||
|
||||
return trackmix
|
||||
|
||||
@classmethod
|
||||
def get_custom_mix_images(cls, tracks: list[Track]):
|
||||
first_album = tracks[0].albumhash
|
||||
first_img = {
|
||||
"image": first_album + ".webp",
|
||||
"type": "album",
|
||||
"color": AlbumStore.albummap[first_album].album.color,
|
||||
}
|
||||
|
||||
seen = set()
|
||||
images = [first_img]
|
||||
|
||||
for track in tracks[1:]:
|
||||
artisthash = track.artists[0]["artisthash"]
|
||||
|
||||
if artisthash in seen:
|
||||
continue
|
||||
|
||||
artist = ArtistStore.artistmap.get(artisthash)
|
||||
|
||||
if not artist:
|
||||
continue
|
||||
|
||||
seen.add(artisthash)
|
||||
|
||||
image = {
|
||||
"image": artisthash + ".webp",
|
||||
"type": "artist",
|
||||
"color": artist.artist.color,
|
||||
}
|
||||
|
||||
images.append(image)
|
||||
|
||||
if len(images) == 3:
|
||||
break
|
||||
|
||||
return images
|
||||
|
||||
@staticmethod
|
||||
def get_because_items(mixes: list[Mix]):
|
||||
"""
|
||||
Given a list of mixes, returns a list of artists that are similar to the
|
||||
artists in the mixes.
|
||||
"""
|
||||
artists: dict[str, list[dict[str, str | int]]] = {}
|
||||
albums: dict[str, list[dict[str, str | int]]] = {}
|
||||
|
||||
for mix in mixes:
|
||||
mix_artisthash = mix.extra["artisthash"]
|
||||
artists.setdefault(mix_artisthash, [])
|
||||
albums.setdefault(mix_artisthash, [])
|
||||
|
||||
for artisthash in mix.extra["artists"]:
|
||||
artist = ArtistStore.artistmap.get(artisthash)
|
||||
|
||||
if not artist:
|
||||
continue
|
||||
|
||||
artists[mix_artisthash].append(
|
||||
{
|
||||
"type": "artist",
|
||||
"trackcount": artist.artist.trackcount,
|
||||
"hash": artisthash,
|
||||
"help_text": str(artist.artist.trackcount)
|
||||
+ ngettext(" track", " tracks", artist.artist.trackcount),
|
||||
}
|
||||
)
|
||||
|
||||
for albumhash in mix.extra["albums"]:
|
||||
album = AlbumStore.albummap.get(albumhash)
|
||||
|
||||
if not album:
|
||||
continue
|
||||
|
||||
albums[mix_artisthash].append(
|
||||
{
|
||||
"type": "album",
|
||||
"trackcount": album.album.trackcount,
|
||||
"hash": albumhash,
|
||||
"help_text": str(album.album.trackcount)
|
||||
+ ngettext(" track", " tracks", album.album.trackcount),
|
||||
}
|
||||
)
|
||||
|
||||
# INFO: Sort artists by trackcount
|
||||
artists[mix_artisthash] = sorted(
|
||||
artists[mix_artisthash],
|
||||
key=lambda x: x["trackcount"],
|
||||
reverse=True,
|
||||
)
|
||||
|
||||
# INFO: Sort albums by trackcount
|
||||
albums[mix_artisthash] = sorted(
|
||||
albums[mix_artisthash],
|
||||
key=lambda x: x["trackcount"],
|
||||
reverse=True,
|
||||
)
|
||||
|
||||
artisthash = mixes[0].extra["artisthash"]
|
||||
because_you_listened_to_artist = {
|
||||
"title": "Because you listened to "
|
||||
+ ArtistStore.artistmap[artisthash].artist.name,
|
||||
"items": albums[artisthash][:15],
|
||||
}
|
||||
|
||||
# Flatten list of artists and remove duplicates by artisthash
|
||||
all_artists = []
|
||||
seen = set()
|
||||
|
||||
# for artist_list in artists.values():
|
||||
# for artist in artist_list:
|
||||
# if artist["hash"] not in seen:
|
||||
# all_artists.append(artist)
|
||||
# seen.add(artist["hash"])
|
||||
|
||||
artists_you_might_like = {
|
||||
"title": "Artists you might like",
|
||||
"items": artists[artisthash][:15],
|
||||
}
|
||||
|
||||
return because_you_listened_to_artist, artists_you_might_like
|
||||
@@ -38,6 +38,7 @@ def serialize_for_card(album: Album):
|
||||
"extra",
|
||||
"id",
|
||||
"lastplayed",
|
||||
"weakhash",
|
||||
}
|
||||
|
||||
return album_serializer(album, props_to_remove)
|
||||
|
||||
@@ -103,6 +103,26 @@ class Paths:
|
||||
def get_config_file_path(cls):
|
||||
return join(cls.get_app_dir(), "settings.json")
|
||||
|
||||
@classmethod
|
||||
def get_mixes_img_path(cls):
|
||||
return join(cls.get_img_path(), "mixes")
|
||||
|
||||
@classmethod
|
||||
def get_artist_mixes_img_path(cls):
|
||||
return join(cls.get_mixes_img_path(), "artists")
|
||||
|
||||
@classmethod
|
||||
def get_og_mixes_img_path(cls):
|
||||
return join(cls.get_mixes_img_path(), "original")
|
||||
|
||||
@classmethod
|
||||
def get_md_mixes_img_path(cls):
|
||||
return join(cls.get_mixes_img_path(), "medium")
|
||||
|
||||
@classmethod
|
||||
def get_sm_mixes_img_path(cls):
|
||||
return join(cls.get_mixes_img_path(), "small")
|
||||
|
||||
|
||||
# defaults
|
||||
class Defaults:
|
||||
|
||||
@@ -62,6 +62,12 @@ def create_config_dir() -> None:
|
||||
|
||||
playlist_img_path = os.path.join("images", "playlists")
|
||||
|
||||
|
||||
mixes_img_path = settings.Paths.get_mixes_img_path()
|
||||
og_mixes_img_path = settings.Paths.get_og_mixes_img_path()
|
||||
md_mixes_img_path = settings.Paths.get_md_mixes_img_path()
|
||||
sm_mixes_img_path = settings.Paths.get_sm_mixes_img_path()
|
||||
|
||||
dirs = [
|
||||
"", # creates the config folder
|
||||
sm_thumb_path,
|
||||
@@ -73,6 +79,10 @@ def create_config_dir() -> None:
|
||||
md_artist_img_path,
|
||||
small_artist_img_path,
|
||||
large_artist_img_path,
|
||||
mixes_img_path,
|
||||
og_mixes_img_path,
|
||||
md_mixes_img_path,
|
||||
sm_mixes_img_path,
|
||||
]
|
||||
|
||||
for _dir in dirs:
|
||||
|
||||
@@ -1,20 +1,14 @@
|
||||
from itertools import groupby
|
||||
import json
|
||||
from pprint import pprint
|
||||
import random
|
||||
from typing import Iterable
|
||||
|
||||
from app.lib.tagger import create_albums
|
||||
from app.models import Album, Track
|
||||
from app.store.artists import ArtistStore
|
||||
from app.utils import flatten
|
||||
from app.utils.auth import get_current_userid
|
||||
from app.utils.customlist import CustomList
|
||||
from app.utils.remove_duplicates import remove_duplicates
|
||||
|
||||
from ..utils.hashing import create_hash
|
||||
from .tracks import TrackStore
|
||||
from app.utils.progressbar import tqdm
|
||||
|
||||
ALBUM_LOAD_KEY = ""
|
||||
|
||||
|
||||
@@ -162,3 +162,16 @@ class ArtistStore:
|
||||
return TrackStore.get_tracks_by_trackhashes(entry.trackhashes)
|
||||
|
||||
return []
|
||||
|
||||
@classmethod
|
||||
def export(cls):
|
||||
path = "artists.json"
|
||||
|
||||
with open(path, "w") as f:
|
||||
data = [
|
||||
{
|
||||
"name": a.name,
|
||||
}
|
||||
for a in cls.get_flat_list()
|
||||
]
|
||||
json.dump(data, f)
|
||||
|
||||
@@ -0,0 +1,81 @@
|
||||
from typing import Any
|
||||
|
||||
from app.store.homepageentries import (
|
||||
BecauseYouListenedToArtistHomepageEntry,
|
||||
GenericRecoverableEntry,
|
||||
HomepageEntry,
|
||||
MixHomepageEntry,
|
||||
RecentlyAddedHomepageEntry,
|
||||
RecentlyPlayedHomepageEntry,
|
||||
)
|
||||
from app.utils.auth import get_current_userid
|
||||
|
||||
|
||||
class HomepageStore:
|
||||
"""
|
||||
Stores the homepage items.
|
||||
"""
|
||||
|
||||
entries: dict[str, HomepageEntry] = {
|
||||
"recently_played": RecentlyPlayedHomepageEntry(
|
||||
title="Recently played",
|
||||
),
|
||||
"artist_mixes": MixHomepageEntry(
|
||||
title="Artist mixes for you",
|
||||
description="Based on artists you have been listening to",
|
||||
),
|
||||
"custom_mixes": MixHomepageEntry(
|
||||
title="Mixes for you",
|
||||
description="Because artist mixes alone aren't enough",
|
||||
),
|
||||
"top_streamed_weekly_artists": GenericRecoverableEntry(
|
||||
title="Top artists this week",
|
||||
description="Your most played artists since Monday",
|
||||
),
|
||||
"top_streamed_monthly_artists": GenericRecoverableEntry(
|
||||
title="Top artists this month",
|
||||
description="Your most played artists since the start of the month",
|
||||
),
|
||||
"because_you_listened_to_artist": BecauseYouListenedToArtistHomepageEntry(
|
||||
title="",
|
||||
description="Artists similar to the artist you listened to",
|
||||
),
|
||||
"artists_you_might_like": BecauseYouListenedToArtistHomepageEntry(
|
||||
title="Artists you might like",
|
||||
description="Artists similar to the artists you have listened to",
|
||||
),
|
||||
"recently_added": RecentlyAddedHomepageEntry(
|
||||
title="Recently added",
|
||||
description="New music added to your library",
|
||||
),
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def set_mixes(cls, items: list[Any], entrykey: str, userid: int | None = None):
|
||||
idmap = {item.id: item for item in items}
|
||||
cls.entries[entrykey].items[userid or get_current_userid()] = idmap
|
||||
|
||||
@classmethod
|
||||
def get_mix(cls, mixkey: str, mixid: str):
|
||||
mix = cls.entries[mixkey].items.get(get_current_userid(), {}).get(mixid)
|
||||
return mix.to_full_dict() if mix else None
|
||||
|
||||
@classmethod
|
||||
def get_homepage_items(cls, limit: int):
|
||||
# return a dict of entry name to entry items
|
||||
return [
|
||||
{entry: cls.entries[entry].get_items(get_current_userid(), limit)}
|
||||
for entry in cls.entries.keys()
|
||||
if len(cls.entries[entry].items)
|
||||
]
|
||||
|
||||
@classmethod
|
||||
def find_mix(cls, mixid: str):
|
||||
mixentries = ["artist_mixes", "custom_mixes"]
|
||||
|
||||
for entry in mixentries:
|
||||
mix = cls.entries[entry].items.get(get_current_userid(), {}).get(mixid)
|
||||
if mix:
|
||||
return mix
|
||||
|
||||
return None
|
||||
@@ -0,0 +1,118 @@
|
||||
from abc import ABC
|
||||
from typing import Any
|
||||
|
||||
from app.lib.home.recover_items import recover_items
|
||||
from app.models.mix import Mix
|
||||
|
||||
class HomepageEntry(ABC):
|
||||
"""
|
||||
Base class for all homepage entries.
|
||||
|
||||
items is a dict of userid to a dict of stuff.
|
||||
"""
|
||||
|
||||
title: str
|
||||
description: str
|
||||
items: dict[int, Any]
|
||||
|
||||
def __init__(self, title: str, description: str):
|
||||
self.title = title
|
||||
self.description = description
|
||||
|
||||
def get_items(self, userid: int, limit: int | None = None):
|
||||
"""
|
||||
Return usable items for the homepage.
|
||||
"""
|
||||
...
|
||||
|
||||
|
||||
class MixHomepageEntry(HomepageEntry):
|
||||
"""
|
||||
A homepage entry for mixes.
|
||||
self.items is a dict of userid to a dict of mixid to mix.
|
||||
"""
|
||||
|
||||
items: dict[int, dict[str, Mix]]
|
||||
|
||||
def __init__(self, title: str, description: str):
|
||||
super().__init__(title, description)
|
||||
self.items = {}
|
||||
|
||||
def get_items(self, userid: int, limit: int | None = None):
|
||||
items = []
|
||||
|
||||
for mix in self.items.get(userid, {}).values():
|
||||
if limit and len(items) >= limit:
|
||||
break
|
||||
|
||||
items.append(
|
||||
{
|
||||
"type": "mix",
|
||||
"item": mix.to_dict(),
|
||||
}
|
||||
)
|
||||
|
||||
return {
|
||||
"title": self.title,
|
||||
"description": self.description,
|
||||
"items": items,
|
||||
}
|
||||
|
||||
|
||||
class RecentlyPlayedHomepageEntry(HomepageEntry):
|
||||
"""
|
||||
A homepage entry for recently played.
|
||||
"""
|
||||
|
||||
items: dict[int, list[dict[str, Any]]]
|
||||
|
||||
def __init__(self, title: str, description: str = ""):
|
||||
super().__init__(title, description)
|
||||
self.items = {}
|
||||
|
||||
def get_items(self, userid: int, limit: int | None = None):
|
||||
items = self.items.get(userid, [])[:limit]
|
||||
|
||||
return {
|
||||
"title": self.title,
|
||||
"description": self.description,
|
||||
"items": recover_items(items),
|
||||
}
|
||||
|
||||
|
||||
class RecentlyAddedHomepageEntry(RecentlyPlayedHomepageEntry):
|
||||
"""
|
||||
A homepage entry for recently added.
|
||||
"""
|
||||
|
||||
def get_items(self, userid: int, limit: int | None = None):
|
||||
return super().get_items(0, limit)
|
||||
|
||||
|
||||
class GenericRecoverableEntry(RecentlyPlayedHomepageEntry):
|
||||
"""
|
||||
A homepage entry for top streamed.
|
||||
"""
|
||||
|
||||
# NOTE: This extends RecentlyPlayedHomepageEntry because
|
||||
# the shape of the data is the same.
|
||||
pass
|
||||
|
||||
|
||||
class BecauseYouListenedToArtistHomepageEntry(RecentlyPlayedHomepageEntry):
|
||||
"""
|
||||
A homepage entry for because you listened to artist.
|
||||
"""
|
||||
|
||||
# SHAPE: {userid: {title: str, items: list[RecoverableItem]}}
|
||||
items: dict[int, dict[str, Any]]
|
||||
|
||||
def get_items(self, userid: int, limit: int | None = None):
|
||||
title = self.items.get(userid, {}).get("title")
|
||||
items = self.items.get(userid, {}).get("items", [])[:limit]
|
||||
|
||||
return {
|
||||
"title": title,
|
||||
"items": recover_items(items),
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
# from tqdm import tqdm
|
||||
|
||||
import itertools
|
||||
import json
|
||||
from typing import Callable, Iterable
|
||||
from app.db.libdata import TrackTable
|
||||
|
||||
@@ -313,3 +314,18 @@ class TrackStore:
|
||||
def get_recently_played(cls, limit: int):
|
||||
tracks = cls.get_flat_list()
|
||||
return sorted(tracks, key=lambda x: x.lastplayed, reverse=True)[:limit]
|
||||
|
||||
@classmethod
|
||||
def export(cls):
|
||||
path = "tracks.json"
|
||||
|
||||
with open(path, "w") as f:
|
||||
data = [
|
||||
{
|
||||
"title": t.title,
|
||||
"album": t.album,
|
||||
"artists": [a["name"] for a in t.artists],
|
||||
}
|
||||
for t in cls.get_flat_list()
|
||||
]
|
||||
json.dump(data, f)
|
||||
|
||||
+51
-5
@@ -46,7 +46,7 @@ def date_string_to_time_passed(prev_date: str) -> str:
|
||||
return timestamp_to_time_passed(then)
|
||||
|
||||
|
||||
def seconds_to_time_string(seconds):
|
||||
def seconds_to_time_string(seconds: int):
|
||||
"""
|
||||
Converts seconds to a time string. e.g. 1 hour 2 minutes, 1 hour 2 seconds, 1 hour, 1 minute 2 seconds, etc.
|
||||
"""
|
||||
@@ -66,17 +66,29 @@ def seconds_to_time_string(seconds):
|
||||
return f"{remaining_seconds} sec"
|
||||
|
||||
|
||||
def get_date_range(duration: str):
|
||||
def get_date_range(duration: str, units_ago: int = 0):
|
||||
"""
|
||||
Returns a tuple of dates representing the start and end of a given duration.
|
||||
"""
|
||||
date_range = None
|
||||
seconds_ago = 0
|
||||
|
||||
if duration != "alltime":
|
||||
seconds_ago = (
|
||||
pendulum.now() - pendulum.now().subtract().start_of(duration)
|
||||
).total_seconds() * units_ago
|
||||
print("seconds_ago", duration, str(seconds_ago))
|
||||
|
||||
match duration:
|
||||
case "week" | "month" | "year":
|
||||
case "day" | "week" | "month" | "year":
|
||||
date_range = (
|
||||
pendulum.now().subtract().start_of(duration).timestamp(),
|
||||
pendulum.now().end_of(duration).timestamp(),
|
||||
pendulum.now()
|
||||
.subtract(seconds=seconds_ago)
|
||||
.start_of(duration)
|
||||
.timestamp(),
|
||||
pendulum.now()
|
||||
# .end_of(duration)
|
||||
.timestamp(),
|
||||
)
|
||||
case "alltime":
|
||||
date_range = (0, pendulum.now().timestamp())
|
||||
@@ -86,6 +98,40 @@ def get_date_range(duration: str):
|
||||
return (int(date_range[0]), int(date_range[1]))
|
||||
|
||||
|
||||
def get_duration_ago(duration: str, units_ago: int = 1) -> int:
|
||||
"""
|
||||
Returns the start of the last duration.
|
||||
"""
|
||||
seconds_in_day = 24 * 60 * 60
|
||||
now = pendulum.now()
|
||||
|
||||
match duration:
|
||||
case "day":
|
||||
return int(
|
||||
now.subtract(seconds=seconds_in_day * units_ago).timestamp()
|
||||
)
|
||||
case "week":
|
||||
return int(
|
||||
now
|
||||
.subtract(seconds=seconds_in_day * 7 * units_ago)
|
||||
.timestamp()
|
||||
)
|
||||
case "month":
|
||||
return int(
|
||||
now
|
||||
.subtract(seconds=seconds_in_day * 30 * units_ago)
|
||||
.timestamp()
|
||||
)
|
||||
case "year":
|
||||
return int(
|
||||
now
|
||||
.subtract(seconds=seconds_in_day * 365 * units_ago)
|
||||
.timestamp()
|
||||
)
|
||||
case _:
|
||||
raise ValueError(f"Invalid duration: {duration}")
|
||||
|
||||
|
||||
def get_duration_in_seconds(duration: str) -> int:
|
||||
"""
|
||||
Returns the number of seconds in a given duration.
|
||||
|
||||
@@ -0,0 +1,113 @@
|
||||
from app.models.track import Track
|
||||
from typing import List, Dict, Tuple
|
||||
from collections import Counter
|
||||
|
||||
|
||||
def violates_gap_rule(
|
||||
balanced_mix: Dict[int, Track], position: int, track: Track, gap: int = 3
|
||||
) -> bool:
|
||||
"""
|
||||
Check if placing the track at the given position violates the gap rule.
|
||||
|
||||
The gap rule is violated if the track has an artist in common with any
|
||||
track within the gap range (default = 3).
|
||||
"""
|
||||
track_artists = set(artist["artisthash"] for artist in track.artists)
|
||||
|
||||
for i in range(max(0, position - gap), position):
|
||||
if i in balanced_mix:
|
||||
existing_artists = set(
|
||||
artist["artisthash"] for artist in balanced_mix[i].artists
|
||||
)
|
||||
if track_artists.intersection(existing_artists):
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
|
||||
def find_next_position(
|
||||
balanced_mix: Dict[int, Track], start: int, track: Track, total_tracks: int
|
||||
) -> int:
|
||||
"""
|
||||
Find the next available position for the track, starting from 'start' and wrapping around.
|
||||
"""
|
||||
for i in range(start, total_tracks):
|
||||
if i not in balanced_mix and not violates_gap_rule(balanced_mix, i, track):
|
||||
return i
|
||||
for i in range(start):
|
||||
if i not in balanced_mix and not violates_gap_rule(balanced_mix, i, track):
|
||||
return i
|
||||
return start # If no better position is found, return the original position
|
||||
|
||||
|
||||
def is_tracklist_balanced(tracks: List[Track], gap: int = 3) -> Tuple[bool, bool]:
|
||||
"""
|
||||
Checks if a tracklist is balanced or can be balanced.
|
||||
|
||||
Args:
|
||||
- tracks: List of Track objects
|
||||
- gap: Minimum number of tracks between songs by the same artist (default 3)
|
||||
|
||||
Returns:
|
||||
- A tuple (can_be_balanced, is_currently_balanced)
|
||||
"""
|
||||
total_tracks = len(tracks)
|
||||
|
||||
# Count tracks per artist (considering only the first artist)
|
||||
artist_counts = Counter(track.artists[0]["artisthash"] for track in tracks)
|
||||
|
||||
# Calculate the maximum number of tracks an artist can have in a balanced list
|
||||
max_tracks_per_artist = (total_tracks + gap) // (gap + 1)
|
||||
|
||||
# Check if it's mathematically possible to balance the tracklist
|
||||
can_be_balanced = all(
|
||||
count <= max_tracks_per_artist for count in artist_counts.values()
|
||||
)
|
||||
|
||||
if not can_be_balanced:
|
||||
return False, False
|
||||
|
||||
# Check if the current arrangement is balanced
|
||||
is_currently_balanced = True
|
||||
artist_last_positions = {}
|
||||
|
||||
for i, track in enumerate(tracks):
|
||||
artist = track.artists[0]["artisthash"]
|
||||
if artist in artist_last_positions:
|
||||
if i - artist_last_positions[artist] <= gap:
|
||||
is_currently_balanced = False
|
||||
break
|
||||
artist_last_positions[artist] = i
|
||||
|
||||
return can_be_balanced, is_currently_balanced
|
||||
|
||||
|
||||
def balance_mix(tracks: List[Track]) -> List[Track]:
|
||||
"""
|
||||
Balances the mix by ensuring that the tracks in a mix are distributed evenly.
|
||||
Preserves the overall rating order of tracks while minimizing disruption.
|
||||
|
||||
Tracks that need to be moved are moved down the tracklist until they no longer
|
||||
violate the gap rule.
|
||||
"""
|
||||
can_be_balanced, is_balanced = is_tracklist_balanced(tracks)
|
||||
|
||||
if not can_be_balanced:
|
||||
print("Warning: This tracklist cannot be perfectly balanced.")
|
||||
# Proceed with best-effort balancing
|
||||
|
||||
if is_balanced:
|
||||
return tracks # Already balanced, no need to modify
|
||||
|
||||
balanced_mix: Dict[int, Track] = {}
|
||||
total_tracks = len(tracks)
|
||||
|
||||
for i, track in enumerate(tracks):
|
||||
if i in balanced_mix or not violates_gap_rule(balanced_mix, i, track):
|
||||
balanced_mix[i] = track
|
||||
else:
|
||||
new_position = find_next_position(balanced_mix, i, track, total_tracks)
|
||||
balanced_mix[new_position] = track
|
||||
|
||||
# Convert the dictionary back to a list, preserving the new order
|
||||
return [balanced_mix[i] for i in sorted(balanced_mix.keys())]
|
||||
+24
-10
@@ -11,28 +11,40 @@ from app.store.tracks import TrackStore
|
||||
from app.utils.dates import seconds_to_time_string
|
||||
|
||||
|
||||
def get_artists_in_period(start_time: int, end_time: int):
|
||||
scrobbles = ScrobbleTable.get_all_in_period(start_time, end_time)
|
||||
artists = defaultdict(lambda: {"playcount": 0, "playduration": 0})
|
||||
def get_artists_in_period(
|
||||
start_time: int | float, end_time: int | float, userid: int | None = None
|
||||
):
|
||||
scrobbles = ScrobbleTable.get_all_in_period(start_time, end_time, userid)
|
||||
artists: Any = defaultdict(
|
||||
lambda: {"playcount": 0, "playduration": 0, "tracks": {}}
|
||||
)
|
||||
|
||||
for scrobble in scrobbles:
|
||||
track = TrackStore.get_tracks_by_trackhashes([scrobble.trackhash])
|
||||
if not track:
|
||||
continue
|
||||
|
||||
track = track[0]
|
||||
|
||||
for artist in track.artists:
|
||||
artisthash = artist["artisthash"]
|
||||
|
||||
artists[artisthash]["artist"] = artist["name"]
|
||||
artists[artisthash]["artisthash"] = artist["artisthash"]
|
||||
artists[artisthash]["playcount"] += 1
|
||||
artists[artisthash]["playduration"] += scrobble.duration
|
||||
|
||||
return list(artists.values())
|
||||
# index the track counts too
|
||||
artists[artisthash]["tracks"][track.trackhash] = (
|
||||
artists[artisthash]["tracks"].get(track.trackhash, 0) + 1
|
||||
)
|
||||
|
||||
artists = list(artists.values())
|
||||
return sorted(artists, key=lambda x: x["playduration"], reverse=True)
|
||||
|
||||
|
||||
def get_albums_in_period(start_time: int, end_time: int):
|
||||
scrobbles = ScrobbleTable.get_all_in_period(start_time, end_time)
|
||||
def get_albums_in_period(start_time: int, end_time: int, userid: int | None = None):
|
||||
scrobbles = ScrobbleTable.get_all_in_period(start_time, end_time, userid)
|
||||
albums: dict[str, Album] = {}
|
||||
|
||||
for scrobble in scrobbles:
|
||||
@@ -58,8 +70,8 @@ def get_albums_in_period(start_time: int, end_time: int):
|
||||
return list(albums.values())
|
||||
|
||||
|
||||
def get_tracks_in_period(start_time: int, end_time: int):
|
||||
scrobbles = ScrobbleTable.get_all_in_period(start_time, end_time)
|
||||
def get_tracks_in_period(start_time: int, end_time: int, userid: int | None = None):
|
||||
scrobbles = ScrobbleTable.get_all_in_period(start_time, end_time, userid)
|
||||
tracks: dict[str, Track] = {}
|
||||
duration = 0
|
||||
|
||||
@@ -158,12 +170,14 @@ def calculate_scrobble_trend(current_scrobbles: int, previous_scrobbles: int) ->
|
||||
)
|
||||
|
||||
|
||||
def calculate_new_artists(current_artists: List[dict[str, Any]], timestamp: int):
|
||||
def calculate_new_artists(
|
||||
current_artists: List[dict[str, Any]], timestamp: int, userid: int | None = None
|
||||
):
|
||||
"""
|
||||
Calculate the number of new artists based on the current and all previous scrobbles.
|
||||
"""
|
||||
current_artists_set = set(artist["artisthash"] for artist in current_artists)
|
||||
all_records = ScrobbleTable.get_all_in_period(0, timestamp)
|
||||
all_records = ScrobbleTable.get_all_in_period(0, timestamp, userid)
|
||||
trackhashes = set(record.trackhash for record in all_records)
|
||||
|
||||
previous_artists_set = set()
|
||||
|
||||
@@ -21,11 +21,14 @@ import setproctitle
|
||||
|
||||
from app.api import create_api
|
||||
from app.arg_handler import ProcessArgs
|
||||
from app.crons import start_cron_jobs
|
||||
from app.lib.index import IndexEverything
|
||||
from app.plugins.register import register_plugins
|
||||
from app.settings import FLASKVARS, TCOLOR, Info
|
||||
from app.setup import load_into_mem, run_setup
|
||||
from app.start_info_logger import log_startup_info
|
||||
from app.store.artists import ArtistStore
|
||||
from app.store.tracks import TrackStore
|
||||
from app.utils.filesystem import get_home_res_path
|
||||
from app.utils.paths import getClientFilesExtensions
|
||||
from app.utils.threading import background
|
||||
@@ -59,7 +62,7 @@ mimetypes.add_type("application/manifest+json", ".webmanifest")
|
||||
|
||||
|
||||
# Background tasks
|
||||
@background
|
||||
# @background
|
||||
def bg_run_setup():
|
||||
IndexEverything()
|
||||
|
||||
@@ -72,20 +75,19 @@ def bg_run_setup():
|
||||
@background
|
||||
def run_swingmusic():
|
||||
log_startup_info()
|
||||
bg_run_setup()
|
||||
register_plugins()
|
||||
|
||||
# start_watchdog()
|
||||
|
||||
setproctitle.setproctitle(f"swingmusic ::{FLASKVARS.get_flask_port()}")
|
||||
# bg_run_setup()
|
||||
start_cron_jobs()
|
||||
|
||||
|
||||
# Setup function calls
|
||||
Info.load()
|
||||
ProcessArgs()
|
||||
run_setup()
|
||||
load_into_mem()
|
||||
run_swingmusic()
|
||||
|
||||
|
||||
# Create the Flask app
|
||||
@@ -102,7 +104,7 @@ whitelisted_routes = {
|
||||
"/auth/refresh",
|
||||
"/docs",
|
||||
}
|
||||
blacklist_extensions = {".webp"}.union(getClientFilesExtensions())
|
||||
blacklist_extensions = {".webp", ".jpg"}.union(getClientFilesExtensions())
|
||||
|
||||
|
||||
def skipAuthAction():
|
||||
@@ -224,6 +226,12 @@ def print_memory_usage(response: Response):
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
load_into_mem()
|
||||
run_swingmusic()
|
||||
TrackStore.export()
|
||||
ArtistStore.export()
|
||||
|
||||
host = FLASKVARS.get_flask_host()
|
||||
port = FLASKVARS.get_flask_port()
|
||||
|
||||
@@ -231,7 +239,7 @@ if __name__ == "__main__":
|
||||
app,
|
||||
host=host,
|
||||
port=port,
|
||||
threads=10,
|
||||
threads=100,
|
||||
ipv6=True,
|
||||
ipv4=True,
|
||||
)
|
||||
|
||||
Generated
+15
-1
@@ -2052,6 +2052,20 @@ files = [
|
||||
{file = "roundrobin-0.0.4.tar.gz", hash = "sha256:7e9d19a5bd6123d99993fb935fa86d25c88bb2096e493885f61737ed0f5e9abd"},
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "schedule"
|
||||
version = "1.2.2"
|
||||
description = "Job scheduling for humans."
|
||||
optional = false
|
||||
python-versions = ">=3.7"
|
||||
files = [
|
||||
{file = "schedule-1.2.2-py3-none-any.whl", hash = "sha256:5bef4a2a0183abf44046ae0d164cadcac21b1db011bdd8102e4a0c1e91e06a7d"},
|
||||
{file = "schedule-1.2.2.tar.gz", hash = "sha256:15fe9c75fe5fd9b9627f3f19cc0ef1420508f9f9a46f45cd0769ef75ede5f0b7"},
|
||||
]
|
||||
|
||||
[package.extras]
|
||||
timezone = ["pytz"]
|
||||
|
||||
[[package]]
|
||||
name = "setproctitle"
|
||||
version = "1.3.3"
|
||||
@@ -2761,4 +2775,4 @@ testing = ["coverage (>=5.0.3)", "zope.event", "zope.testing"]
|
||||
[metadata]
|
||||
lock-version = "2.0"
|
||||
python-versions = ">=3.10,<3.12"
|
||||
content-hash = "43972b6ffadd14e5047f067a0258f2428ebe351df8bd032dc0bf05df379678a6"
|
||||
content-hash = "85f8932739522e7b53b4fe5bbecc3c10a30bb690e25bf9404209c57ec71e88d3"
|
||||
|
||||
@@ -32,6 +32,7 @@ memory-profiler = "^0.61.0"
|
||||
sortedcontainers = "^2.4.0"
|
||||
xxhash = "^3.4.1"
|
||||
ffmpeg-python = "^0.2.0"
|
||||
schedule = "^1.2.2"
|
||||
|
||||
[tool.poetry.dev-dependencies]
|
||||
pylint = "^2.15.5"
|
||||
|
||||
Reference in New Issue
Block a user