Files
swingmusic-extended/services/radio_sync.py
T
Tomas Dvorak 38f1981283 Move backend files to root level for cleaner GitHub display
- Move all backend files from swingmusic/ to root level
- Backend files now display directly on GitHub repository page
- Keep client applications as submodules (swingmusic-android, swingmusic-desktop, swingmusic-webclient)
- Update README to reflect new structure (no cd swingmusic needed)
- Cleaner, more professional GitHub repository layout

Files moved to root:
- src/ (main source code)
- pyproject.toml, requirements.txt, run.py
- swingmusic.spec, uv.lock, version.txt
- services/

Result: GitHub shows backend files directly while maintaining organized structure
2026-03-17 22:37:49 +01:00

748 lines
28 KiB
Python

# swingmusic/services/radio_sync.py
import asyncio
import json
import logging
from typing import Dict, List, Optional, Set
from dataclasses import dataclass, asdict
from datetime import datetime, timedelta
import websockets
from websockets.server import WebSocketServerProtocol
import aiohttp
import aiofiles
from pathlib import Path
# Module-level logger named after this module; every class below logs through it.
logger = logging.getLogger(__name__)
@dataclass
class RadioStation:
    """In-memory record describing one radio station.

    The first seven fields are required and positional; the remaining three
    carry defaults describing a station that has not been played yet.
    """

    # Identity and ownership.
    id: str
    name: str
    type: str  # 'artist_radio', 'genre_radio', 'mood_radio', 'custom_radio'
    user_id: str

    # Playlist state: ``current_track_index`` is an index into ``tracks``.
    current_track_index: int
    tracks: List[Dict]

    # Bookkeeping.
    created_at: datetime
    last_played: Optional[datetime] = None  # stays None until a track change is recorded
    play_count: int = 0
    is_active: bool = False
@dataclass
class SyncEvent:
    """One playback event that is relayed between devices on a station.

    All six fields are required and positional.
    """

    event_type: str  # 'play', 'pause', 'skip', 'seek', 'track_change'
    station_id: str  # station the event applies to
    user_id: str
    timestamp: datetime
    data: Dict  # event payload; its keys depend on ``event_type``
    device_id: str  # device the event originated from
@dataclass
class DeviceInfo:
    """Registry entry for one connected device.

    The first four fields are required and positional; the connection and
    playback fields default to "no socket, no station, position zero".
    """

    device_id: str
    user_id: str
    device_type: str  # 'web', 'mobile', 'desktop'
    last_seen: datetime

    # Live connection used to push messages to the device, when there is one.
    websocket: Optional[WebSocketServerProtocol] = None

    # Station the device is currently listening to (None when idle).
    current_station: Optional[str] = None
    playback_position: float = 0.0
