Files
Tomas Dvorak 6e8fedf534 first commit
2026-04-13 17:46:58 +02:00

966 lines
34 KiB
Python

from __future__ import annotations
import hashlib
import logging
import os
import re
import threading
import time
from typing import Any
from sqlalchemy import and_, insert, select, update
from swingmusic.db.engine import DbEngine
from swingmusic.db.libdata import TrackTable
from swingmusic.db.production import DownloadJobTable, TrackedPlaylistTable
from swingmusic.db.userdata import PlaylistTable
from swingmusic.services.download_jobs import download_job_manager
from swingmusic.services.library_projection import get_track_availability_map
from swingmusic.services.spotify_metadata_client import (
SpotifyTrack,
get_spotify_metadata_client,
)
from swingmusic.services.universal_url_parser import universal_url_parser
from swingmusic.utils.dates import create_new_date
from swingmusic.utils.hashing import create_hash
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
# Case-insensitive matcher for Spotify playlist links. The scheme and the
# "open." subdomain are both optional; group 1 captures the alphanumeric
# playlist ID that follows "/playlist/".
_SPOTIFY_PLAYLIST_URL_PATTERN = re.compile(
r"(?:https?://)?(?:open\.)?spotify\.com/playlist/([A-Za-z0-9]+)",
re.IGNORECASE,
)
def _quality_codec_pair(quality: str | None, codec: str | None) -> tuple[str, str]:
    """
    Normalizes a requested quality/codec combination into a (quality, codec) pair.

    A recognized quality label (tier name or ``mp3_*`` bitrate alias) fully
    determines the pair and takes precedence over ``codec``. Otherwise a
    ``flac`` codec (also the default when ``codec`` is empty) maps to
    lossless FLAC, and anything else becomes "high" quality with the
    requested codec, or ``mp3`` when the codec is not a supported one.
    """
    presets = {
        "flac": ("lossless", "flac"),
        "lossless": ("lossless", "flac"),
        "high": ("high", "mp3"),
        "medium": ("medium", "mp3"),
        "low": ("low", "mp3"),
        "mp3_320": ("high", "mp3"),
        "mp3_256": ("medium", "mp3"),
        "mp3_192": ("medium", "mp3"),
        "mp3_128": ("low", "mp3"),
    }
    supported_codecs = ("flac", "mp3", "aac", "ogg", "opus", "m4a")

    requested_quality = (quality or "lossless").lower().strip()
    requested_codec = (codec or "flac").lower().strip()

    preset = presets.get(requested_quality)
    if preset is not None:
        return preset

    if requested_codec == "flac":
        return ("lossless", "flac")

    # Every canonical tier name is a preset key, so a label that reaches this
    # point is never a valid tier and always falls back to "high".
    fallback_codec = requested_codec if requested_codec in supported_codecs else "mp3"
    return ("high", fallback_codec)
def _parse_spotify_playlist_id(url: str) -> str | None:
    """
    Extracts the Spotify playlist ID from ``url``.

    The universal URL parser is consulted first; if it does not identify a
    Spotify playlist, the module-level regex is used as a fallback. Returns
    None when neither approach finds an ID.
    """
    parsed = universal_url_parser.parse_url(url)
    if parsed:
        is_spotify_playlist = (
            parsed.service.value == "spotify" and parsed.item_type == "playlist"
        )
        if is_spotify_playlist:
            return parsed.id

    found = _SPOTIFY_PLAYLIST_URL_PATTERN.search(url or "")
    return found.group(1) if found else None
def _parse_trackable_playlist_source(url: str) -> tuple[str, str, str] | None:
    """
    Returns (service, item_type, item_id) for trackable external list sources.

    Only links the universal URL parser classifies as a playlist
    (case-insensitive) and that carry a non-empty ID are trackable; anything
    else yields None.
    """
    parsed = universal_url_parser.parse_url(url)
    if not parsed:
        return None

    kind = (parsed.item_type or "").lower()
    if kind != "playlist":
        return None

    service_name = parsed.service.value
    source_id = parsed.id or ""
    return (service_name, kind, source_id) if source_id else None
def _trackhash_from_spotify_track(track: SpotifyTrack) -> str | None:
    """
    Builds the library track hash for a Spotify track.

    The hash is ``create_hash(title, album, artist)`` where ``artist`` is the
    comma-joined list of non-empty artist names. Returns None when the track
    has no title or no named artist.
    """
    title = (track.name or "").strip()

    names = []
    for entry in track.artists or []:
        name = entry.get("name")
        if name:
            names.append(name)
    artist = ", ".join(names).strip()

    album = (
        (track.album.get("name") or "").strip()
        if isinstance(track.album, dict)
        else ""
    )

    if title and artist:
        return create_hash(title, album, artist)
    return None
