# swingmusic/services/radio_sync.py
"""Cross-platform radio synchronization service.

Contains a WebSocket server (``CrossPlatformRadioSync``) that relays playback
events between the devices listening to a radio station, and a small client
helper (``RadioSyncClient``) that speaks the same JSON message protocol.
"""
import asyncio
import inspect
import json
import logging
from typing import Any, Callable, Dict, List, Optional, Set
from dataclasses import dataclass, asdict
from datetime import datetime, timedelta
import websockets
from websockets.server import WebSocketServerProtocol
import aiohttp
import aiofiles
from pathlib import Path

logger = logging.getLogger(__name__)


def _json_default(value: Any) -> Any:
    """Serialize the non-JSON-native types that appear in sync payloads.

    ``asdict()`` on ``RadioStation`` / ``SyncEvent`` yields ``datetime``
    values, which ``json.dumps`` rejects unless a ``default`` hook is given.
    """
    if isinstance(value, datetime):
        return value.isoformat()
    if isinstance(value, (set, frozenset)):
        return list(value)
    raise TypeError(
        f"Object of type {type(value).__name__} is not JSON serializable"
    )


def _to_json(payload: Dict) -> str:
    """Encode an outgoing message, converting datetimes to ISO-8601 strings."""
    return json.dumps(payload, default=_json_default)


@dataclass
class RadioStation:
    """Radio station information."""

    id: str
    name: str
    type: str  # 'artist_radio', 'genre_radio', 'mood_radio', 'custom_radio'
    user_id: str
    current_track_index: int
    tracks: List[Dict]
    created_at: datetime
    last_played: Optional[datetime] = None
    play_count: int = 0
    is_active: bool = False


@dataclass
class SyncEvent:
    """Synchronization event relayed between devices of a station."""

    event_type: str  # 'play', 'pause', 'skip', 'seek', 'track_change'
    station_id: str
    user_id: str
    timestamp: datetime
    data: Dict
    device_id: str


@dataclass
class DeviceInfo:
    """Connected device information."""

    device_id: str
    user_id: str
    device_type: str  # 'web', 'mobile', 'desktop'
    last_seen: datetime
    websocket: Optional[WebSocketServerProtocol] = None
    current_station: Optional[str] = None
    playback_position: float = 0.0