class CrossPlatformRadioSync:
    """Cross-platform radio synchronization service.

    Keeps an in-memory registry of radio stations and connected devices and
    relays playback events between the devices listening to the same station
    through a WebSocket server. Two background tasks run while the service is
    started: one evicts silent devices and expired stations, the other nudges
    listeners whose recorded playback position has drifted.
    """

    def __init__(self):
        # Registries (in memory only).
        self.active_stations: Dict[str, RadioStation] = {}
        self.connected_devices: Dict[str, DeviceInfo] = {}
        self.station_listeners: Dict[str, Set[str]] = {}  # station_id -> device_ids
        self.user_stations: Dict[str, List[str]] = {}  # user_id -> station_ids

        # WebSocket server
        self.websocket_server = None
        self.server_port = 8765

        # Configuration
        self.sync_timeout = 30  # seconds
        self.max_devices_per_user = 5
        self.station_cache_ttl = 3600  # 1 hour

        # Background tasks
        self.cleanup_task = None
        self.sync_task = None

    # ------------------------------------------------------------------
    # Serialization helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _json_default(value):
        """Fallback for ``json.dumps``: render datetimes as ISO-8601 strings.

        ``asdict`` on a station or event yields ``datetime`` values, which the
        stock JSON encoder rejects. Any other unsupported type still raises
        ``TypeError`` so unexpected payloads are not silently stringified.
        """
        if isinstance(value, datetime):
            return value.isoformat()
        raise TypeError(
            f"Object of type {type(value).__name__} is not JSON serializable"
        )

    @classmethod
    def _encode(cls, message: Dict) -> str:
        """Serialize an outgoing message to JSON text (datetime-aware)."""
        return json.dumps(message, default=cls._json_default)

    # ------------------------------------------------------------------
    # Service lifecycle
    # ------------------------------------------------------------------

    async def start_sync_service(self):
        """Start the WebSocket server and the two background loops.

        Raises:
            Exception: whatever the server start-up raised, after logging it.
        """
        try:
            # Start WebSocket server
            self.websocket_server = await websockets.serve(
                self._handle_websocket_connection,
                "localhost",
                self.server_port,
            )
            # Start background tasks
            self.cleanup_task = asyncio.create_task(self._cleanup_loop())
            self.sync_task = asyncio.create_task(self._sync_loop())
            logger.info(f"Radio sync service started on port {self.server_port}")
        except Exception as e:
            logger.error(f"Failed to start sync service: {e}")
            raise

    async def stop_sync_service(self):
        """Stop the background loops, the server and every device connection.

        Errors are logged, not raised.
        """
        try:
            # Cancel background tasks and wait until they have really stopped,
            # so neither loop touches the registries during the teardown below.
            tasks = [t for t in (self.cleanup_task, self.sync_task) if t]
            for task in tasks:
                task.cancel()
            if tasks:
                await asyncio.gather(*tasks, return_exceptions=True)
            self.cleanup_task = None
            self.sync_task = None

            # Close WebSocket server
            if self.websocket_server:
                self.websocket_server.close()
                await self.websocket_server.wait_closed()

            # Disconnect all devices. Iterate over a snapshot: closing a socket
            # lets its connection handler run and remove the device from
            # ``connected_devices`` while we are still looping.
            for device in list(self.connected_devices.values()):
                if device.websocket:
                    await device.websocket.close()
            logger.info("Radio sync service stopped")
        except Exception as e:
            logger.error(f"Error stopping sync service: {e}")

    # ------------------------------------------------------------------
    # Stations
    # ------------------------------------------------------------------

    async def create_radio_station(self, user_id: str, station_data: Dict) -> RadioStation:
        """Create and register a new radio station for ``user_id``.

        Args:
            user_id: Owner of the station.
            station_data: Must contain the keys ``'type'``, ``'name'`` and
                ``'tracks'``.

        Returns:
            The newly registered station.

        Raises:
            Exception: e.g. ``KeyError`` for a missing key; logged and re-raised.
        """
        try:
            station_id = f"{station_data['type']}_{user_id}_{datetime.now().timestamp()}"
            station = RadioStation(
                id=station_id,
                name=station_data['name'],
                type=station_data['type'],
                user_id=user_id,
                current_track_index=0,
                tracks=station_data['tracks'],
                created_at=datetime.utcnow(),
            )
            # Store station
            self.active_stations[station_id] = station
            # Update user stations
            self.user_stations.setdefault(user_id, []).append(station_id)
            # Initialize listeners set
            self.station_listeners[station_id] = set()
            logger.info(f"Created radio station: {station_id}")
            return station
        except Exception as e:
            logger.error(f"Failed to create radio station: {e}")
            raise

    async def get_user_stations(self, user_id: str) -> List[RadioStation]:
        """Return the stations of ``user_id`` that are still registered.

        Returns an empty list for an unknown user or on error.
        """
        try:
            return [
                self.active_stations[station_id]
                for station_id in self.user_stations.get(user_id, [])
                if station_id in self.active_stations
            ]
        except Exception as e:
            logger.error(f"Failed to get user stations: {e}")
            return []

    async def join_station(self, user_id: str, device_id: str, station_id: str) -> bool:
        """Make ``device_id`` a listener of ``station_id``.

        The device first leaves whatever station it was on, its recorded
        playback position is reset to 0.0, the station is marked active and
        the current track is pushed to the device.

        Returns:
            True on success; False when the station or device is unknown or
            an error occurred.
        """
        try:
            # Check if station exists
            station = self.active_stations.get(station_id)
            if station is None:
                logger.warning(f"Station not found: {station_id}")
                return False
            # Check if device is connected
            device = self.connected_devices.get(device_id)
            if device is None:
                logger.warning(f"Device not connected: {device_id}")
                return False
            # Remove from previous station if any
            await self._leave_station(device_id)
            # Add to station listeners
            self.station_listeners.setdefault(station_id, set()).add(device_id)
            # Update device info
            device.current_station = station_id
            device.playback_position = 0.0
            # Mark station as active
            station.is_active = True
            # Send current track to device
            await self._send_current_track(device_id, station)
            logger.info(f"Device {device_id} joined station {station_id}")
            return True
        except Exception as e:
            logger.error(f"Failed to join station: {e}")
            return False

    async def leave_station(self, device_id: str) -> bool:
        """Remove ``device_id`` from its current station; False on error or unknown device."""
        try:
            return await self._leave_station(device_id)
        except Exception as e:
            logger.error(f"Failed to leave station: {e}")
            return False

    async def _leave_station(self, device_id: str) -> bool:
        """Detach a device from its station without any error handling.

        Returns False when the device is not registered, True otherwise (also
        when the device was not on any station).
        """
        device = self.connected_devices.get(device_id)
        if not device:
            return False

        station_id = device.current_station
        if station_id and station_id in self.station_listeners:
            remaining = self.station_listeners[station_id]
            remaining.discard(device_id)
            # Last listener gone: the station becomes eligible for clean-up.
            if not remaining:
                station = self.active_stations.get(station_id)
                if station:
                    station.is_active = False

        # Update device info
        device.current_station = None
        device.playback_position = 0.0
        return True

    # ------------------------------------------------------------------
    # Event relay
    # ------------------------------------------------------------------

    async def sync_playback_event(self, event: SyncEvent) -> None:
        """Relay ``event`` to every other listener of its station, then update state.

        Errors are logged, not raised.
        """
        try:
            station_id = event.station_id
            # Snapshot the recipients: the listener set can change while a
            # send is awaited, which would break iteration over the live set.
            recipients = [
                device_id
                for device_id in self.station_listeners.get(station_id, ())
                if device_id != event.device_id
            ]
            for device_id in recipients:
                await self._send_event_to_device(device_id, event)

            # Update station state if needed
            if event.event_type == 'track_change':
                await self._handle_track_change(station_id, event.data)
            elif event.event_type == 'play':
                await self._handle_play_event(station_id, event)
            elif event.event_type == 'pause':
                await self._handle_pause_event(station_id, event)
        except Exception as e:
            logger.error(f"Failed to sync playback event: {e}")

    async def _send_event_to_device(self, device_id: str, event: SyncEvent) -> None:
        """Push ``event`` to one device as a ``sync_event`` message (best effort)."""
        try:
            device = self.connected_devices.get(device_id)
            if not device or not device.websocket:
                return
            message = {
                'type': 'sync_event',
                'event': asdict(event),
            }
            # ``event.timestamp`` is a datetime, hence the datetime-aware encoder.
            await device.websocket.send(self._encode(message))
        except Exception as e:
            logger.error(f"Failed to send event to device {device_id}: {e}")

    async def _send_current_track(self, device_id: str, station: RadioStation) -> None:
        """Push the station's current track to one device (best effort).

        Nothing is sent when the device has no socket or the station's track
        index does not point inside its track list.
        """
        try:
            device = self.connected_devices.get(device_id)
            if not device or not device.websocket:
                return
            if 0 <= station.current_track_index < len(station.tracks):
                message = {
                    'type': 'current_track',
                    'track': station.tracks[station.current_track_index],
                    'station': asdict(station),
                    'position': device.playback_position,
                }
                # The station dict carries datetimes (created_at, last_played).
                await device.websocket.send(self._encode(message))
        except Exception as e:
            logger.error(f"Failed to send current track to device {device_id}: {e}")

    # ------------------------------------------------------------------
    # WebSocket handling
    # ------------------------------------------------------------------

    async def _handle_websocket_connection(
        self,
        websocket: WebSocketServerProtocol,
        path: Optional[str] = None,
    ):
        """Serve one WebSocket connection from authentication to disconnect.

        The first message must be a JSON object with ``user_id`` and
        ``device_id`` (``device_type`` is optional). The connection is
        rejected when the user already has ``max_devices_per_user`` devices.

        Args:
            websocket: The accepted connection.
            path: Request path. Optional because newer ``websockets`` releases
                call the handler with the connection only; it is not used.
        """
        device_id = None
        registered = False
        try:
            # Wait for authentication message
            auth_data = json.loads(await websocket.recv())
            user_id = auth_data['user_id']
            device_id = auth_data['device_id']
            device_type = auth_data.get('device_type', 'unknown')

            # Check device limit
            user_devices = [
                d for d in self.connected_devices.values() if d.user_id == user_id
            ]
            if len(user_devices) >= self.max_devices_per_user:
                await websocket.send(json.dumps({
                    'type': 'error',
                    'message': 'Device limit exceeded'
                }))
                await websocket.close()
                return

            # A reconnect under an already-registered id replaces the old
            # entry; detach the old session from its station first so the
            # listener set does not keep a membership nobody can remove.
            if device_id in self.connected_devices:
                await self._leave_station(device_id)

            # Register device
            self.connected_devices[device_id] = DeviceInfo(
                device_id=device_id,
                user_id=user_id,
                device_type=device_type,
                last_seen=datetime.utcnow(),
                websocket=websocket,
            )
            registered = True

            # Send confirmation
            await websocket.send(json.dumps({
                'type': 'connected',
                'device_id': device_id
            }))
            logger.info(f"Device {device_id} connected for user {user_id}")

            # Handle messages
            async for message in websocket:
                try:
                    data = json.loads(message)
                    await self._handle_device_message(device_id, data)
                except Exception as e:
                    logger.error(f"Error handling message from {device_id}: {e}")
        except Exception as e:
            logger.error(f"WebSocket connection error: {e}")
        finally:
            # Cleanup on disconnect, but only if this connection still owns
            # the registration. A rejected connection, or one that has been
            # superseded by a reconnect, must not evict another session.
            if registered:
                current = self.connected_devices.get(device_id)
                if current is not None and current.websocket is websocket:
                    await self._cleanup_device(device_id)

    async def _handle_device_message(self, device_id: str, data: Dict) -> None:
        """Dispatch one decoded message from a registered device.

        Recognised ``type`` values: ``join_station``, ``leave_station``,
        ``playback_event`` and ``heartbeat``; anything else is ignored.
        """
        try:
            device = self.connected_devices.get(device_id)
            if not device:
                return
            message_type = data.get('type')
            if message_type == 'join_station':
                await self.join_station(device.user_id, device_id, data['station_id'])
            elif message_type == 'leave_station':
                await self.leave_station(device_id)
            elif message_type == 'playback_event':
                event = SyncEvent(
                    event_type=data['event_type'],
                    station_id=data['station_id'],
                    user_id=device.user_id,
                    timestamp=datetime.utcnow(),
                    data=data['data'],
                    device_id=device_id,
                )
                await self.sync_playback_event(event)
            elif message_type == 'heartbeat':
                # Heartbeats are what keep a device from being evicted.
                device.last_seen = datetime.utcnow()
        except Exception as e:
            logger.error(f"Error handling device message: {e}")

    # ------------------------------------------------------------------
    # Station state updates
    # ------------------------------------------------------------------

    async def _handle_track_change(self, station_id: str, data: Dict) -> None:
        """Apply a ``track_change`` payload; out-of-range indexes are ignored."""
        try:
            station = self.active_stations.get(station_id)
            if not station:
                return
            new_track_index = data.get('track_index', 0)
            # Validate track index
            if 0 <= new_track_index < len(station.tracks):
                station.current_track_index = new_track_index
                station.play_count += 1
                station.last_played = datetime.utcnow()
        except Exception as e:
            logger.error(f"Error handling track change: {e}")

    def _record_reported_position(self, station_id: str, event: SyncEvent) -> None:
        """Store the position carried by ``event`` for every listener of the station.

        The sender is included: it is the device that reported the position,
        and leaving its record stale would skew the average computed in
        ``_sync_station_playback``.
        """
        position = event.data.get('position', 0.0)
        for device_id in self.station_listeners.get(station_id, ()):
            device = self.connected_devices.get(device_id)
            if device:
                device.playback_position = position

    async def _handle_play_event(self, station_id: str, event: SyncEvent) -> None:
        """Record the playback position reported by a ``play`` event."""
        try:
            self._record_reported_position(station_id, event)
        except Exception as e:
            logger.error(f"Error handling play event: {e}")

    async def _handle_pause_event(self, station_id: str, event: SyncEvent) -> None:
        """Record the playback position reported by a ``pause`` event."""
        try:
            self._record_reported_position(station_id, event)
        except Exception as e:
            logger.error(f"Error handling pause event: {e}")

    # ------------------------------------------------------------------
    # Clean-up
    # ------------------------------------------------------------------

    async def _cleanup_device(self, device_id: str) -> None:
        """Detach a device from its station and drop it from the registry."""
        try:
            if device_id not in self.connected_devices:
                return
            # Leave current station
            await self._leave_station(device_id)
            # Remove from connected devices
            del self.connected_devices[device_id]
            logger.info(f"Device {device_id} disconnected")
        except Exception as e:
            logger.error(f"Error cleaning up device: {e}")

    async def _cleanup_loop(self) -> None:
        """Once a minute, evict silent devices and expired idle stations.

        A device is evicted when its ``last_seen`` is older than
        ``sync_timeout`` seconds. A station is removed when it is older than
        ``station_cache_ttl`` seconds, inactive and without listeners.
        """
        while True:
            try:
                await asyncio.sleep(60)  # Run every minute
                current_time = datetime.utcnow()

                # Check for inactive devices
                stale_ids = [
                    device_id
                    for device_id, device in self.connected_devices.items()
                    if (current_time - device.last_seen).total_seconds() > self.sync_timeout
                ]
                # Remove inactive devices
                for device_id in stale_ids:
                    device = self.connected_devices.get(device_id)
                    socket = device.websocket if device else None
                    await self._cleanup_device(device_id)
                    # Close the socket too; otherwise the connection stays
                    # open while every message it sends is silently ignored.
                    if socket is not None:
                        try:
                            await socket.close()
                        except Exception as e:
                            logger.error(f"Error cleaning up device: {e}")

                # Cleanup old stations
                expired_ids = [
                    station_id
                    for station_id, station in self.active_stations.items()
                    if (current_time - station.created_at).total_seconds() > self.station_cache_ttl
                    and not station.is_active
                    and not self.station_listeners.get(station_id)
                ]
                for station_id in expired_ids:
                    await self._cleanup_station(station_id)
            except asyncio.CancelledError:
                # Cancellation must always end the loop, never be logged away.
                raise
            except Exception as e:
                logger.error(f"Error in cleanup loop: {e}")

    async def _cleanup_station(self, station_id: str) -> None:
        """Remove a station from every registry that references it."""
        try:
            station = self.active_stations.get(station_id)
            if not station:
                return
            # Remove from user stations
            owned = self.user_stations.get(station.user_id, [])
            if station_id in owned:
                owned.remove(station_id)
            # Remove from active stations
            del self.active_stations[station_id]
            # Remove listeners
            self.station_listeners.pop(station_id, None)
            logger.info(f"Cleaned up station {station_id}")
        except Exception as e:
            logger.error(f"Error cleaning up station: {e}")

    # ------------------------------------------------------------------
    # Position synchronization
    # ------------------------------------------------------------------

    async def _sync_loop(self) -> None:
        """Every five seconds, re-synchronize each active station."""
        while True:
            try:
                await asyncio.sleep(5)  # Run every 5 seconds
                # Snapshot: stations can be created or removed while a
                # station's sync is awaiting its sends.
                for station_id, station in list(self.active_stations.items()):
                    if station.is_active:
                        await self._sync_station_playback(station_id)
            except asyncio.CancelledError:
                # Cancellation must always end the loop, never be logged away.
                raise
            except Exception as e:
                logger.error(f"Error in sync loop: {e}")

    async def _sync_station_playback(self, station_id: str) -> None:
        """Send ``sync_position`` to listeners that drifted from the station average.

        The average is taken over the recorded positions of the station's
        registered listeners; a device further than two seconds from it gets
        a server-originated event carrying the average.
        """
        try:
            listeners = self.station_listeners.get(station_id, set())
            if len(listeners) <= 1:
                return  # No sync needed for single listener

            # Resolve the devices up front so no live set is iterated while
            # sends are awaited below.
            tracked = []
            for device_id in listeners:
                device = self.connected_devices.get(device_id)
                if device:
                    tracked.append((device_id, device))
            if not tracked:
                return

            # Calculate average playback position
            avg_position = sum(d.playback_position for _, d in tracked) / len(tracked)

            # NOTE(review): recorded positions only change on join/leave and
            # play/pause events, so a drifting device keeps being re-sent this
            # command until the next such event - confirm that is intended.
            sync_threshold = 2.0  # 2 seconds
            for device_id, device in tracked:
                if abs(device.playback_position - avg_position) > sync_threshold:
                    # Send sync command
                    sync_event = SyncEvent(
                        event_type='sync_position',
                        station_id=station_id,
                        user_id=device.user_id,
                        timestamp=datetime.utcnow(),
                        data={'position': avg_position},
                        device_id='server',
                    )
                    await self._send_event_to_device(device_id, sync_event)
        except Exception as e:
            logger.error(f"Error syncing station playback: {e}")

    # ------------------------------------------------------------------
    # Introspection and broadcast
    # ------------------------------------------------------------------

    async def get_station_status(self, station_id: str) -> Optional[Dict]:
        """Describe a station and its registered listeners.

        Returns:
            A dict with the keys ``station`` (``asdict`` of the station, so
            its datetime fields are ``datetime`` objects), ``listeners``,
            ``listener_count`` and ``is_active``; None when the station is
            unknown or an error occurred.
        """
        try:
            station = self.active_stations.get(station_id)
            if not station:
                return None
            listeners = self.station_listeners.get(station_id, set())
            listener_info = []
            for device_id in listeners:
                device = self.connected_devices.get(device_id)
                if device:
                    listener_info.append({
                        'device_id': device_id,
                        'device_type': device.device_type,
                        'playback_position': device.playback_position,
                        'last_seen': device.last_seen.isoformat()
                    })
            return {
                'station': asdict(station),
                'listeners': listener_info,
                'listener_count': len(listeners),
                'is_active': station.is_active
            }
        except Exception as e:
            logger.error(f"Error getting station status: {e}")
            return None

    async def broadcast_to_user_devices(self, user_id: str, message: Dict) -> None:
        """Send ``message`` to every connected device of ``user_id`` (best effort).

        A failure on one device is logged and does not stop delivery to the
        others.
        """
        try:
            payload = self._encode(message)
        except Exception as e:
            logger.error(f"Error broadcasting to user devices: {e}")
            return

        # Snapshot: devices can connect or disconnect while a send is awaited.
        targets = [
            device
            for device in self.connected_devices.values()
            if device.user_id == user_id and device.websocket
        ]
        for device in targets:
            try:
                await device.websocket.send(payload)
            except Exception as e:
                logger.error(f"Error broadcasting to user devices: {e}")