def _snapshot_hash(track_ids: list[str]) -> str:
    """
    Returns the SHA-1 hex digest of the track IDs joined by newlines (UTF-8).
    """
    digest = hashlib.sha1()
    digest.update("\n".join(track_ids).encode("utf-8"))
    return digest.hexdigest()
def _as_int(value: Any) -> int | None:
    """
    Converts ``value`` to int, returning None for None or unconvertible input.
    """
    if value is None:
        return None
    try:
        converted = int(value)
    except (TypeError, ValueError):
        return None
    return converted
def _tracked_playlist_name(service: str, title: str | None, playlist_id: str) -> str:
    """
    Builds the display name of a mirror playlist, capped at 180 characters.

    Uses the stripped ``title`` when it is non-empty; otherwise falls back to
    "<Service> Playlist <first 12 characters of the ID>". The result is always
    prefixed with "[Tracked] ".
    """
    label = (title or "").strip()
    if not label:
        label = f"{service.title()} Playlist {playlist_id[:12]}"
    full_name = f"[Tracked] {label}"
    return full_name[:180]
def _find_mirror_playlist_row(
    userid: int, tracked_id: int, local_playlist_id: int | None = None
) -> Any | None:
    """
    Locates the local playlist that mirrors a tracked playlist.

    When ``local_playlist_id`` is given, a playlist with that ID owned by
    ``userid`` wins. Otherwise (or if that lookup misses) every playlist of
    the user is scanned for ``extra["tracked_playlist"]["tracked_id"]`` equal
    to ``tracked_id``. Returns the matching row, or None.
    """
    with DbEngine.manager() as conn:
        if local_playlist_id:
            by_id = select(PlaylistTable).where(
                and_(
                    PlaylistTable.id == local_playlist_id,
                    PlaylistTable.userid == userid,
                )
            )
            direct_hit = conn.execute(by_id).scalar_one_or_none()
            if direct_hit:
                return direct_hit

        owned = select(PlaylistTable).where(PlaylistTable.userid == userid)
        candidates = list(conn.execute(owned).scalars())

    for candidate in candidates:
        extra = candidate.extra or {}
        if not isinstance(extra, dict):
            continue
        marker = extra.get("tracked_playlist") or {}
        # Malformed markers (non-dict) are ignored rather than treated as errors.
        if isinstance(marker, dict) and _as_int(marker.get("tracked_id")) == tracked_id:
            return candidate
    return None
def _sync_mirror_playlist(
    *,
    tracked_row: Any,
    playlist_title: str | None,
    owner_name: str | None,
    ordered_trackhashes: list[str],
    snapshot_track_ids: list[str],
) -> int | None:
    """
    Creates or updates the local playlist mirroring ``tracked_row``.

    An existing mirror (found via ``_find_mirror_playlist_row``) has its name,
    track hashes and tracking metadata overwritten while other keys of its
    ``extra`` are kept. If none exists, a new playlist is inserted with
    default settings. Returns the mirror playlist ID, or None when the ID of
    a freshly inserted row cannot be determined.
    """
    owner_id = int(tracked_row.userid)

    stored_extra = tracked_row.extra or {}
    known_local_id = None
    if isinstance(stored_extra, dict):
        known_local_id = _as_int(stored_extra.get("local_playlist_id"))

    existing = _find_mirror_playlist_row(owner_id, tracked_row.id, known_local_id)
    display_name = _tracked_playlist_name(
        tracked_row.service, playlist_title, tracked_row.playlist_id
    )
    tracking_info = {
        "tracked_id": tracked_row.id,
        "service": tracked_row.service,
        "playlist_id": tracked_row.playlist_id,
        "source_url": tracked_row.source_url,
        "owner_name": owner_name,
        "last_synced_at": int(time.time()),
        "snapshot_track_count": len(snapshot_track_ids),
    }

    if existing:
        # Keep whatever else lives in the mirror's extra; only the two
        # tracking-related keys are overwritten.
        preserved = existing.extra if isinstance(existing.extra, dict) else {}
        merged_extra = dict(preserved)
        merged_extra["managed"] = True
        merged_extra["tracked_playlist"] = tracking_info

        with DbEngine.manager(commit=True) as conn:
            refresh = (
                update(PlaylistTable)
                .where(
                    and_(
                        PlaylistTable.id == existing.id,
                        PlaylistTable.userid == owner_id,
                    )
                )
                .values(
                    name=display_name,
                    last_updated=create_new_date(),
                    trackhashes=ordered_trackhashes,
                    extra=merged_extra,
                )
            )
            conn.execute(refresh)
        return int(existing.id)

    default_settings = {
        "has_gif": False,
        "banner_pos": 50,
        "square_img": False,
        "pinned": False,
    }
    fresh_extra = {
        "managed": True,
        "tracked_playlist": tracking_info,
    }

    with DbEngine.manager(commit=True) as conn:
        create = insert(PlaylistTable).values(
            name=display_name,
            image=None,
            last_updated=create_new_date(),
            userid=owner_id,
            settings=default_settings,
            trackhashes=ordered_trackhashes,
            extra=fresh_extra,
        )
        outcome = conn.execute(create)

        # Best effort: prefer the reported primary key, then fall back to the
        # driver-level lastrowid.
        try:
            new_id = _as_int(outcome.inserted_primary_key[0])
        except Exception:
            new_id = None
        if new_id is None:
            new_id = _as_int(getattr(outcome, "lastrowid", None))

    return new_id
