mirror of
https://github.com/Dvorinka/swingmusic-extended.git
synced 2026-06-03 20:13:02 +00:00
add time ago on recent items
+ move to waitress wsgi server + refactor dates utils + create locustfile for stress test
This commit is contained in:
@@ -11,23 +11,12 @@ from app.serializers.album import album_serializer
|
||||
from app.serializers.artist import serialize_for_card
|
||||
|
||||
from itertools import groupby
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
from app.utils.dates import timestamp_from_days_ago, timestamp_to_time_passed
|
||||
|
||||
older_albums = set()
|
||||
older_artists = set()
|
||||
|
||||
group_type = tuple[str, list[Track]]
|
||||
|
||||
|
||||
def timestamp_from_days_ago(days_ago):
|
||||
current_datetime = datetime.now()
|
||||
delta = timedelta(days=days_ago)
|
||||
past_timestamp = current_datetime - delta
|
||||
|
||||
past_timestamp = int(past_timestamp.timestamp())
|
||||
|
||||
return past_timestamp
|
||||
|
||||
|
||||
def calc_based_on_percent(items: list[str], total: int):
|
||||
"""
|
||||
@@ -39,19 +28,28 @@ def calc_based_on_percent(items: list[str], total: int):
|
||||
return most_common_count / total >= 0.7, most_common, most_common_count
|
||||
|
||||
|
||||
def check_is_album_folder(group: group_type):
|
||||
key, group_ = group
|
||||
albumhashes = [t.albumhash for t in group_]
|
||||
return calc_based_on_percent(albumhashes, len(group_))
|
||||
def check_is_album_folder(tracks: list[Track]):
|
||||
albumhashes = [t.albumhash for t in tracks]
|
||||
return calc_based_on_percent(albumhashes, len(tracks))
|
||||
|
||||
|
||||
def check_is_artist_folder(group: group_type):
|
||||
key, group_ = group
|
||||
artisthashes = "-".join(t.artist_hashes for t in group_).split("-")
|
||||
return calc_based_on_percent(artisthashes, len(group_))
|
||||
def check_is_artist_folder(tracks: list[Track]):
|
||||
artisthashes = "-".join(t.artist_hashes for t in tracks).split("-")
|
||||
return calc_based_on_percent(artisthashes, len(tracks))
|
||||
|
||||
|
||||
def check_is_track_folder(tracks: list[Track]):
|
||||
# INFO: is more of a playlist
|
||||
if len(tracks) >= 3:
|
||||
return False
|
||||
|
||||
return [create_track(t) for t in tracks]
|
||||
|
||||
|
||||
def check_is_new_artist(artisthash: str):
|
||||
"""
|
||||
Checks if an artist already exists in the library.
|
||||
"""
|
||||
if artisthash in older_artists:
|
||||
return False
|
||||
|
||||
@@ -59,6 +57,9 @@ def check_is_new_artist(artisthash: str):
|
||||
|
||||
|
||||
def check_is_new_album(albumhash: str):
|
||||
"""
|
||||
Checks if an album already exists in the library.
|
||||
"""
|
||||
if albumhash in older_albums:
|
||||
return False
|
||||
|
||||
@@ -66,6 +67,9 @@ def check_is_new_album(albumhash: str):
|
||||
|
||||
|
||||
def create_track(t: Track):
|
||||
"""
|
||||
Creates a recently added track entry.
|
||||
"""
|
||||
track = serialize_track(t, to_remove={"created_date"})
|
||||
track["help_text"] = "NEW TRACK"
|
||||
|
||||
@@ -75,25 +79,23 @@ def create_track(t: Track):
|
||||
}
|
||||
|
||||
|
||||
def check_is_track_folder(group: group_type):
|
||||
key, group_ = group
|
||||
|
||||
# is more of a playlist
|
||||
if len(group_) >= 3:
|
||||
return False
|
||||
|
||||
return [create_track(t) for t in group_]
|
||||
# INFO: Keys: folder, tracks, time (timestamp)
|
||||
group_type = dict[str, list[Track], float]
|
||||
|
||||
|
||||
def check_folder_type(group_: group_type) -> str:
|
||||
# check if all tracks in group have the same albumhash
|
||||
# if so, return "album"
|
||||
key, tracks = group_
|
||||
key = group_["folder"]
|
||||
tracks = group_["tracks"]
|
||||
time = group_["time"]
|
||||
|
||||
if len(tracks) == 1:
|
||||
return create_track(tracks[0])
|
||||
entry = create_track(tracks[0])
|
||||
entry["item"]["time"] = timestamp_to_time_passed(time)
|
||||
return entry
|
||||
|
||||
is_album, albumhash, _ = check_is_album_folder(group_)
|
||||
is_album, albumhash, _ = check_is_album_folder(tracks)
|
||||
if is_album:
|
||||
album = AlbumStore.get_album_by_hash(albumhash)
|
||||
|
||||
@@ -115,13 +117,14 @@ def check_folder_type(group_: group_type) -> str:
|
||||
album["help_text"] = (
|
||||
"NEW ALBUM" if check_is_new_album(albumhash) else "NEW TRACKS"
|
||||
)
|
||||
album["time"] = timestamp_to_time_passed(time)
|
||||
|
||||
return {
|
||||
"type": "album",
|
||||
"item": album,
|
||||
}
|
||||
|
||||
is_artist, artisthash, trackcount = check_is_artist_folder(group_)
|
||||
is_artist, artisthash, trackcount = check_is_artist_folder(tracks)
|
||||
if is_artist:
|
||||
artist = ArtistStore.get_artist_by_hash(artisthash)
|
||||
|
||||
@@ -133,13 +136,15 @@ def check_folder_type(group_: group_type) -> str:
|
||||
artist["help_text"] = (
|
||||
"NEW ARTIST" if check_is_new_artist(artisthash) else "NEW MUSIC"
|
||||
)
|
||||
artist["time"] = timestamp_to_time_passed(time)
|
||||
|
||||
return {
|
||||
"type": "artist",
|
||||
"item": artist,
|
||||
}
|
||||
|
||||
is_track_folder = check_is_track_folder(group_)
|
||||
is_track_folder = check_is_track_folder(tracks)
|
||||
|
||||
return (
|
||||
is_track_folder
|
||||
if is_track_folder
|
||||
@@ -149,18 +154,23 @@ def check_folder_type(group_: group_type) -> str:
|
||||
"path": key,
|
||||
"count": len(tracks),
|
||||
"help_text": "NEW MUSIC",
|
||||
"time": timestamp_to_time_passed(time),
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
def group_track_by_folders(tracks: Track) -> (str, list[Track]):
|
||||
def group_track_by_folders(tracks: Track):
|
||||
tracks = sorted(tracks, key=lambda t: t.folder)
|
||||
groups = groupby(tracks, lambda t: t.folder)
|
||||
groups = ((k, list(g)) for k, g in groups)
|
||||
groups = (
|
||||
{"folder": folder, "tracks": list(tracks), "time": os.path.getctime(folder)}
|
||||
for folder, tracks in groups
|
||||
)
|
||||
print(groups)
|
||||
|
||||
# sort groups by last modified date
|
||||
return sorted(groups, key=lambda g: os.path.getctime(g[0]), reverse=True)
|
||||
return sorted(groups, key=lambda group: group["time"], reverse=True)
|
||||
|
||||
|
||||
def get_recent_items(cutoff_days: int, limit: int = 7):
|
||||
@@ -180,18 +190,23 @@ def get_recent_items(cutoff_days: int, limit: int = 7):
|
||||
|
||||
recent_items = []
|
||||
|
||||
for group in groups[:limit]:
|
||||
for group in groups:
|
||||
item = check_folder_type(group)
|
||||
|
||||
if item not in recent_items:
|
||||
if not item:
|
||||
continue
|
||||
|
||||
recent_items.append(item) if type(item) == dict else recent_items.extend(
|
||||
item
|
||||
(
|
||||
recent_items.append(item)
|
||||
if type(item) == dict
|
||||
else recent_items.extend(item)
|
||||
)
|
||||
|
||||
return recent_items[:limit]
|
||||
if len(recent_items) >= limit:
|
||||
break
|
||||
|
||||
return recent_items
|
||||
|
||||
|
||||
def get_recent_tracks(cutoff_days: int):
|
||||
|
||||
@@ -7,6 +7,7 @@ from app.db.sqlite.favorite import SQLiteFavoriteMethods as fdb
|
||||
|
||||
from app.serializers.track import serialize_track
|
||||
from app.serializers.album import album_serializer
|
||||
from app.utils.dates import timestamp_to_time_passed
|
||||
from app.serializers.artist import serialize_for_card
|
||||
from app.serializers.playlist import serialize_for_card as serialize_playlist
|
||||
from app.lib.playlistlib import get_first_4_images, get_recently_added_playlist
|
||||
@@ -40,7 +41,7 @@ def get_recently_played(limit=7):
|
||||
|
||||
album = album_serializer(
|
||||
album,
|
||||
{
|
||||
to_remove={
|
||||
"genres",
|
||||
"date",
|
||||
"count",
|
||||
@@ -50,8 +51,14 @@ def get_recently_played(limit=7):
|
||||
},
|
||||
)
|
||||
album["help_text"] = "album"
|
||||
album["time"] = timestamp_to_time_passed(entry.timestamp)
|
||||
|
||||
items.append({"type": "album", "item": album})
|
||||
items.append(
|
||||
{
|
||||
"type": "album",
|
||||
"item": album,
|
||||
}
|
||||
)
|
||||
continue
|
||||
|
||||
if entry.type == "artist":
|
||||
@@ -62,8 +69,14 @@ def get_recently_played(limit=7):
|
||||
|
||||
artist = serialize_for_card(artist)
|
||||
artist["help_text"] = "artist"
|
||||
artist["time"] = timestamp_to_time_passed(entry.timestamp)
|
||||
|
||||
items.append({"type": "artist", "item": artist})
|
||||
items.append(
|
||||
{
|
||||
"type": "artist",
|
||||
"item": artist,
|
||||
}
|
||||
)
|
||||
|
||||
continue
|
||||
|
||||
@@ -85,6 +98,7 @@ def get_recently_played(limit=7):
|
||||
"path": entry.type_src,
|
||||
"count": count,
|
||||
"help_text": "folder",
|
||||
"time": timestamp_to_time_passed(entry.timestamp),
|
||||
},
|
||||
}
|
||||
)
|
||||
@@ -102,6 +116,8 @@ def get_recently_played(limit=7):
|
||||
)
|
||||
|
||||
playlist["help_text"] = "playlist"
|
||||
playlist["time"] = timestamp_to_time_passed(entry.timestamp)
|
||||
|
||||
items.append(
|
||||
{
|
||||
"type": "playlist",
|
||||
@@ -125,7 +141,11 @@ def get_recently_played(limit=7):
|
||||
items.append(
|
||||
{
|
||||
"type": "playlist",
|
||||
"item": {"help_text": "playlist", **serialize_playlist(playlist)},
|
||||
"item": {
|
||||
"help_text": "playlist",
|
||||
"time": timestamp_to_time_passed(entry.timestamp),
|
||||
**serialize_playlist(playlist),
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
@@ -136,6 +156,7 @@ def get_recently_played(limit=7):
|
||||
"item": {
|
||||
"help_text": "playlist",
|
||||
"count": fdb.get_track_count(),
|
||||
"time": timestamp_to_time_passed(entry.timestamp),
|
||||
},
|
||||
}
|
||||
)
|
||||
@@ -148,7 +169,13 @@ def get_recently_played(limit=7):
|
||||
|
||||
track = serialize_track(track)
|
||||
track["help_text"] = "track"
|
||||
track["time"] = timestamp_to_time_passed(entry.timestamp)
|
||||
|
||||
items.append({"type": "track", "item": track})
|
||||
items.append(
|
||||
{
|
||||
"type": "track",
|
||||
"item": track,
|
||||
}
|
||||
)
|
||||
|
||||
return items
|
||||
|
||||
@@ -47,6 +47,7 @@ class LyricsProvider(LRCProvider):
|
||||
"""
|
||||
|
||||
ROOT_URL = Keys.PLUGIN_LYRICS_ROOT_URL
|
||||
get_token_trials = 0
|
||||
|
||||
def __init__(self) -> None:
|
||||
super().__init__()
|
||||
@@ -85,6 +86,10 @@ class LyricsProvider(LRCProvider):
|
||||
return None
|
||||
|
||||
def _get_token(self):
|
||||
if self.get_token_trials > 3:
|
||||
self.get_token_trials = 0
|
||||
return
|
||||
|
||||
# Check if token is cached and not expired
|
||||
plugin_path = Paths.get_lyrics_plugins_path()
|
||||
token_path = os.path.join(plugin_path, "token.json")
|
||||
@@ -110,11 +115,13 @@ class LyricsProvider(LRCProvider):
|
||||
|
||||
res = res.json()
|
||||
if res["message"]["header"]["status_code"] == 401:
|
||||
time.sleep(10)
|
||||
time.sleep(13)
|
||||
self.get_token_trials += 1
|
||||
return self._get_token()
|
||||
|
||||
new_token = res["message"]["body"]["user_token"]
|
||||
expiration_time = current_time + 600 # 10 minutes expiration
|
||||
self.get_token_trials = 0
|
||||
|
||||
# Cache the new token
|
||||
self.token = new_token
|
||||
|
||||
+23
-4
@@ -1,9 +1,20 @@
|
||||
import pendulum
|
||||
from datetime import datetime
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
_format = "%Y-%m-%d %H:%M:%S"
|
||||
|
||||
|
||||
def timestamp_from_days_ago(days_ago: int):
|
||||
"""
|
||||
Returns a timestamp from a number of days ago.
|
||||
"""
|
||||
current_datetime = datetime.now()
|
||||
delta = timedelta(days=days_ago)
|
||||
past_timestamp = current_datetime - delta
|
||||
|
||||
return int(past_timestamp.timestamp())
|
||||
|
||||
|
||||
def create_new_date(date: datetime = None) -> str:
|
||||
"""
|
||||
Creates a new date and time string in the format of "YYYY-MM-DD HH:MM:SS"
|
||||
@@ -15,18 +26,26 @@ def create_new_date(date: datetime = None) -> str:
|
||||
return date.strftime(_format)
|
||||
|
||||
|
||||
def date_string_to_time_passed(prev_date: str) -> str:
|
||||
def timestamp_to_time_passed(timestamp: str):
|
||||
"""
|
||||
Converts a date string to time passed. e.g. 2 minutes ago, 1 hour ago, yesterday, 2 days ago, 2 weeks ago, etc.
|
||||
Converts a timestamp to time passed. e.g. 2 minutes ago, 1 hour ago, yesterday, 2 days ago, 2 weeks ago, etc.
|
||||
"""
|
||||
now = datetime.now().timestamp()
|
||||
then = datetime.strptime(prev_date, _format).timestamp()
|
||||
then = datetime.fromtimestamp(int(timestamp)).timestamp()
|
||||
|
||||
diff = now - then
|
||||
now = pendulum.now()
|
||||
return now.subtract(seconds=diff).diff_for_humans()
|
||||
|
||||
|
||||
def date_string_to_time_passed(prev_date: str) -> str:
|
||||
"""
|
||||
Converts a date string to time passed. e.g. 2 minutes ago, 1 hour ago, yesterday, 2 days ago, 2 weeks ago, etc.
|
||||
"""
|
||||
then = datetime.strptime(prev_date, _format).timestamp()
|
||||
return timestamp_to_time_passed(then)
|
||||
|
||||
|
||||
def seconds_to_time_string(seconds):
|
||||
"""
|
||||
Converts seconds to a time string. e.g. 1 hour 2 minutes, 1 hour 2 seconds, 1 hour, 1 minute 2 seconds, etc.
|
||||
|
||||
Reference in New Issue
Block a user