class CrossPlatformRadioSync:
    """Cross-platform radio synchronization service.

    Keeps four in-memory registries (stations, devices, listeners per
    station, stations per user) and runs a WebSocket server plus two
    background loops: one evicting silent devices / expired stations, one
    nudging listeners whose playback position drifted from the average.
    """

    def __init__(self):
        self.active_stations: Dict[str, RadioStation] = {}
        self.connected_devices: Dict[str, DeviceInfo] = {}
        self.station_listeners: Dict[str, Set[str]] = {}  # station_id -> device_ids
        self.user_stations: Dict[str, List[str]] = {}  # user_id -> station_ids

        # WebSocket server
        self.websocket_server = None
        self.server_port = 8765

        # Configuration
        self.sync_timeout = 30  # seconds of silence before a device is evicted
        self.max_devices_per_user = 5
        self.station_cache_ttl = 3600  # 1 hour

        # Background tasks
        self.cleanup_task = None
        self.sync_task = None

    async def start_sync_service(self):
        """Start the WebSocket server and the background loops.

        Raises:
            Exception: whatever ``websockets.serve`` raises (logged first).
        """
        try:
            self.websocket_server = await websockets.serve(
                self._handle_websocket_connection,
                "localhost",
                self.server_port,
            )

            self.cleanup_task = asyncio.create_task(self._cleanup_loop())
            self.sync_task = asyncio.create_task(self._sync_loop())

            logger.info(f"Radio sync service started on port {self.server_port}")
        except Exception as e:
            logger.error(f"Failed to start sync service: {e}")
            raise

    async def stop_sync_service(self):
        """Stop the background loops, the server and all device connections.

        Best effort: errors are logged, not raised.
        """
        try:
            if self.cleanup_task:
                self.cleanup_task.cancel()
            if self.sync_task:
                self.sync_task.cancel()

            if self.websocket_server:
                self.websocket_server.close()
                await self.websocket_server.wait_closed()

            # Snapshot: connection handlers remove entries from
            # connected_devices while we are awaiting close().
            for device in list(self.connected_devices.values()):
                if device.websocket:
                    await device.websocket.close()

            logger.info("Radio sync service stopped")
        except Exception as e:
            logger.error(f"Error stopping sync service: {e}")

    async def create_radio_station(self, user_id: str, station_data: Dict) -> RadioStation:
        """Create and register a new radio station.

        Args:
            user_id: Owner of the station.
            station_data: Must contain ``'type'``, ``'name'`` and ``'tracks'``.

        Returns:
            The newly registered station.

        Raises:
            KeyError: if a required key is missing from ``station_data``.
        """
        try:
            station_type = station_data['type']
            station_id = f"{station_type}_{user_id}_{datetime.now().timestamp()}"

            station = RadioStation(
                id=station_id,
                name=station_data['name'],
                type=station_type,
                user_id=user_id,
                current_track_index=0,
                tracks=station_data['tracks'],
                created_at=datetime.utcnow(),
            )

            self.active_stations[station_id] = station
            self.user_stations.setdefault(user_id, []).append(station_id)
            self.station_listeners[station_id] = set()

            logger.info(f"Created radio station: {station_id}")
            return station
        except Exception as e:
            logger.error(f"Failed to create radio station: {e}")
            raise

    async def get_user_stations(self, user_id: str) -> List[RadioStation]:
        """Return the still-registered stations owned by ``user_id``."""
        try:
            return [
                self.active_stations[station_id]
                for station_id in self.user_stations.get(user_id, [])
                if station_id in self.active_stations
            ]
        except Exception as e:
            logger.error(f"Failed to get user stations: {e}")
            return []

    async def join_station(self, user_id: str, device_id: str, station_id: str) -> bool:
        """Attach a connected device to a station and send it the current track.

        Returns:
            True on success; False if the station or device is unknown, or on
            any error (which is logged).
        """
        try:
            station = self.active_stations.get(station_id)
            if station is None:
                logger.warning(f"Station not found: {station_id}")
                return False

            device = self.connected_devices.get(device_id)
            if device is None:
                logger.warning(f"Device not connected: {device_id}")
                return False

            # A device listens to at most one station at a time.
            await self._leave_station(device_id)

            self.station_listeners.setdefault(station_id, set()).add(device_id)
            device.current_station = station_id
            device.playback_position = 0.0
            station.is_active = True

            await self._send_current_track(device_id, station)

            logger.info(f"Device {device_id} joined station {station_id}")
            return True
        except Exception as e:
            logger.error(f"Failed to join station: {e}")
            return False

    async def leave_station(self, device_id: str) -> bool:
        """Detach a device from its current station (errors are logged)."""
        try:
            return await self._leave_station(device_id)
        except Exception as e:
            logger.error(f"Failed to leave station: {e}")
            return False

    async def _leave_station(self, device_id: str) -> bool:
        """Remove the device from its station's listener set.

        Marks the station inactive when its last listener leaves. Returns
        False only when the device is not connected.
        """
        device = self.connected_devices.get(device_id)
        if not device:
            return False

        station_id = device.current_station
        if station_id:
            listeners = self.station_listeners.get(station_id)
            if listeners is not None:
                listeners.discard(device_id)
                if not listeners:
                    station = self.active_stations.get(station_id)
                    if station:
                        station.is_active = False

        device.current_station = None
        device.playback_position = 0.0
        return True

    async def sync_playback_event(self, event: SyncEvent) -> None:
        """Relay a playback event to the station's other listeners.

        After broadcasting, applies the event to server-side state for
        ``'track_change'``, ``'play'`` and ``'pause'`` events.
        """
        try:
            station_id = event.station_id

            # Snapshot: listeners may join/leave while a send is awaited.
            recipients = [
                device_id
                for device_id in self.station_listeners.get(station_id, ())
                if device_id != event.device_id
            ]
            for device_id in recipients:
                await self._send_event_to_device(device_id, event)

            if event.event_type == 'track_change':
                await self._handle_track_change(station_id, event.data)
            elif event.event_type == 'play':
                await self._handle_play_event(station_id, event)
            elif event.event_type == 'pause':
                await self._handle_pause_event(station_id, event)
        except Exception as e:
            logger.error(f"Failed to sync playback event: {e}")

    async def _send_event_to_device(self, device_id: str, event: SyncEvent) -> None:
        """Send a ``sync_event`` message to one device; failures are logged."""
        try:
            device = self.connected_devices.get(device_id)
            if not device or not device.websocket:
                return

            message = {
                'type': 'sync_event',
                'event': asdict(event),
            }
            # event.timestamp is a datetime, hence the custom encoder.
            await device.websocket.send(_to_json(message))
        except Exception as e:
            logger.error(f"Failed to send event to device {device_id}: {e}")

    async def _send_current_track(self, device_id: str, station: RadioStation) -> None:
        """Send a ``current_track`` message to one device; failures are logged."""
        try:
            device = self.connected_devices.get(device_id)
            if not device or not device.websocket:
                return

            if 0 <= station.current_track_index < len(station.tracks):
                message = {
                    'type': 'current_track',
                    'track': station.tracks[station.current_track_index],
                    'station': asdict(station),
                    'position': device.playback_position,
                }
                # The station dict carries datetimes, hence the custom encoder.
                await device.websocket.send(_to_json(message))
        except Exception as e:
            logger.error(f"Failed to send current track to device {device_id}: {e}")

    async def _handle_websocket_connection(
        self,
        websocket: WebSocketServerProtocol,
        path: Optional[str] = None,
    ):
        """Serve one WebSocket connection for its whole lifetime.

        The first message must be a JSON object with ``user_id`` and
        ``device_id`` (``device_type`` optional). ``path`` is unused and
        optional because newer ``websockets`` releases no longer pass it.

        NOTE(review): ``user_id`` / ``device_id`` are taken from the client
        as-is; nothing here authenticates them.
        """
        registered_id: Optional[str] = None
        try:
            auth_data = json.loads(await websocket.recv())

            user_id = auth_data['user_id']
            device_id = auth_data['device_id']
            device_type = auth_data.get('device_type', 'unknown')

            device_count = sum(
                1 for d in self.connected_devices.values() if d.user_id == user_id
            )
            if device_count >= self.max_devices_per_user:
                await websocket.send(json.dumps({
                    'type': 'error',
                    'message': 'Device limit exceeded'
                }))
                await websocket.close()
                return

            # If this device_id is already registered (reconnect), drop the
            # old registration's station membership so the listener sets do
            # not keep a stale entry for the replaced DeviceInfo.
            if device_id in self.connected_devices:
                await self._leave_station(device_id)

            self.connected_devices[device_id] = DeviceInfo(
                device_id=device_id,
                user_id=user_id,
                device_type=device_type,
                last_seen=datetime.utcnow(),
                websocket=websocket,
            )
            registered_id = device_id

            await websocket.send(json.dumps({
                'type': 'connected',
                'device_id': device_id
            }))

            logger.info(f"Device {device_id} connected for user {user_id}")

            async for message in websocket:
                try:
                    data = json.loads(message)
                    await self._handle_device_message(device_id, data)
                except Exception as e:
                    logger.error(f"Error handling message from {device_id}: {e}")
        except Exception as e:
            logger.error(f"WebSocket connection error: {e}")
        finally:
            # Only tear down a registration this connection still owns; a
            # rejected or superseded connection must not evict another one.
            if registered_id is not None:
                current = self.connected_devices.get(registered_id)
                if current is not None and current.websocket is websocket:
                    await self._cleanup_device(registered_id)

    async def _handle_device_message(self, device_id: str, data: Dict) -> None:
        """Dispatch one decoded client message by its ``type`` field."""
        try:
            device = self.connected_devices.get(device_id)
            if not device:
                return

            # Any traffic proves the device is alive, not just heartbeats.
            device.last_seen = datetime.utcnow()

            message_type = data.get('type')
            if message_type == 'heartbeat':
                return

            if message_type == 'join_station':
                await self.join_station(device.user_id, device_id, data['station_id'])
            elif message_type == 'leave_station':
                await self.leave_station(device_id)
            elif message_type == 'playback_event':
                event = SyncEvent(
                    event_type=data['event_type'],
                    station_id=data['station_id'],
                    user_id=device.user_id,
                    timestamp=datetime.utcnow(),
                    data=data['data'],
                    device_id=device_id,
                )
                await self.sync_playback_event(event)
        except Exception as e:
            logger.error(f"Error handling device message: {e}")

    async def _handle_track_change(self, station_id: str, data: Dict) -> None:
        """Move the station to ``data['track_index']`` if it is in range."""
        try:
            station = self.active_stations.get(station_id)
            if not station:
                return

            new_track_index = data.get('track_index', 0)
            if 0 <= new_track_index < len(station.tracks):
                station.current_track_index = new_track_index
                station.play_count += 1
                station.last_played = datetime.utcnow()
        except Exception as e:
            logger.error(f"Error handling track change: {e}")

    def _apply_remote_position(self, station_id: str, event: SyncEvent) -> None:
        """Copy the event's position onto every listener except the sender."""
        position = event.data.get('position', 0.0)
        for device_id in self.station_listeners.get(station_id, ()):
            if device_id == event.device_id:
                continue
            device = self.connected_devices.get(device_id)
            if device:
                device.playback_position = position

    async def _handle_play_event(self, station_id: str, event: SyncEvent) -> None:
        """Record the position carried by a ``play`` event."""
        try:
            self._apply_remote_position(station_id, event)
        except Exception as e:
            logger.error(f"Error handling play event: {e}")

    async def _handle_pause_event(self, station_id: str, event: SyncEvent) -> None:
        """Record the position carried by a ``pause`` event."""
        try:
            self._apply_remote_position(station_id, event)
        except Exception as e:
            logger.error(f"Error handling pause event: {e}")

    async def _cleanup_device(self, device_id: str) -> None:
        """Unregister a device, leave its station and close its socket."""
        try:
            device = self.connected_devices.get(device_id)
            if not device:
                return

            await self._leave_station(device_id)
            # Unregister before closing so the connection handler's own
            # cleanup (triggered by the close) finds nothing left to do.
            self.connected_devices.pop(device_id, None)

            if device.websocket is not None:
                try:
                    await device.websocket.close()
                except Exception as close_error:
                    logger.debug(
                        f"Ignoring close error for device {device_id}: {close_error}"
                    )

            logger.info(f"Device {device_id} disconnected")
        except Exception as e:
            logger.error(f"Error cleaning up device: {e}")

    async def _cleanup_loop(self) -> None:
        """Every minute, evict silent devices and expired idle stations."""
        while True:
            try:
                await asyncio.sleep(60)  # Run every minute

                now = datetime.utcnow()

                stale_devices = [
                    device_id
                    for device_id, device in self.connected_devices.items()
                    if (now - device.last_seen).total_seconds() > self.sync_timeout
                ]
                for device_id in stale_devices:
                    await self._cleanup_device(device_id)

                # Evaluated after device eviction so stations that just lost
                # their last listener are eligible in the same pass.
                expired_stations = [
                    station_id
                    for station_id, station in self.active_stations.items()
                    if (now - station.created_at).total_seconds() > self.station_cache_ttl
                    and not station.is_active
                    and not self.station_listeners.get(station_id)
                ]
                for station_id in expired_stations:
                    await self._cleanup_station(station_id)
            except Exception as e:
                logger.error(f"Error in cleanup loop: {e}")

    async def _cleanup_station(self, station_id: str) -> None:
        """Remove a station from every registry."""
        try:
            station = self.active_stations.get(station_id)
            if not station:
                return

            owned = self.user_stations.get(station.user_id, [])
            if station_id in owned:
                owned.remove(station_id)

            del self.active_stations[station_id]
            self.station_listeners.pop(station_id, None)

            logger.info(f"Cleaned up station {station_id}")
        except Exception as e:
            logger.error(f"Error cleaning up station: {e}")

    async def _sync_loop(self) -> None:
        """Every 5 seconds, re-align listeners of each active station."""
        while True:
            try:
                await asyncio.sleep(5)  # Run every 5 seconds

                # Snapshot: stations can be created/removed while we await.
                active_ids = [
                    station_id
                    for station_id, station in self.active_stations.items()
                    if station.is_active
                ]
                for station_id in active_ids:
                    await self._sync_station_playback(station_id)
            except Exception as e:
                logger.error(f"Error in sync loop: {e}")

    async def _sync_station_playback(self, station_id: str) -> None:
        """Send ``sync_position`` to listeners drifting >2s from the average."""
        try:
            # Snapshot: the listener set can change while sends are awaited.
            listener_ids = list(self.station_listeners.get(station_id, ()))
            if len(listener_ids) <= 1:
                return  # No sync needed for single listener

            devices = [
                (device_id, self.connected_devices[device_id])
                for device_id in listener_ids
                if device_id in self.connected_devices
            ]
            if not devices:
                return

            avg_position = sum(d.playback_position for _, d in devices) / len(devices)
            sync_threshold = 2.0  # 2 seconds

            for device_id, device in devices:
                if abs(device.playback_position - avg_position) > sync_threshold:
                    sync_event = SyncEvent(
                        event_type='sync_position',
                        station_id=station_id,
                        user_id=device.user_id,
                        timestamp=datetime.utcnow(),
                        data={'position': avg_position},
                        device_id='server',
                    )
                    await self._send_event_to_device(device_id, sync_event)
        except Exception as e:
            logger.error(f"Error syncing station playback: {e}")

    async def get_station_status(self, station_id: str) -> Optional[Dict]:
        """Return station details plus per-listener info, or None if unknown."""
        try:
            station = self.active_stations.get(station_id)
            if not station:
                return None

            listeners = self.station_listeners.get(station_id, set())
            listener_info = []
            for device_id in listeners:
                device = self.connected_devices.get(device_id)
                if device:
                    listener_info.append({
                        'device_id': device_id,
                        'device_type': device.device_type,
                        'playback_position': device.playback_position,
                        'last_seen': device.last_seen.isoformat()
                    })

            return {
                'station': asdict(station),
                'listeners': listener_info,
                'listener_count': len(listeners),
                'is_active': station.is_active
            }
        except Exception as e:
            logger.error(f"Error getting station status: {e}")
            return None

    async def broadcast_to_user_devices(self, user_id: str, message: Dict) -> None:
        """Send ``message`` to every connected device of ``user_id``.

        A failing device is logged and skipped so the others still receive
        the message.
        """
        try:
            payload = _to_json(message)
            # Snapshot: devices may disconnect while a send is awaited.
            targets = [
                device
                for device in self.connected_devices.values()
                if device.user_id == user_id and device.websocket
            ]
        except Exception as e:
            logger.error(f"Error broadcasting to user devices: {e}")
            return

        for device in targets:
            try:
                await device.websocket.send(payload)
            except Exception as e:
                logger.error(f"Error broadcasting to user devices: {e}")