def _serialize_tracked_playlist(row: Any) -> dict[str, Any]:
    """
    Converts a tracked-playlist row into a plain dict.

    Most fields are copied as-is. ``auto_sync`` is coerced to bool,
    ``sync_interval_seconds`` to int (0 when unset), the snapshot is reduced
    to its length, and ``local_playlist_id`` is read from ``row.extra`` when
    that is a dict.
    """
    extra = row.extra if isinstance(row.extra, dict) else {}

    payload: dict[str, Any] = {
        field: getattr(row, field)
        for field in (
            "id",
            "userid",
            "service",
            "playlist_id",
            "source_url",
            "title",
            "owner_name",
            "quality",
            "codec",
        )
    }
    payload["auto_sync"] = bool(row.auto_sync)
    payload["sync_interval_seconds"] = int(row.sync_interval_seconds or 0)
    for field in ("next_sync_at", "last_sync_at", "status"):
        payload[field] = getattr(row, field)
    payload["snapshot_track_count"] = len(row.snapshot_track_ids or [])
    payload["snapshot_hash"] = row.snapshot_hash
    payload["local_playlist_id"] = _as_int(extra.get("local_playlist_id"))
    payload["last_result"] = row.last_result or {}
    payload["last_error"] = row.last_error
    payload["created_at"] = row.created_at
    payload["updated_at"] = row.updated_at
    return payload
def _has_active_sync_job(userid: int, tracked_id: int) -> bool:
    """
    Reports whether the user has a queued or downloading job for this source.

    Only the 200 most recently created active jobs of the user are inspected;
    a job belongs to the source when its payload's ``tracked_playlist_id``
    equals ``tracked_id``.
    """
    recent_active = (
        select(DownloadJobTable)
        .where(
            and_(
                DownloadJobTable.userid == userid,
                DownloadJobTable.state.in_(["queued", "downloading"]),
            )
        )
        .order_by(DownloadJobTable.created_at.desc())
        .limit(200)
    )
    with DbEngine.manager() as conn:
        active_jobs = list(conn.execute(recent_active).scalars())

    return any(
        (job.payload or {}).get("tracked_playlist_id") == tracked_id
        for job in active_jobs
    )
def _latest_completed_sync_job(userid: int, tracked_id: int) -> Any | None:
    """
    Returns the newest completed download job tied to this tracked source.

    Only the user's 300 most recently created completed jobs are inspected,
    newest first; the first whose payload's ``tracked_playlist_id`` equals
    ``tracked_id`` is returned. Returns None when there is no match.
    """
    recent_completed = (
        select(DownloadJobTable)
        .where(
            and_(
                DownloadJobTable.userid == userid,
                DownloadJobTable.state == "completed",
            )
        )
        .order_by(DownloadJobTable.created_at.desc())
        .limit(300)
    )
    with DbEngine.manager() as conn:
        completed_jobs = list(conn.execute(recent_completed).scalars())

    matching = (
        job
        for job in completed_jobs
        if (job.payload or {}).get("tracked_playlist_id") == tracked_id
    )
    return next(matching, None)
def _collect_trackhashes_for_path(path: str | None) -> list[str]:
    """
    Lists the distinct track hashes of library tracks under ``path``.

    A directory is used as-is; any other path is replaced by its parent
    directory. Returns an empty list when the path is empty or the resulting
    directory does not exist. Hashes keep the order in which
    ``TrackTable.get_tracks_in_path`` yields them, duplicates dropped.
    """
    if not path:
        return []

    scope_path = path if os.path.isdir(path) else os.path.dirname(path)
    if not (scope_path and os.path.exists(scope_path)):
        return []

    hashes = (
        getattr(track, "trackhash", None)
        for track in TrackTable.get_tracks_in_path(scope_path)
    )
    # dict.fromkeys de-duplicates while keeping first-seen order.
    return list(dict.fromkeys(value for value in hashes if value))
