mirror of
https://github.com/Dvorinka/swingmusic-extended.git
synced 2026-06-03 20:13:02 +00:00
fix: recently added
This commit is contained in:
+17
-4
@@ -30,14 +30,14 @@ def create_all():
|
|||||||
|
|
||||||
class Base(MasterBase, DeclarativeBase):
|
class Base(MasterBase, DeclarativeBase):
|
||||||
@classmethod
|
@classmethod
|
||||||
def get_all_hashes(cls):
|
def get_all_hashes(cls, create_date: int | None = None):
|
||||||
with DbManager() as conn:
|
with DbManager() as conn:
|
||||||
if cls.__tablename__ == "track":
|
if cls.__tablename__ == "track":
|
||||||
stmt = select(TrackTable.trackhash)
|
stmt = select(TrackTable.trackhash).where(cls.last_mod < create_date)
|
||||||
elif cls.__tablename__ == "album":
|
elif cls.__tablename__ == "album":
|
||||||
stmt = select(AlbumTable.albumhash)
|
stmt = select(AlbumTable.albumhash).where(cls.created_date < create_date)
|
||||||
elif cls.__tablename__ == "artist":
|
elif cls.__tablename__ == "artist":
|
||||||
stmt = select(ArtistTable.artisthash)
|
stmt = select(ArtistTable.artisthash).where(cls.created_date < create_date)
|
||||||
|
|
||||||
result = conn.execute(stmt)
|
result = conn.execute(stmt)
|
||||||
return {row[0] for row in result.fetchall()}
|
return {row[0] for row in result.fetchall()}
|
||||||
@@ -194,6 +194,18 @@ class TrackTable(Base):
|
|||||||
)
|
)
|
||||||
return tracks_to_dataclasses(result.fetchall())
|
return tracks_to_dataclasses(result.fetchall())
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def get_recently_added(cls, start: int, limit: int):
|
||||||
|
with DbManager() as conn:
|
||||||
|
result = conn.execute(
|
||||||
|
select(TrackTable)
|
||||||
|
.order_by(TrackTable.last_mod.desc())
|
||||||
|
.offset(start)
|
||||||
|
.limit(limit)
|
||||||
|
)
|
||||||
|
|
||||||
|
return tracks_to_dataclasses(result.fetchall())
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def remove_tracks_by_filepaths(cls, filepaths: set[str]):
|
def remove_tracks_by_filepaths(cls, filepaths: set[str]):
|
||||||
with DbManager(commit=True) as conn:
|
with DbManager(commit=True) as conn:
|
||||||
@@ -238,6 +250,7 @@ class AlbumTable(Base):
|
|||||||
all = result.fetchall()
|
all = result.fetchall()
|
||||||
return albums_to_dataclasses(all)
|
return albums_to_dataclasses(all)
|
||||||
|
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def get_album_by_albumhash(cls, hash: str):
|
def get_album_by_albumhash(cls, hash: str):
|
||||||
with DbManager() as conn:
|
with DbManager() as conn:
|
||||||
|
|||||||
@@ -1,11 +1,15 @@
|
|||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
|
from pprint import pprint
|
||||||
|
from time import time
|
||||||
|
|
||||||
|
from app.db.libdata import AlbumTable, ArtistTable, TrackTable
|
||||||
from app.lib.playlistlib import get_first_4_images
|
from app.lib.playlistlib import get_first_4_images
|
||||||
from app.models.playlist import Playlist
|
from app.models.playlist import Playlist
|
||||||
from app.models.track import Track
|
from app.models.track import Track
|
||||||
from app.store.tracks import TrackStore
|
|
||||||
from app.store.albums import AlbumStore
|
# from app.store.tracks import TrackStore
|
||||||
from app.store.artists import ArtistStore
|
# from app.store.albums import AlbumStore
|
||||||
|
# from app.store.artists import ArtistStore
|
||||||
|
|
||||||
from app.serializers.track import serialize_track
|
from app.serializers.track import serialize_track
|
||||||
from app.serializers.album import album_serializer
|
from app.serializers.album import album_serializer
|
||||||
@@ -13,7 +17,12 @@ from app.serializers.artist import serialize_for_card
|
|||||||
|
|
||||||
from itertools import groupby
|
from itertools import groupby
|
||||||
|
|
||||||
from app.utils.dates import create_new_date, date_string_to_time_passed, timestamp_to_time_passed
|
from app.utils import flatten
|
||||||
|
from app.utils.dates import (
|
||||||
|
create_new_date,
|
||||||
|
date_string_to_time_passed,
|
||||||
|
timestamp_to_time_passed,
|
||||||
|
)
|
||||||
|
|
||||||
older_albums = set()
|
older_albums = set()
|
||||||
older_artists = set()
|
older_artists = set()
|
||||||
@@ -36,7 +45,7 @@ def check_is_album_folder(tracks: list[Track]):
|
|||||||
|
|
||||||
def check_is_artist_folder(tracks: list[Track]):
|
def check_is_artist_folder(tracks: list[Track]):
|
||||||
# INFO: flatten artist hashes using "-" as a separator
|
# INFO: flatten artist hashes using "-" as a separator
|
||||||
artisthashes = "-".join(t.artist_hashes for t in tracks).split("-")
|
artisthashes = flatten([t.artisthashes for t in tracks])
|
||||||
return calc_based_on_percent(artisthashes, len(tracks))
|
return calc_based_on_percent(artisthashes, len(tracks))
|
||||||
|
|
||||||
|
|
||||||
@@ -48,27 +57,22 @@ def check_is_track_folder(tracks: list[Track]):
|
|||||||
return [create_track(t) for t in tracks]
|
return [create_track(t) for t in tracks]
|
||||||
|
|
||||||
|
|
||||||
def check_is_new_artist(artisthash: str, timestamp: int):
|
# def check_is_new_artist(hashes: set[str], artisthash: str, timestamp: int):
|
||||||
"""
|
# """
|
||||||
Checks if an artist already exists in the library.
|
# Checks if an artist already exists in the library.
|
||||||
"""
|
# """
|
||||||
tracks = filter(
|
# return artisthash not in hashes
|
||||||
lambda t: t.last_mod < timestamp and artisthash in t.artist_hashes,
|
|
||||||
TrackStore.tracks,
|
|
||||||
)
|
|
||||||
|
|
||||||
return next(tracks, None) is None
|
|
||||||
|
|
||||||
|
|
||||||
def check_is_new_album(albumhash: str, timestamp: int):
|
# def check_is_new_album(albumhash: str, timestamp: int):
|
||||||
"""
|
# """
|
||||||
Checks if an album already exists in the library.
|
# Checks if an album already exists in the library.
|
||||||
"""
|
# """
|
||||||
tracks = filter(
|
# tracks = filter(
|
||||||
lambda t: t.last_mod < timestamp and t.albumhash == albumhash, TrackStore.tracks
|
# lambda t: t.last_mod < timestamp and t.albumhash == albumhash, TrackStore.tracks
|
||||||
)
|
# )
|
||||||
|
|
||||||
return next(tracks, None) is None
|
# return next(tracks, None) is None
|
||||||
|
|
||||||
|
|
||||||
def create_track(t: Track):
|
def create_track(t: Track):
|
||||||
@@ -88,13 +92,19 @@ def create_track(t: Track):
|
|||||||
group_type = dict[str, list[Track], float]
|
group_type = dict[str, list[Track], float]
|
||||||
|
|
||||||
|
|
||||||
def check_folder_type(group_: group_type) -> str:
|
def check_folder_type(group_: group_type):
|
||||||
# check if all tracks in group have the same albumhash
|
# check if all tracks in group have the same albumhash
|
||||||
# if so, return "album"
|
# if so, return "album"
|
||||||
key = group_["folder"]
|
key = group_["folder"]
|
||||||
tracks = group_["tracks"]
|
tracks = group_["tracks"]
|
||||||
time = group_["time"]
|
time = group_["time"]
|
||||||
|
|
||||||
|
print(f"Checking folder: {key}")
|
||||||
|
print(f"Tracks: {len(tracks)}")
|
||||||
|
|
||||||
|
existing_artist_hashes: set[str] = set(ArtistTable.get_all_hashes(time))
|
||||||
|
existing_album_hashes: set[str] = set(AlbumTable.get_all_hashes(time))
|
||||||
|
|
||||||
if len(tracks) == 1:
|
if len(tracks) == 1:
|
||||||
entry = create_track(tracks[0])
|
entry = create_track(tracks[0])
|
||||||
entry["item"]["time"] = timestamp_to_time_passed(time)
|
entry["item"]["time"] = timestamp_to_time_passed(time)
|
||||||
@@ -102,7 +112,7 @@ def check_folder_type(group_: group_type) -> str:
|
|||||||
|
|
||||||
is_album, albumhash, _ = check_is_album_folder(tracks)
|
is_album, albumhash, _ = check_is_album_folder(tracks)
|
||||||
if is_album:
|
if is_album:
|
||||||
album = AlbumStore.get_album_by_hash(albumhash)
|
album = AlbumTable.get_album_by_albumhash(albumhash)
|
||||||
|
|
||||||
if album is None:
|
if album is None:
|
||||||
return None
|
return None
|
||||||
@@ -120,7 +130,7 @@ def check_folder_type(group_: group_type) -> str:
|
|||||||
},
|
},
|
||||||
)
|
)
|
||||||
album["help_text"] = (
|
album["help_text"] = (
|
||||||
"NEW ALBUM" if check_is_new_album(albumhash, time) else "NEW TRACKS"
|
"NEW ALBUM" if albumhash in existing_album_hashes else "NEW TRACKS"
|
||||||
)
|
)
|
||||||
album["time"] = timestamp_to_time_passed(time)
|
album["time"] = timestamp_to_time_passed(time)
|
||||||
|
|
||||||
@@ -131,7 +141,7 @@ def check_folder_type(group_: group_type) -> str:
|
|||||||
|
|
||||||
is_artist, artisthash, trackcount = check_is_artist_folder(tracks)
|
is_artist, artisthash, trackcount = check_is_artist_folder(tracks)
|
||||||
if is_artist:
|
if is_artist:
|
||||||
artist = ArtistStore.get_artist_by_hash(artisthash)
|
artist = ArtistTable.get_artist_by_hash(artisthash)
|
||||||
|
|
||||||
if artist is None:
|
if artist is None:
|
||||||
return None
|
return None
|
||||||
@@ -139,7 +149,7 @@ def check_folder_type(group_: group_type) -> str:
|
|||||||
artist = serialize_for_card(artist)
|
artist = serialize_for_card(artist)
|
||||||
artist["trackcount"] = trackcount
|
artist["trackcount"] = trackcount
|
||||||
artist["help_text"] = (
|
artist["help_text"] = (
|
||||||
"NEW ARTIST" if check_is_new_artist(artisthash, time) else "NEW MUSIC"
|
"NEW ARTIST" if artisthash not in existing_artist_hashes else "NEW MUSIC"
|
||||||
)
|
)
|
||||||
artist["time"] = timestamp_to_time_passed(time)
|
artist["time"] = timestamp_to_time_passed(time)
|
||||||
|
|
||||||
@@ -165,40 +175,62 @@ def check_folder_type(group_: group_type) -> str:
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def group_track_by_folders(tracks: Track):
|
def group_track_by_folders(tracks: list[Track], groups: dict[str, list[Track]]):
|
||||||
"""
|
"""
|
||||||
Groups tracks by folder and returns a list of groups sorted by last modified date.
|
Groups tracks by folder and returns a list of groups sorted by last modified date.
|
||||||
|
|
||||||
Uses generator expressions to avoid creating intermediate lists.
|
Uses generator expressions to avoid creating intermediate lists.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
# INFO: sort tracks by folder name, then group by folder name
|
# INFO: sort tracks by folder name, then group by folder name
|
||||||
tracks = sorted(tracks, key=lambda t: t.folder)
|
tracks = sorted(tracks, key=lambda t: t.folder)
|
||||||
groups = groupby(tracks, lambda t: t.folder)
|
thisgroup = groupby(tracks, lambda t: t.folder)
|
||||||
|
|
||||||
# INFO: sort tracks by last modified date in descending order to get the most recent last modified date
|
for folder, thistracks in thisgroup:
|
||||||
groups = (
|
groups.setdefault(folder, []).extend(thistracks)
|
||||||
(folder, sorted(tracks, key=lambda t: t.last_mod, reverse=True))
|
|
||||||
for folder, tracks in groups
|
|
||||||
)
|
|
||||||
|
|
||||||
# INFO: Return a generator of the groups
|
return groups
|
||||||
groups = (
|
|
||||||
{"folder": folder, "tracks": list(tracks), "time": tracks[0].last_mod}
|
|
||||||
for folder, tracks in groups
|
|
||||||
)
|
|
||||||
|
|
||||||
# sort groups by last modified date
|
|
||||||
return sorted(groups, key=lambda group: group["time"], reverse=True)
|
|
||||||
|
|
||||||
|
|
||||||
def get_recently_added_items(limit: int = 7):
|
def get_recently_added_items(limit: int = 7):
|
||||||
tracks = sorted(TrackStore.tracks, key=lambda t: t.created_date)
|
# tracks = sorted(TrackStore.tracks, key=lambda t: t.created_date)
|
||||||
groups = group_track_by_folders(tracks)
|
now = time()
|
||||||
|
tracks = get_recently_added_tracks(start=0, limit=None)
|
||||||
|
then = time()
|
||||||
|
|
||||||
|
print(f"Time taken to get tracks: {then - now}")
|
||||||
|
groups = group_track_by_folders(tracks, {})
|
||||||
|
# print(groups)
|
||||||
|
last_trackcount: int = len(tracks)
|
||||||
|
|
||||||
|
# while len(groups.keys()) < limit and last_trackcount > 0:
|
||||||
|
# distracks = get_recently_added_tracks(start=len(tracks), limit=100)
|
||||||
|
# last_trackcount = len(distracks)
|
||||||
|
|
||||||
|
# tracks.extend(distracks)
|
||||||
|
# groups = group_track_by_folders(tracks, groups)
|
||||||
|
|
||||||
|
grouplist = []
|
||||||
|
|
||||||
|
# INFO: sort tracks by last modified date in descending order to get the most recent last modified date
|
||||||
|
for folder, trackgroup in groups.items():
|
||||||
|
trackgroup.sort(key=lambda t: t.last_mod, reverse=True)
|
||||||
|
grouplist.append(
|
||||||
|
{
|
||||||
|
"folder": folder,
|
||||||
|
"len": len(trackgroup),
|
||||||
|
"tracks": trackgroup,
|
||||||
|
"time": trackgroup[0].last_mod,
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
pprint(f"😅😅😅😅😅😅😅😅😅😅😅😅😅😅😅😅 {grouplist[0]['len']}")
|
||||||
|
|
||||||
|
# sort groups by last modified date
|
||||||
|
grouplist = sorted(grouplist, key=lambda group: group["time"], reverse=True)
|
||||||
|
|
||||||
recent_items = []
|
recent_items = []
|
||||||
|
|
||||||
for group in groups:
|
for group in grouplist:
|
||||||
item = check_folder_type(group)
|
item = check_folder_type(group)
|
||||||
|
|
||||||
if item not in recent_items:
|
if item not in recent_items:
|
||||||
@@ -217,7 +249,6 @@ def get_recently_added_items(limit: int = 7):
|
|||||||
return recent_items
|
return recent_items
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
def get_recently_added_playlist(limit: int = 100):
|
def get_recently_added_playlist(limit: int = 100):
|
||||||
playlist = Playlist(
|
playlist = Playlist(
|
||||||
id="recentlyadded",
|
id="recentlyadded",
|
||||||
@@ -232,7 +263,7 @@ def get_recently_added_playlist(limit: int = 100):
|
|||||||
|
|
||||||
try:
|
try:
|
||||||
# Create date to show as last updated
|
# Create date to show as last updated
|
||||||
date = datetime.fromtimestamp(tracks[0].created_date)
|
date = datetime.fromtimestamp(tracks[0].last_mod)
|
||||||
except IndexError:
|
except IndexError:
|
||||||
return playlist, []
|
return playlist, []
|
||||||
|
|
||||||
@@ -244,6 +275,8 @@ def get_recently_added_playlist(limit: int = 100):
|
|||||||
|
|
||||||
return playlist, tracks
|
return playlist, tracks
|
||||||
|
|
||||||
def get_recently_added_tracks(limit: int):
|
|
||||||
tracks = sorted(TrackStore.tracks, key=lambda t: t.created_date, reverse=True)
|
def get_recently_added_tracks(start: int = 0, limit: int = 100):
|
||||||
return tracks[:limit]
|
# tracks = sorted(TrackStore.tracks, key=lambda t: t.created_date, reverse=True)
|
||||||
|
return TrackTable.get_recently_added(start, limit)
|
||||||
|
# return tracks[:limit]
|
||||||
|
|||||||
@@ -10,7 +10,7 @@ from app import settings
|
|||||||
class Playlist:
|
class Playlist:
|
||||||
"""Creates playlist objects"""
|
"""Creates playlist objects"""
|
||||||
|
|
||||||
id: int
|
id: int | str
|
||||||
image: str | None
|
image: str | None
|
||||||
last_updated: str
|
last_updated: str
|
||||||
name: str
|
name: str
|
||||||
@@ -21,7 +21,7 @@ class Playlist:
|
|||||||
count: int = 0
|
count: int = 0
|
||||||
duration: int = 0
|
duration: int = 0
|
||||||
has_image: bool = False
|
has_image: bool = False
|
||||||
images: list[str] = dataclasses.field(default_factory=list)
|
images: list[dict[str, str]] = dataclasses.field(default_factory=list)
|
||||||
pinned: bool = False
|
pinned: bool = False
|
||||||
|
|
||||||
def __post_init__(self):
|
def __post_init__(self):
|
||||||
|
|||||||
@@ -1,4 +1,7 @@
|
|||||||
import locale
|
import locale
|
||||||
|
from typing import TypeVar
|
||||||
|
|
||||||
|
T = TypeVar("T")
|
||||||
|
|
||||||
# Set to user's default locale:
|
# Set to user's default locale:
|
||||||
locale.setlocale(locale.LC_ALL, "")
|
locale.setlocale(locale.LC_ALL, "")
|
||||||
@@ -9,3 +12,9 @@ locale.setlocale(locale.LC_ALL, "")
|
|||||||
|
|
||||||
def format_number(number: float) -> str:
|
def format_number(number: float) -> str:
|
||||||
return locale.format_string("%d", number, grouping=True)
|
return locale.format_string("%d", number, grouping=True)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
def flatten(list_: list[list[T]]) -> list[T]:
|
||||||
|
return [item for sublist in list_ for item in sublist]
|
||||||
|
|||||||
Reference in New Issue
Block a user