# Client-side helper for connecting to radio sync
class RadioSyncClient:
    """Client-side radio sync connection."""

    def __init__(self, user_id: str, device_id: str, device_type: str = 'web'):
        self.user_id = user_id
        self.device_id = device_id
        self.device_type = device_type
        self.websocket = None
        self.event_callbacks: Dict[str, List[Callable]] = {}
        self.is_connected = False
        # Strong reference: the event loop only keeps weak references to
        # tasks, so an unreferenced handler task could be garbage-collected.
        self._handler_task: Optional[asyncio.Task] = None

    async def _close_quietly(self) -> None:
        """Close and forget the current socket, ignoring close errors."""
        connection, self.websocket = self.websocket, None
        if connection is not None:
            try:
                await connection.close()
            except Exception as e:
                logger.debug(f"Ignoring error while closing connection: {e}")

    async def connect(self, server_url: str = "ws://localhost:8765") -> bool:
        """Connect, authenticate and start the background message handler.

        Returns:
            True when the server answered with ``'connected'``; False
            otherwise (the socket is closed again in that case).
        """
        try:
            self.websocket = await websockets.connect(server_url)

            auth_message = {
                'user_id': self.user_id,
                'device_id': self.device_id,
                'device_type': self.device_type
            }
            await self.websocket.send(json.dumps(auth_message))

            response_data = json.loads(await self.websocket.recv())

            if response_data.get('type') == 'connected':
                self.is_connected = True
                self._handler_task = asyncio.create_task(self._message_handler())
                logger.info(f"Connected to radio sync server as {self.device_id}")
                return True

            logger.error(f"Connection failed: {response_data}")
            await self._close_quietly()
            return False
        except Exception as e:
            logger.error(f"Failed to connect to radio sync server: {e}")
            await self._close_quietly()
            return False

    async def disconnect(self) -> None:
        """Disconnect from radio sync server (no-op when not connected)."""
        connection = self.websocket
        if connection is None:
            return

        # Reset state first so it is consistent even if close() raises.
        self.websocket = None
        self.is_connected = False
        try:
            await connection.close()
        finally:
            logger.info("Disconnected from radio sync server")

    async def join_station(self, station_id: str) -> bool:
        """Ask the server to join ``station_id``; True if the request was sent."""
        if not self.is_connected:
            return False

        try:
            message = {
                'type': 'join_station',
                'station_id': station_id
            }
            await self.websocket.send(json.dumps(message))
            return True
        except Exception as e:
            logger.error(f"Failed to join station: {e}")
            return False

    async def leave_station(self) -> bool:
        """Ask the server to leave the current station; True if sent."""
        if not self.is_connected:
            return False

        try:
            await self.websocket.send(json.dumps({'type': 'leave_station'}))
            return True
        except Exception as e:
            logger.error(f"Failed to leave station: {e}")
            return False

    async def send_playback_event(self, station_id: str, event_type: str, data: Dict) -> bool:
        """Send a playback event for ``station_id``; True if sent."""
        if not self.is_connected:
            return False

        try:
            message = {
                'type': 'playback_event',
                'station_id': station_id,
                'event_type': event_type,
                'data': data
            }
            await self.websocket.send(json.dumps(message))
            return True
        except Exception as e:
            logger.error(f"Failed to send playback event: {e}")
            return False

    def add_event_callback(self, event_type: str, callback: Callable) -> None:
        """Register ``callback`` for ``event_type``.

        ``event_type`` is a sync event type (e.g. ``'play'``) or the literal
        ``'current_track'``. The callback receives the event/message dict and
        may be a plain function or a coroutine function.
        """
        self.event_callbacks.setdefault(event_type, []).append(callback)

    async def _dispatch(self, event_type: str, payload: Dict) -> None:
        """Invoke every callback registered for ``event_type``."""
        for callback in list(self.event_callbacks.get(event_type, ())):
            try:
                outcome = callback(payload)
                if inspect.isawaitable(outcome):
                    await outcome
            except Exception as e:
                # One faulty callback must not stop the others or the loop.
                logger.error(f"Callback for {event_type} failed: {e}")

    async def _message_handler(self) -> None:
        """Receive server messages until the connection ends."""
        connection = self.websocket
        if connection is None:
            return

        try:
            async for message in connection:
                try:
                    data = json.loads(message)
                    message_type = data.get('type')

                    if message_type == 'sync_event':
                        event = data['event']
                        await self._dispatch(event['event_type'], event)
                    elif message_type == 'current_track':
                        await self._dispatch('current_track', data)
                    elif message_type == 'error':
                        logger.error(f"Server error: {data['message']}")
                except Exception as e:
                    # A single malformed message should not end the session.
                    logger.error(f"Error in message handler: {e}")
        except Exception as e:
            logger.error(f"Error in message handler: {e}")
        finally:
            # Only clear the flag for the connection this task was serving;
            # a newer connect() may already have replaced it.
            if self.websocket is connection:
                self.is_connected = False

    async def send_heartbeat(self) -> None:
        """Send heartbeat to maintain connection."""
        if self.is_connected:
            try:
                await self.websocket.send(json.dumps({'type': 'heartbeat'}))
            except Exception as e:
                logger.error(f"Failed to send heartbeat: {e}")
                self.is_connected = False