def _snapshot_ids_from_trackhashes(trackhashes: list[str]) -> list[str]:
    """
    Prefixes each non-empty track hash with "trackhash:" to form snapshot IDs.
    """
    snapshot_ids: list[str] = []
    for value in trackhashes:
        if value:
            snapshot_ids.append("trackhash:" + value)
    return snapshot_ids
class PlaylistTrackingService:
    """
    Tracks external playlist links and keeps a local mirror playlist in sync.

    A daemon worker thread polls ``TrackedPlaylistTable.due_for_sync`` and
    runs ``sync_tracked_playlist`` for every due row. Spotify sources are
    diffed track by track and missing tracks are queued individually; every
    other service gets one playlist-level refresh job per sync.
    """

    _POLL_ENV = "SWINGMUSIC_PLAYLIST_TRACKER_POLL"
    _MAX_TRACKS_ENV = "SWINGMUSIC_PLAYLIST_SYNC_MAX_TRACKS"

    def __init__(self) -> None:
        self._thread: threading.Thread | None = None
        self._stop = threading.Event()
        # Guards _processing, the set of tracked IDs with a sync in flight.
        self._lock = threading.RLock()
        self._processing: set[int] = set()
        # Poll interval is clamped to [15, 300] seconds. A malformed value
        # falls back to 30 instead of raising, because this constructor runs
        # at import time for the module-level singleton.
        self.poll_interval_seconds = max(
            15, min(self._env_int(self._POLL_ENV, 30), 300)
        )

    @staticmethod
    def _env_int(name: str, default: int) -> int:
        """
        Reads an integer from the environment, tolerating bad values.

        Decimal strings are truncated toward zero. Returns ``default`` when
        the variable is unset or not a finite number.
        """
        raw = os.getenv(name)
        if raw is None:
            return default
        try:
            return int(float(raw))
        except (TypeError, ValueError, OverflowError):
            return default

    @staticmethod
    def _sync_interval(row: Any) -> int:
        """Returns the row's sync interval in seconds (default 900, minimum 120)."""
        return max(120, int(row.sync_interval_seconds or 900))

    @staticmethod
    def _diff_snapshots(
        old_ids: list[str], new_ids: list[str]
    ) -> tuple[list[str], list[str], int]:
        """
        Compares two ordered ID snapshots.

        Returns (added, removed, reordered): IDs only in ``new_ids``, IDs only
        in ``old_ids``, and the number of IDs present in both whose index
        changed.
        """
        old_set = set(old_ids)
        new_set = set(new_ids)
        added = [track_id for track_id in new_ids if track_id not in old_set]
        removed = [track_id for track_id in old_ids if track_id not in new_set]
        old_positions = {track_id: index for index, track_id in enumerate(old_ids)}
        reordered = 0
        for index, track_id in enumerate(new_ids):
            previous = old_positions.get(track_id)
            if previous is not None and previous != index:
                reordered += 1
        return added, removed, reordered

    @staticmethod
    def _extra_with_local_playlist(
        row: Any, local_playlist_id: int | None
    ) -> dict[str, Any]:
        """Returns the row's ``extra`` dict, recording the mirror playlist ID when known."""
        tracked_extra = row.extra if isinstance(row.extra, dict) else {}
        if local_playlist_id:
            tracked_extra = {**tracked_extra, "local_playlist_id": local_playlist_id}
        return tracked_extra

    @staticmethod
    def _should_queue(status: str, *, is_new: bool, force: bool) -> bool:
        """
        Decides whether a Spotify track needs a download job.

        Tracks already queued are never queued again. Newly added tracks are
        queued unless available; on a forced sync, previously known tracks
        are queued only when missing or failed.
        """
        if status == "queued":
            return False
        if is_new:
            return status != "available"
        if force:
            return status in {"missing", "failed"}
        return False

    @staticmethod
    def _spotify_track_records(tracks: Any) -> list[dict[str, Any]]:
        """Flattens Spotify tracks (those with an ID) into plain record dicts."""
        records: list[dict[str, Any]] = []
        for track in tracks or []:
            if not track.id:
                continue
            album_name = ""
            if isinstance(track.album, dict):
                album_name = track.album.get("name", "")
            artist_name = ", ".join(
                artist.get("name", "")
                for artist in (track.artists or [])
                if artist.get("name")
            ).strip()
            records.append(
                {
                    "spotify_id": track.id,
                    "trackhash": _trackhash_from_spotify_track(track),
                    "title": track.name,
                    "artist": artist_name,
                    "album": album_name,
                    "source_url": f"https://open.spotify.com/track/{track.id}",
                }
            )
        return records

    @staticmethod
    def _cancel_jobs_for_removed_tracks(
        row: Any, userid: int, removed_ids: set[str]
    ) -> int:
        """
        Cancels active download jobs of this tracked playlist whose Spotify
        track was removed upstream. Returns the number of cancelled jobs.
        """
        if not removed_ids:
            return 0
        cancelled = 0
        active_jobs = DownloadJobTable.list_for_user(
            userid, states={"queued", "downloading"}
        )
        for job in active_jobs:
            payload = job.payload or {}
            if payload.get("tracked_playlist_id") != row.id:
                continue
            if payload.get("spotify_id") not in removed_ids:
                continue
            if download_job_manager.cancel(job.id, userid):
                cancelled += 1
        return cancelled

    @staticmethod
    def _record_sync_failure(tracked_id: int, error: Exception) -> None:
        """
        Marks the tracked playlist as failed and schedules a retry in 300s.

        A failure while writing this bookkeeping is logged rather than
        raised, so it cannot mask the original sync error.
        """
        now = int(time.time())
        try:
            TrackedPlaylistTable.update_row(
                tracked_id,
                {
                    "status": "failed",
                    "last_error": str(error),
                    "next_sync_at": now + 300,
                    "last_result": {
                        "success": False,
                        "tracked_id": tracked_id,
                        "error": str(error),
                        "synced_at": now,
                    },
                },
            )
        except Exception:
            log.exception(
                "Failed to record sync failure for tracked_id=%s", tracked_id
            )

    def start(self) -> None:
        """Starts the background worker thread unless one is already alive."""
        if self._thread and self._thread.is_alive():
            return
        self._stop.clear()
        self._thread = threading.Thread(
            target=self._worker_loop,
            name="playlist-tracking-worker",
            daemon=True,
        )
        self._thread.start()

    def stop(self) -> None:
        """Signals the worker to stop and waits up to 2 seconds for it to exit."""
        self._stop.set()
        if self._thread and self._thread.is_alive():
            self._thread.join(timeout=2)

    def list_tracked_playlists(self, userid: int) -> list[dict[str, Any]]:
        """Returns every tracked playlist of the user, serialized."""
        rows = TrackedPlaylistTable.list_for_user(userid)
        return [_serialize_tracked_playlist(row) for row in rows]

    def get_tracked_playlist(
        self, tracked_id: int, userid: int
    ) -> dict[str, Any] | None:
        """Returns one tracked playlist of the user, or None when not found."""
        row = TrackedPlaylistTable.get_by_id(tracked_id, userid=userid)
        if not row:
            return None
        return _serialize_tracked_playlist(row)

    def find_tracked_playlist(
        self, *, userid: int, service: str, playlist_id: str
    ) -> dict[str, Any] | None:
        """
        Looks a tracked playlist up by its source; rows marked "deleted" are
        treated as not found.
        """
        row = TrackedPlaylistTable.get_by_source(
            userid=userid, service=service, playlist_id=playlist_id
        )
        if not row or row.status == "deleted":
            return None
        return _serialize_tracked_playlist(row)

    def track_playlist(
        self,
        *,
        userid: int,
        source_url: str,
        quality: str | None = None,
        codec: str | None = None,
        auto_sync: bool = True,
        sync_interval_seconds: int = 900,
        sync_now: bool = True,
    ) -> dict[str, Any]:
        """
        Starts (or re-activates) tracking of an external playlist link.

        The sync interval is clamped to [120, 86400] seconds. Returns
        ``{"tracked": <serialized row>, "sync": <sync summary or None>}``;
        when ``sync_now`` is set a forced sync runs immediately.

        Raises:
            ValueError: ``source_url`` is not a trackable playlist link.
        """
        source = _parse_trackable_playlist_source(source_url)
        if not source:
            raise ValueError("Only trackable playlist links are supported")
        service, item_type, playlist_id = source
        quality_name, codec_name = _quality_codec_pair(quality, codec)
        interval = max(120, min(int(sync_interval_seconds or 900), 24 * 3600))
        now = int(time.time())

        owner_name = None
        title = None
        if service == "spotify":
            playlist = get_spotify_metadata_client().get_playlist(playlist_id)
            if playlist:
                owner_name = (playlist.owner or {}).get("display_name")
                title = playlist.name

        existing = TrackedPlaylistTable.get_by_source(
            userid=userid,
            service=service,
            playlist_id=playlist_id,
        )
        existing_extra: dict[str, Any] = {}
        if existing:
            if isinstance(existing.extra, dict):
                existing_extra = existing.extra
            # Metadata is unavailable for non-Spotify sources and when the
            # Spotify lookup fails; keep what is already stored instead of
            # overwriting it with None on re-track.
            title = title or existing.title
            owner_name = owner_name or existing.owner_name

        merged_extra = {
            **existing_extra,
            "item_type": item_type,
            "service": service,
        }
        tracked = TrackedPlaylistTable.upsert(
            userid=userid,
            service=service,
            playlist_id=playlist_id,
            source_url=source_url,
            values={
                "title": title,
                "owner_name": owner_name,
                "quality": quality_name,
                "codec": codec_name,
                "auto_sync": bool(auto_sync),
                "sync_interval_seconds": interval,
                "next_sync_at": now,
                "status": "active",
                "last_error": None,
                "extra": merged_extra,
            },
        )

        result: dict[str, Any] = {
            "tracked": _serialize_tracked_playlist(tracked),
            "sync": None,
        }
        if sync_now:
            result["sync"] = self.sync_tracked_playlist(
                tracked.id, userid=userid, force=True
            )
            # Re-read so the response reflects what the sync just wrote.
            refreshed = TrackedPlaylistTable.get_by_id(tracked.id, userid=userid)
            if refreshed:
                result["tracked"] = _serialize_tracked_playlist(refreshed)
        return result

    def sync_tracked_playlist(
        self, tracked_id: int, *, userid: int, force: bool = False
    ) -> dict[str, Any]:
        """
        Synchronizes one tracked playlist and returns a summary dict.

        Only one sync per ``tracked_id`` runs at a time; a concurrent call
        returns immediately with "Sync already in progress". Without
        ``force``, rows with auto-sync disabled are skipped. Any exception
        raised during the sync is logged, recorded on the row, and reported
        as ``{"success": False, "error": ...}`` rather than propagated.
        """
        with self._lock:
            if tracked_id in self._processing:
                return {
                    "success": False,
                    "tracked_id": tracked_id,
                    "message": "Sync already in progress",
                }
            self._processing.add(tracked_id)

        try:
            row = TrackedPlaylistTable.get_by_id(tracked_id, userid=userid)
            if not row:
                return {
                    "success": False,
                    "tracked_id": tracked_id,
                    "message": "Tracked playlist not found",
                }
            if row.status == "deleted":
                return {
                    "success": False,
                    "tracked_id": tracked_id,
                    "message": "Tracked playlist is deleted",
                }
            if not force and not row.auto_sync:
                return {
                    "success": True,
                    "tracked_id": tracked_id,
                    "message": "Auto-sync is disabled",
                    "queued_tracks": 0,
                    "added_tracks": 0,
                    "removed_tracks": 0,
                    "reordered_tracks": 0,
                }

            now = int(time.time())
            # Push next_sync_at forward before doing the work.
            TrackedPlaylistTable.update_row(
                row.id,
                {
                    "status": "syncing",
                    "last_error": None,
                    "next_sync_at": now + self._sync_interval(row),
                },
            )

            if row.service != "spotify":
                return self._sync_generic_source(row, userid=userid, now=now)
            return self._sync_spotify_source(
                row, userid=userid, force=force, now=now
            )
        except Exception as error:
            log.exception("Playlist sync failed for tracked_id=%s", tracked_id)
            self._record_sync_failure(tracked_id, error)
            return {
                "success": False,
                "tracked_id": tracked_id,
                "error": str(error),
            }
        finally:
            with self._lock:
                self._processing.discard(tracked_id)

    def _sync_generic_source(
        self, row: Any, *, userid: int, now: int
    ) -> dict[str, Any]:
        """
        Generic multi-platform fallback for non-Spotify playlist providers.

        The link stays tracked and a playlist-level refresh job is queued
        unless one is already active. The snapshot is rebuilt from the
        library tracks found under the latest completed job's target path.
        """
        old_snapshot_ids = [
            track_id for track_id in (row.snapshot_track_ids or []) if track_id
        ]
        new_snapshot_ids = list(old_snapshot_ids)
        resolved_trackhashes: list[str] = []

        latest_job = _latest_completed_sync_job(userid, row.id)
        if latest_job:
            resolved_trackhashes = _collect_trackhashes_for_path(
                latest_job.target_path
            )
            new_snapshot_ids = _snapshot_ids_from_trackhashes(resolved_trackhashes)
        elif old_snapshot_ids and all(
            str(track_id).startswith("trackhash:") for track_id in old_snapshot_ids
        ):
            # No completed job to inspect: reuse the hashes stored in the
            # previous snapshot so the mirror playlist keeps its tracks.
            resolved_trackhashes = [
                str(track_id).split(":", 1)[1] for track_id in old_snapshot_ids
            ]

        added_items, removed_items, reordered_items = self._diff_snapshots(
            old_snapshot_ids, new_snapshot_ids
        )
        local_playlist_id = _sync_mirror_playlist(
            tracked_row=row,
            playlist_title=row.title,
            owner_name=row.owner_name,
            ordered_trackhashes=resolved_trackhashes,
            snapshot_track_ids=new_snapshot_ids,
        )

        playlist_title = row.title or row.playlist_id
        summary: dict[str, Any] = {
            "success": True,
            "tracked_id": row.id,
            "playlist_id": row.playlist_id,
            "playlist_title": playlist_title,
            "local_playlist_id": local_playlist_id,
            "total_tracks": len(new_snapshot_ids),
            "added_tracks": len(added_items),
            "removed_tracks": len(removed_items),
            "reordered_tracks": reordered_items,
            "queued_tracks": 0,
            "skipped_tracks": 0,
            "queue_errors": 0,
            "synced_at": now,
        }

        if _has_active_sync_job(userid, row.id):
            summary["skipped_tracks"] = 1
            summary["message"] = "Active job already exists for this tracked source"
        else:
            row_extra = row.extra if isinstance(row.extra, dict) else {}
            item_type = str(row_extra.get("item_type") or "playlist")
            try:
                download_job_manager.enqueue(
                    userid=userid,
                    source_url=row.source_url,
                    source=row.service,
                    quality=row.quality,
                    codec=row.codec,
                    title=row.title,
                    artist=row.owner_name,
                    album=None,
                    item_type=item_type,
                    payload={
                        "tracked_playlist_id": row.id,
                        "playlist_id": row.playlist_id,
                        "playlist_title": playlist_title,
                        "sync_reason": "scheduled_refresh",
                    },
                )
            except Exception as queue_error:
                log.warning(
                    "Failed to queue %s playlist refresh for tracked_id=%s",
                    row.service,
                    row.id,
                    exc_info=True,
                )
                summary["success"] = False
                summary["queue_errors"] = 1
                summary["error"] = str(queue_error)
            else:
                summary["queued_tracks"] = 1
                summary["message"] = f"Queued {row.service} playlist refresh"

        TrackedPlaylistTable.update_row(
            row.id,
            {
                "status": "active" if summary.get("success") else "failed",
                "last_sync_at": now,
                "next_sync_at": now + self._sync_interval(row),
                "last_result": summary,
                "last_error": summary.get("error"),
                "extra": self._extra_with_local_playlist(row, local_playlist_id),
                "snapshot_track_ids": new_snapshot_ids,
                "snapshot_hash": _snapshot_hash(new_snapshot_ids),
            },
        )
        return summary

    def _sync_spotify_source(
        self, row: Any, *, userid: int, force: bool, now: int
    ) -> dict[str, Any]:
        """
        Syncs a Spotify playlist track by track.

        Fetches the playlist, diffs it against the stored snapshot, updates
        the mirror playlist, cancels active jobs for removed tracks and
        queues download jobs as decided by ``_should_queue``.

        Raises:
            RuntimeError: the playlist metadata could not be loaded.
        """
        client = get_spotify_metadata_client()
        playlist = client.get_playlist(row.playlist_id)
        if not playlist:
            raise RuntimeError("Failed to load playlist metadata from Spotify")

        # A malformed limit falls back to 800; the effective limit is
        # clamped to [1, 2000].
        max_tracks = self._env_int(self._MAX_TRACKS_ENV, 800)
        tracks = client.get_playlist_tracks(
            row.playlist_id, limit=max(1, min(max_tracks, 2000))
        )
        track_records = self._spotify_track_records(tracks)

        new_track_ids = [record["spotify_id"] for record in track_records]
        old_track_ids = [
            track_id for track_id in (row.snapshot_track_ids or []) if track_id
        ]
        added_track_ids, removed_track_ids, reordered_tracks = self._diff_snapshots(
            old_track_ids, new_track_ids
        )

        trackhashes = [
            record["trackhash"] for record in track_records if record.get("trackhash")
        ]
        availability = get_track_availability_map(trackhashes, userid=userid)
        owner_name = (
            playlist.owner.get("display_name") if playlist.owner else row.owner_name
        )

        local_playlist_id = _sync_mirror_playlist(
            tracked_row=row,
            playlist_title=playlist.name,
            owner_name=owner_name,
            ordered_trackhashes=trackhashes,
            snapshot_track_ids=new_track_ids,
        )

        cancelled_removed_jobs = self._cancel_jobs_for_removed_tracks(
            row, userid, set(removed_track_ids)
        )

        added_set = set(added_track_ids)
        queued_tracks = 0
        skipped_tracks = 0
        queue_errors = 0
        seen_trackhashes: set[str] = set()
        for record in track_records:
            trackhash = record.get("trackhash")
            spotify_id = record.get("spotify_id")
            # Tracks without a hash, and repeats of a hash already handled in
            # this run, are counted as skipped.
            if not spotify_id or not trackhash or trackhash in seen_trackhashes:
                skipped_tracks += 1
                continue
            seen_trackhashes.add(trackhash)

            status = (availability.get(trackhash) or {}).get("state", "missing")
            is_new = spotify_id in added_set
            if not self._should_queue(status, is_new=is_new, force=force):
                skipped_tracks += 1
                continue

            try:
                download_job_manager.enqueue(
                    userid=userid,
                    source_url=record["source_url"],
                    source="spotify",
                    quality=row.quality,
                    codec=row.codec,
                    trackhash=trackhash,
                    title=record.get("title"),
                    artist=record.get("artist"),
                    album=record.get("album"),
                    item_type="track",
                    payload={
                        "tracked_playlist_id": row.id,
                        "playlist_id": row.playlist_id,
                        "playlist_title": row.title or playlist.name,
                        "spotify_id": spotify_id,
                        "sync_reason": "new_track" if is_new else "missing_repair",
                    },
                )
            except Exception:
                # One failing track must not abort the whole sync, but the
                # cause should still be visible in the logs.
                log.warning(
                    "Failed to queue Spotify track %s for tracked_id=%s",
                    spotify_id,
                    row.id,
                    exc_info=True,
                )
                queue_errors += 1
            else:
                queued_tracks += 1

        summary = {
            "success": True,
            "tracked_id": row.id,
            "playlist_id": row.playlist_id,
            "playlist_title": playlist.name,
            "local_playlist_id": local_playlist_id,
            "total_tracks": len(new_track_ids),
            "added_tracks": len(added_track_ids),
            "removed_tracks": len(removed_track_ids),
            "reordered_tracks": reordered_tracks,
            "queued_tracks": queued_tracks,
            "skipped_tracks": skipped_tracks,
            "queue_errors": queue_errors,
            "cancelled_removed_jobs": cancelled_removed_jobs,
            "synced_at": now,
        }
        TrackedPlaylistTable.update_row(
            row.id,
            {
                "title": playlist.name,
                "owner_name": owner_name,
                "status": "active",
                "last_sync_at": now,
                "next_sync_at": now + self._sync_interval(row),
                "snapshot_track_ids": new_track_ids,
                "snapshot_hash": _snapshot_hash(new_track_ids),
                # The stored result carries (capped) ID lists that the
                # returned summary omits.
                "last_result": {
                    **summary,
                    "removed_track_ids": removed_track_ids[:300],
                    "added_track_ids": added_track_ids[:300],
                },
                "last_error": None,
                "extra": self._extra_with_local_playlist(row, local_playlist_id),
            },
        )
        return summary

    def set_auto_sync(
        self, tracked_id: int, *, userid: int, enabled: bool
    ) -> dict[str, Any] | None:
        """
        Enables or pauses auto-sync and reschedules the next sync.

        Returns the serialized row, or None when the playlist is not found
        (or the update yields no row).
        """
        row = TrackedPlaylistTable.get_by_id(tracked_id, userid=userid)
        if not row:
            return None
        now = int(time.time())
        updated = TrackedPlaylistTable.update_row(
            tracked_id,
            {
                "auto_sync": bool(enabled),
                "status": "active" if enabled else "paused",
                "next_sync_at": now + self._sync_interval(row),
            },
        )
        if updated is None:
            return None
        return _serialize_tracked_playlist(updated)

    def untrack_playlist(self, tracked_id: int, *, userid: int) -> bool:
        """
        Soft-deletes a tracked playlist.

        The row is marked "deleted", auto-sync is turned off and the next
        sync is pushed ten years out. Returns False when the row is not found.
        """
        row = TrackedPlaylistTable.get_by_id(tracked_id, userid=userid)
        if not row:
            return False
        TrackedPlaylistTable.update_row(
            tracked_id,
            {
                "status": "deleted",
                "auto_sync": False,
                "next_sync_at": int(time.time()) + (10 * 365 * 24 * 3600),
            },
        )
        return True

    def _worker_loop(self) -> None:
        """
        Worker thread body: syncs up to 20 due playlists per iteration, then
        waits ``poll_interval_seconds`` (or until stop is requested).
        """
        while not self._stop.is_set():
            try:
                due = TrackedPlaylistTable.due_for_sync(
                    now_ts=int(time.time()), limit=20
                )
                for row in due:
                    if self._stop.is_set():
                        break
                    self.sync_tracked_playlist(row.id, userid=row.userid, force=False)
            except Exception:
                log.exception("Playlist tracking worker iteration failed")
            # Event.wait doubles as an interruptible sleep.
            self._stop.wait(self.poll_interval_seconds)
# Module-level service instance. Constructing it only initializes state;
# the worker thread is created when start() is called.
playlist_tracking_service = PlaylistTrackingService()