implement mix tracklist balancing

This commit is contained in:
cwilvx
2024-10-27 06:35:37 +01:00
parent c4a73f0d63
commit f2153d936d
5 changed files with 150 additions and 5 deletions
+3
View File
@@ -7,6 +7,9 @@ from app.utils.threading import background
@background
def start_cron_jobs():
"""
This is the function that triggers the cron jobs.
"""
Mixes().run()
# schedule.run_pending()
+6
View File
@@ -4,10 +4,16 @@ from app.store.homepage import HomepageStore
class Mixes(CronJob):
"""
This cron job creates mixes displayed on the homepage.
"""
def __init__(self):
super().__init__("mixes", 5)
def run(self):
"""
Creates the artist mixes
"""
print("⭐⭐⭐⭐ Mixes cron job running")
mixes = MixesPlugin()
artist_mixes = mixes.get_artists()
+39
View File
@@ -3,6 +3,7 @@ import string
import requests
from urllib.parse import quote
from app.db.userdata import SimilarArtistTable
from app.models.artist import Artist
from app.models.mix import Mix
from app.models.track import Track
@@ -10,6 +11,7 @@ from app.plugins import Plugin, plugin_method
from app.store.artists import ArtistStore
from app.store.tracks import TrackStore
from app.utils.dates import get_date_range
from app.utils.mixes import balance_mix
from app.utils.remove_duplicates import remove_duplicates
from app.utils.stats import get_artists_in_period
@@ -62,6 +64,7 @@ class MixesPlugin(Plugin):
# sort by trackhash order
trackmatches = sorted(trackmatches, key=lambda x: trackhashes.index(x.weakhash))
trackmatches = balance_mix(trackmatches)
return trackmatches
@plugin_method
@@ -123,6 +126,7 @@ class MixesPlugin(Plugin):
indexed.add(artist["artisthash"])
period["created"] += 1
print(f"⭐⭐⭐⭐ Created {len(mixes)} mixes")
return mixes
def get_mix_description(self, tracks: list[Track], artishash: str):
@@ -162,3 +166,38 @@ class MixesPlugin(Plugin):
"artisthash": artist["artisthash"],
},
)
def fallback_create_artist_mix(self, artist: dict[str, str], similar_artists: list[str], trackhashes: set[str], limit: int):
"""
Creates an artist mix by selecting random tracks from similar artists.
This is used when:
- The Swing Music recommendation server is down.
- The artist has less than self.MIN_TRACK_MIX_LENGTH tracks from the cloud mix.
- When we need to dilute the mix to balance the artist distribution.
:param artist: The artist to create a mix for.
:param similar_artists: A list of similar artists to select tracks from. If not provided, we try reading from the local database. When we exhaust the passed list, we also try reading from the local database.
:param trackhashes: A set of trackhashes to omit from the new tracklist.
:param limit: The maximum number of tracks to select.
"""
artists = similar_artists
if len(similar_artists) == 0:
local_similar_artists = SimilarArtistTable.get_by_hash(artist["artisthash"])
if local_similar_artists:
artists = [a.artisthash for a in local_similar_artists.similar_artists]
if len(artists) == 0:
return []
# CHECKPOINT: I'M TIRED AF AND I NEED TO SLEEP
# The plan:
# Figure out which artists we should skip for the new tracklist
# these would be artists with a large number of tracks in the mix already
# Since the artisthashes are ordered by similarity score, we iterate from the start
# and go forward collecting tracks that aren't in the mix yet.
#
+95
View File
@@ -0,0 +1,95 @@
from app.models.track import Track
from typing import List, Dict, Tuple
from collections import Counter
def violates_gap_rule(balanced_mix: Dict[int, Track], position: int, track: Track, gap: int = 3) -> bool:
"""
Check if placing the track at the given position violates the gap rule.
"""
track_artists = set(artist['artisthash'] for artist in track.artists)
for i in range(max(0, position - gap), position):
if i in balanced_mix:
existing_artists = set(artist['artisthash'] for artist in balanced_mix[i].artists)
if track_artists.intersection(existing_artists):
return True
return False
def find_next_position(balanced_mix: Dict[int, Track], start: int, track: Track, total_tracks: int) -> int:
"""
Find the next available position for the track, starting from 'start' and wrapping around.
"""
for i in range(start, total_tracks):
if i not in balanced_mix and not violates_gap_rule(balanced_mix, i, track):
return i
for i in range(start):
if i not in balanced_mix and not violates_gap_rule(balanced_mix, i, track):
return i
return start # If no better position is found, return the original position
def is_tracklist_balanced(tracks: List[Track], gap: int = 3) -> Tuple[bool, bool]:
"""
Checks if a tracklist is balanced or can be balanced.
Args:
- tracks: List of Track objects
- gap: Minimum number of tracks between songs by the same artist (default 3)
Returns:
- A tuple (can_be_balanced, is_currently_balanced)
"""
total_tracks = len(tracks)
# Count tracks per artist (considering only the first artist)
artist_counts = Counter(track.artists[0]['artisthash'] for track in tracks)
# Calculate the maximum number of tracks an artist can have in a balanced list
max_tracks_per_artist = (total_tracks + gap) // (gap + 1)
# Check if it's mathematically possible to balance the tracklist
can_be_balanced = all(count <= max_tracks_per_artist for count in artist_counts.values())
if not can_be_balanced:
return False, False
# Check if the current arrangement is balanced
is_currently_balanced = True
artist_last_positions = {}
for i, track in enumerate(tracks):
artist = track.artists[0]['artisthash']
if artist in artist_last_positions:
if i - artist_last_positions[artist] <= gap:
is_currently_balanced = False
break
artist_last_positions[artist] = i
return can_be_balanced, is_currently_balanced
def balance_mix(tracks: List[Track]) -> List[Track]:
"""
Balances the mix by ensuring that the tracks in a mix are distributed evenly.
Preserves the overall rating order of tracks while minimizing disruption.
"""
can_be_balanced, is_balanced = is_tracklist_balanced(tracks)
if not can_be_balanced:
print("Warning: This tracklist cannot be perfectly balanced.")
# Proceed with best-effort balancing
if is_balanced:
return tracks # Already balanced, no need to modify
balanced_mix: Dict[int, Track] = {}
total_tracks = len(tracks)
for i, track in enumerate(tracks):
if i in balanced_mix or not violates_gap_rule(balanced_mix, i, track):
balanced_mix[i] = track
else:
new_position = find_next_position(balanced_mix, i, track, total_tracks)
balanced_mix[new_position] = track
# Convert the dictionary back to a list, preserving the new order
return [balanced_mix[i] for i in sorted(balanced_mix.keys())]
+7 -5
View File
@@ -60,7 +60,7 @@ mimetypes.add_type("application/manifest+json", ".webmanifest")
# Background tasks
@background
# @background
def bg_run_setup():
IndexEverything()
@@ -73,21 +73,19 @@ def bg_run_setup():
@background
def run_swingmusic():
log_startup_info()
bg_run_setup()
register_plugins()
start_cron_jobs()
# start_watchdog()
setproctitle.setproctitle(f"swingmusic ::{FLASKVARS.get_flask_port()}")
# bg_run_setup()
start_cron_jobs()
# Setup function calls
Info.load()
ProcessArgs()
run_setup()
load_into_mem()
run_swingmusic()
# Create the Flask app
@@ -226,6 +224,10 @@ def print_memory_usage(response: Response):
if __name__ == "__main__":
load_into_mem()
run_swingmusic()
host = FLASKVARS.get_flask_host()
port = FLASKVARS.get_flask_port()