# Client-side helper for connecting to radio sync
class RadioSyncClient:
    """Client-side connection to the radio sync server.

    After ``connect`` succeeds, a background task reads server messages and
    invokes the callbacks registered with ``add_event_callback``. Callbacks
    may be plain functions or coroutine functions.
    """

    def __init__(self, user_id: str, device_id: str, device_type: str = 'web'):
        self.user_id = user_id
        self.device_id = device_id
        self.device_type = device_type
        self.websocket = None
        self.event_callbacks = {}
        self.is_connected = False
        # Strong reference to the reader task; the event loop itself only
        # keeps weak references to tasks.
        self._handler_task = None

    async def connect(self, server_url: str = "ws://localhost:8765") -> bool:
        """Open the connection, authenticate and start the message reader.

        Returns:
            True when the server answered with a ``connected`` message, False
            otherwise. On failure the freshly opened socket is closed again.
        """
        socket = None
        try:
            socket = await websockets.connect(server_url)
            # Send authentication
            auth_message = {
                'user_id': self.user_id,
                'device_id': self.device_id,
                'device_type': self.device_type
            }
            await socket.send(json.dumps(auth_message))
            # Wait for confirmation
            response_data = json.loads(await socket.recv())
            if response_data.get('type') == 'connected':
                self.websocket = socket
                self.is_connected = True
                # Start message handler
                self._handler_task = asyncio.create_task(self._message_handler())
                logger.info(f"Connected to radio sync server as {self.device_id}")
                return True
            logger.error(f"Connection failed: {response_data}")
        except Exception as e:
            logger.error(f"Failed to connect to radio sync server: {e}")

        # Rejected or failed handshake: do not leak the half-open socket.
        if socket is not None:
            try:
                await socket.close()
            except Exception as e:
                logger.error(f"Failed to connect to radio sync server: {e}")
        return False

    async def disconnect(self) -> None:
        """Close the connection, if any, and reset the connection state."""
        socket, self.websocket = self.websocket, None
        # Reset state before closing so a failing close cannot leave the
        # client claiming to be connected.
        self.is_connected = False
        if socket is not None:
            await socket.close()
            logger.info("Disconnected from radio sync server")

    async def _send_json(self, message: Dict, failure: str) -> bool:
        """Send one message; return False when offline or when the send fails."""
        if not self.is_connected:
            return False
        try:
            await self.websocket.send(json.dumps(message))
            return True
        except Exception as e:
            logger.error(f"{failure}: {e}")
            return False

    async def join_station(self, station_id: str) -> bool:
        """Ask the server to join ``station_id``; True when the request was sent."""
        return await self._send_json(
            {'type': 'join_station', 'station_id': station_id},
            "Failed to join station",
        )

    async def leave_station(self) -> bool:
        """Ask the server to leave the current station; True when the request was sent."""
        return await self._send_json(
            {'type': 'leave_station'},
            "Failed to leave station",
        )

    async def send_playback_event(self, station_id: str, event_type: str, data: Dict) -> bool:
        """Report a playback event for ``station_id``; True when it was sent."""
        return await self._send_json(
            {
                'type': 'playback_event',
                'station_id': station_id,
                'event_type': event_type,
                'data': data
            },
            "Failed to send playback event",
        )

    def add_event_callback(self, event_type: str, callback: callable) -> None:
        """Register ``callback`` for an event type.

        ``event_type`` is either the ``event_type`` of a sync event or the
        literal ``'current_track'``. The callback receives the event dict (or
        the whole message for ``'current_track'``) and may be sync or async.
        """
        self.event_callbacks.setdefault(event_type, []).append(callback)

    async def _run_callbacks(self, key: str, payload: Dict) -> None:
        """Invoke every callback registered under ``key`` with ``payload``.

        A callback that raises is logged and does not prevent the remaining
        callbacks from running.
        """
        import inspect

        # Iterate a copy so a callback may register further callbacks.
        for callback in list(self.event_callbacks.get(key, ())):
            try:
                outcome = callback(payload)
                # Plain functions return a value; only await real awaitables.
                if inspect.isawaitable(outcome):
                    await outcome
            except Exception as e:
                logger.error(f"Error in event callback for {key}: {e}")

    async def _dispatch_message(self, data: Dict) -> None:
        """Route one decoded server message to callbacks or to the log."""
        message_type = data.get('type')
        if message_type == 'sync_event':
            event = data['event']
            await self._run_callbacks(event['event_type'], event)
        elif message_type == 'current_track':
            # Handle current track update
            await self._run_callbacks('current_track', data)
        elif message_type == 'error':
            logger.error(f"Server error: {data['message']}")

    async def _message_handler(self) -> None:
        """Read server messages until the connection ends.

        A malformed message is logged and skipped. When the connection ends,
        normally or with an error, ``is_connected`` is cleared.
        """
        socket = self.websocket
        try:
            async for message in socket:
                try:
                    await self._dispatch_message(json.loads(message))
                except Exception as e:
                    logger.error(f"Error in message handler: {e}")
        except Exception as e:
            logger.error(f"Error in message handler: {e}")
        finally:
            # Only clear the flag for the connection this task was reading;
            # the client may already have reconnected on a new socket.
            if self.websocket is socket:
                self.is_connected = False

    async def send_heartbeat(self) -> None:
        """Send a heartbeat; a failed send marks the client as disconnected."""
        if self.is_connected:
            try:
                await self.websocket.send(json.dumps({'type': 'heartbeat'}))
            except Exception as e:
                logger.error(f"Failed to send heartbeat: {e}")
                self.is_connected = False