""" Mobile Offline Mode Service This service provides comprehensive mobile offline functionality including: - Mobile download manager with intelligent queuing - Offline sync service with conflict resolution - Offline playback with adaptive streaming - Storage management and optimization - Background sync and progress tracking """ import asyncio import datetime import json import logging import os from typing import Dict, List, Optional, Tuple, Any from dataclasses import dataclass, asdict from enum import Enum from pathlib import Path import hashlib from sqlalchemy import select, update, delete, and_, or_, func from sqlalchemy.orm import Session from swingmusic.db import db from swingmusic.models.user import User from swingmusic.models.track import Track from swingmusic.models.playlist import Playlist from swingmusic.services.universal_music_downloader import UniversalMusicDownloader from swingmusic.services.audio_quality_manager import audio_quality_manager from swingmusic.utils.storage_manager import StorageManager from swingmusic.utils.background_sync import BackgroundSyncManager from swingmusic.config import USER_DATA_DIR logger = logging.getLogger(__name__) class SyncStatus(Enum): """Sync status for mobile devices""" NOT_SYNCED = "not_synced" SYNCING = "syncing" SYNCED = "synced" SYNC_ERROR = "sync_error" CONFLICT = "conflict" class OfflineQuality(Enum): """Offline download quality presets""" SPACE_SAVER = "space_saver" # Low quality, maximum storage efficiency BALANCED = "balanced" # Medium quality, good balance HIGH_QUALITY = "high_quality" # High quality, more storage usage LOSSLESS = "lossless" # Lossless quality, maximum storage usage @dataclass class MobileDevice: """Represents a mobile device registered for offline sync""" device_id: str user_id: int device_name: str device_type: str # android, ios storage_capacity: int # in bytes available_storage: int # in bytes last_sync: Optional[datetime.datetime] sync_status: SyncStatus offline_quality: OfflineQuality auto_sync_enabled: bool sync_preferences: Dict[str, Any] created_at: datetime.datetime updated_at: datetime.datetime @dataclass class OfflineTrack: """Track available for offline playback""" track_id: str device_id: str user_id: int local_path: str file_size: int quality: str download_date: datetime.datetime last_played: Optional[datetime.datetime] play_count: int sync_version: int checksum: str is_available: bool @dataclass class SyncQueue: """Item in the sync queue for mobile devices""" queue_id: str device_id: str track_id: str user_id: int priority: int # 1=highest, 5=lowest quality: str status: str # pending, downloading, completed, failed progress: float # 0-100 error_message: Optional[str] added_at: datetime.datetime started_at: Optional[datetime.datetime] completed_at: Optional[datetime.datetime] @dataclass class StorageUsage: """Storage usage information""" total_capacity: int used_space: int available_space: int offline_tracks_count: int offline_tracks_size: int other_data_size: int quality_breakdown: Dict[str, int] class MobileOfflineService: """Service for managing mobile offline functionality""" def __init__(self): self.storage_manager = StorageManager() self.background_sync = BackgroundSyncManager() self.universal_downloader = UniversalMusicDownloader() self.mobile_data_dir = USER_DATA_DIR / "mobile" self.mobile_data_dir.mkdir(exist_ok=True) # Device settings self.max_concurrent_downloads = 3 self.default_offline_quality = OfflineQuality.BALANCED self.auto_cleanup_threshold = 0.9 # 90% storage usage triggers cleanup async def register_device(self, user_id: int, device_info: Dict[str, Any]) -> MobileDevice: """ Register a new mobile device for offline sync Args: user_id: User ID device_info: Device information including name, type, storage Returns: Registered device information """ try: device_id = self._generate_device_id(user_id, device_info) device = MobileDevice( device_id=device_id, user_id=user_id, device_name=device_info.get('name', 'Unknown Device'), device_type=device_info.get('type', 'unknown'), storage_capacity=device_info.get('storage_capacity', 0), available_storage=device_info.get('available_storage', 0), last_sync=None, sync_status=SyncStatus.NOT_SYNCED, offline_quality=self.default_offline_quality, auto_sync_enabled=True, sync_preferences=device_info.get('preferences', {}), created_at=datetime.datetime.utcnow(), updated_at=datetime.datetime.utcnow() ) # Save device to database await self._save_device(device) # Initialize device storage await self._initialize_device_storage(device) logger.info(f"Registered mobile device {device_id} for user {user_id}") return device except Exception as e: logger.error(f"Error registering mobile device: {e}") raise async def add_to_offline_library(self, user_id: int, device_id: str, track_ids: List[str], quality: Optional[OfflineQuality] = None) -> List[SyncQueue]: """ Add tracks to offline library for mobile device Args: user_id: User ID device_id: Device ID track_ids: List of track IDs to download quality: Download quality (uses device default if None) Returns: List of sync queue items """ try: # Get device information device = await self._get_device(device_id, user_id) if not device: raise ValueError(f"Device {device_id} not found for user {user_id}") # Use device quality if not specified if quality is None: quality = device.offline_quality # Check storage availability storage_usage = await self._get_storage_usage(device_id) required_space = await self._estimate_download_size(track_ids, quality) if storage_usage.available_space < required_space: # Try to cleanup space freed_space = await self._cleanup_old_content(device_id, required_space) if freed_space < required_space: raise ValueError(f"Insufficient storage space. Need {required_space} bytes, only {storage_usage.available_space} available") # Add tracks to sync queue queue_items = [] for track_id in track_ids: # Check if already downloaded existing = await self._get_offline_track(device_id, track_id) if existing and existing.is_available: continue # Create queue item queue_item = SyncQueue( queue_id=self._generate_queue_id(), device_id=device_id, track_id=track_id, user_id=user_id, priority=self._calculate_download_priority(track_id, user_id), quality=quality.value, status='pending', progress=0.0, error_message=None, added_at=datetime.datetime.utcnow(), started_at=None, completed_at=None ) await self._add_to_sync_queue(queue_item) queue_items.append(queue_item) # Start background processing if not already running await self._start_background_sync(device_id) logger.info(f"Added {len(queue_items)} tracks to offline library for device {device_id}") return queue_items except Exception as e: logger.error(f"Error adding tracks to offline library: {e}") raise async def sync_playlist_offline(self, user_id: int, device_id: str, playlist_id: str, quality: Optional[OfflineQuality] = None) -> List[SyncQueue]: """ Sync entire playlist for offline playback Args: user_id: User ID device_id: Device ID playlist_id: Playlist ID to sync quality: Download quality Returns: List of sync queue items """ try: # Get playlist tracks playlist_tracks = await self._get_playlist_tracks(user_id, playlist_id) track_ids = [track['id'] for track in playlist_tracks] # Add to offline library return await self.add_to_offline_library(user_id, device_id, track_ids, quality) except Exception as e: logger.error(f"Error syncing playlist offline: {e}") raise async def get_offline_library(self, user_id: int, device_id: str) -> Dict[str, Any]: """ Get offline library for mobile device Args: user_id: User ID device_id: Device ID Returns: Offline library information """ try: # Get device information device = await self._get_device(device_id, user_id) if not device: raise ValueError(f"Device {device_id} not found for user {user_id}") # Get offline tracks offline_tracks = await self._get_offline_tracks(device_id) # Get sync queue status queue_status = await self._get_sync_queue_status(device_id) # Get storage usage storage_usage = await self._get_storage_usage(device_id) return { 'device': asdict(device), 'offline_tracks': [asdict(track) for track in offline_tracks], 'sync_queue': { 'pending_count': queue_status['pending'], 'downloading_count': queue_status['downloading'], 'completed_count': queue_status['completed'], 'failed_count': queue_status['failed'], 'total_count': queue_status['total'] }, 'storage_usage': asdict(storage_usage), 'last_sync': device.last_sync, 'sync_status': device.sync_status.value } except Exception as e: logger.error(f"Error getting offline library: {e}") raise async def remove_from_offline_library(self, user_id: int, device_id: str, track_ids: List[str]) -> bool: """ Remove tracks from offline library Args: user_id: User ID device_id: Device ID track_ids: List of track IDs to remove Returns: Success status """ try: removed_count = 0 for track_id in track_ids: # Get offline track offline_track = await self._get_offline_track(device_id, track_id) if not offline_track: continue # Remove local file if os.path.exists(offline_track.local_path): os.remove(offline_track.local_path) # Remove from database await self._remove_offline_track(device_id, track_id) removed_count += 1 # Update storage usage await self._update_storage_usage(device_id) logger.info(f"Removed {removed_count} tracks from offline library for device {device_id}") return True except Exception as e: logger.error(f"Error removing tracks from offline library: {e}") return False async def update_device_settings(self, user_id: int, device_id: str, settings: Dict[str, Any]) -> bool: """ Update device settings and preferences Args: user_id: User ID device_id: Device ID settings: Settings to update Returns: Success status """ try: device = await self._get_device(device_id, user_id) if not device: return False # Update settings if 'offline_quality' in settings: device.offline_quality = OfflineQuality(settings['offline_quality']) if 'auto_sync_enabled' in settings: device.auto_sync_enabled = settings['auto_sync_enabled'] if 'sync_preferences' in settings: device.sync_preferences.update(settings['sync_preferences']) if 'storage_capacity' in settings: device.storage_capacity = settings['storage_capacity'] if 'available_storage' in settings: device.available_storage = settings['available_storage'] device.updated_at = datetime.datetime.utcnow() # Save updated device await self._save_device(device) logger.info(f"Updated settings for device {device_id}") return True except Exception as e: logger.error(f"Error updating device settings: {e}") return False async def get_sync_progress(self, user_id: int, device_id: str) -> Dict[str, Any]: """ Get sync progress for mobile device Args: user_id: User ID device_id: Device ID Returns: Sync progress information """ try: # Get queue items queue_items = await self._get_sync_queue_items(device_id) # Calculate progress total_items = len(queue_items) completed_items = len([item for item in queue_items if item.status == 'completed']) downloading_items = len([item for item in queue_items if item.status == 'downloading']) failed_items = len([item for item in queue_items if item.status == 'failed']) # Calculate overall progress overall_progress = 0.0 if total_items > 0: total_progress = sum(item.progress for item in queue_items) overall_progress = total_progress / total_items # Get currently downloading items current_downloads = [item for item in queue_items if item.status == 'downloading'] return { 'total_items': total_items, 'completed_items': completed_items, 'downloading_items': downloading_items, 'failed_items': failed_items, 'overall_progress': round(overall_progress, 2), 'current_downloads': [ { 'track_id': item.track_id, 'progress': item.progress, 'quality': item.quality, 'added_at': item.added_at.isoformat() } for item in current_downloads ], 'estimated_time_remaining': await self._estimate_sync_time_remaining(device_id) } except Exception as e: logger.error(f"Error getting sync progress: {e}") raise async def force_sync_now(self, user_id: int, device_id: str) -> bool: """ Force immediate sync for mobile device Args: user_id: User ID device_id: Device ID Returns: Success status """ try: device = await self._get_device(device_id, user_id) if not device: return False # Update sync status device.sync_status = SyncStatus.SYNCING device.last_sync = datetime.datetime.utcnow() await self._save_device(device) # Start background sync await self._start_background_sync(device_id, force=True) logger.info(f"Force sync started for device {device_id}") return True except Exception as e: logger.error(f"Error forcing sync: {e}") return False # Private helper methods def _generate_device_id(self, user_id: int, device_info: Dict[str, Any]) -> str: """Generate unique device ID""" device_string = f"{user_id}_{device_info.get('type', 'unknown')}_{device_info.get('name', 'unknown')}" return hashlib.sha256(device_string.encode()).hexdigest()[:16] def _generate_queue_id(self) -> str: """Generate unique queue ID""" return hashlib.sha256(f"{datetime.datetime.utcnow().isoformat()}".encode()).hexdigest()[:16] async def _save_device(self, device: MobileDevice): """Save device to database""" # This would save to database - simplified for now device_file = self.mobile_data_dir / f"device_{device.device_id}.json" with open(device_file, 'w') as f: json.dump(asdict(device), f, default=str) async def _get_device(self, device_id: str, user_id: int) -> Optional[MobileDevice]: """Get device from database""" device_file = self.mobile_data_dir / f"device_{device_id}.json" if not device_file.exists(): return None with open(device_file, 'r') as f: device_data = json.load(f) if device_data['user_id'] != user_id: return None return MobileDevice(**device_data) async def _initialize_device_storage(self, device: MobileDevice): """Initialize storage for device""" device_storage = self.mobile_data_dir / device.device_id device_storage.mkdir(exist_ok=True) # Create subdirectories (device_storage / "tracks").mkdir(exist_ok=True) (device_storage / "metadata").mkdir(exist_ok=True) (device_storage / "cache").mkdir(exist_ok=True) async def _get_storage_usage(self, device_id: str) -> StorageUsage: """Get storage usage information""" device_storage = self.mobile_data_dir / device_id if not device_storage.exists(): return StorageUsage(0, 0, 0, 0, 0, 0, {}) # Calculate directory sizes total_size = 0 tracks_size = 0 tracks_count = 0 for file_path in device_storage.rglob("*"): if file_path.is_file(): file_size = file_path.stat().st_size total_size += file_size if file_path.parent.name == "tracks": tracks_size += file_size tracks_count += 1 # Get device capacity (this would come from device info) device = await self._get_device(device_id, None) # user_id not needed for this return StorageUsage( total_capacity=device.storage_capacity if device else 0, used_space=total_size, available_space=device.available_storage if device else 0, offline_tracks_count=tracks_count, offline_tracks_size=tracks_size, other_data_size=total_size - tracks_size, quality_breakdown={} # Would calculate by quality ) async def _estimate_download_size(self, track_ids: List[str], quality: OfflineQuality) -> int: """Estimate download size for tracks""" # Simplified estimation - would use actual track metadata quality_sizes = { OfflineQuality.SPACE_SAVER: 3 * 1024 * 1024, # 3MB per track OfflineQuality.BALANCED: 6 * 1024 * 1024, # 6MB per track OfflineQuality.HIGH_QUALITY: 12 * 1024 * 1024, # 12MB per track OfflineQuality.LOSSLESS: 30 * 1024 * 1024, # 30MB per track } return len(track_ids) * quality_sizes.get(quality, quality_sizes[OfflineQuality.BALANCED]) async def _cleanup_old_content(self, device_id: str, required_space: int) -> int: """Cleanup old content to free space""" # Get offline tracks sorted by last played offline_tracks = await self._get_offline_tracks(device_id) # Sort by last played (oldest first) offline_tracks.sort(key=lambda t: t.last_played or datetime.datetime.min) freed_space = 0 for track in offline_tracks: if freed_space >= required_space: break # Remove track if os.path.exists(track.local_path): file_size = os.path.getsize(track.local_path) os.remove(track.local_path) freed_space += file_size # Mark as unavailable track.is_available = False await self._save_offline_track(track) return freed_space async def _add_to_sync_queue(self, queue_item: SyncQueue): """Add item to sync queue""" queue_file = self.mobile_data_dir / f"queue_{queue_item.device_id}.json" # Load existing queue queue = [] if queue_file.exists(): with open(queue_file, 'r') as f: queue = json.load(f) # Add new item queue.append(asdict(queue_item)) # Save queue with open(queue_file, 'w') as f: json.dump(queue, f, default=str) async def _get_sync_queue_items(self, device_id: str) -> List[SyncQueue]: """Get all sync queue items for device""" queue_file = self.mobile_data_dir / f"queue_{device_id}.json" if not queue_file.exists(): return [] with open(queue_file, 'r') as f: queue_data = json.load(f) return [SyncQueue(**item) for item in queue_data] async def _calculate_download_priority(self, track_id: str, user_id: int) -> int: """Calculate download priority for track""" # This would consider factors like: # - User's favorite tracks # - Recently played tracks # - Playlist membership # - User preferences # Simplified for now return 3 # Medium priority async def _start_background_sync(self, device_id: str, force: bool = False): """Start background sync process""" # This would integrate with BackgroundSyncManager # For now, just log the request logger.info(f"Background sync requested for device {device_id}, force={force}") async def _get_offline_track(self, device_id: str, track_id: str) -> Optional[OfflineTrack]: """Get offline track information""" tracks_dir = self.mobile_data_dir / device_id / "metadata" track_file = tracks_dir / f"{track_id}.json" if not track_file.exists(): return None with open(track_file, 'r') as f: track_data = json.load(f) return OfflineTrack(**track_data) async def _save_offline_track(self, track: OfflineTrack): """Save offline track information""" tracks_dir = self.mobile_data_dir / track.device_id / "metadata" tracks_dir.mkdir(exist_ok=True) track_file = tracks_dir / f"{track.track_id}.json" with open(track_file, 'w') as f: json.dump(asdict(track), f, default=str) async def _get_offline_tracks(self, device_id: str) -> List[OfflineTrack]: """Get all offline tracks for device""" tracks_dir = self.mobile_data_dir / device_id / "metadata" if not tracks_dir.exists(): return [] tracks = [] for track_file in tracks_dir.glob("*.json"): with open(track_file, 'r') as f: track_data = json.load(f) tracks.append(OfflineTrack(**track_data)) return tracks async def _remove_offline_track(self, device_id: str, track_id: str): """Remove offline track from database""" tracks_dir = self.mobile_data_dir / device_id / "metadata" track_file = tracks_dir / f"{track_id}.json" if track_file.exists(): track_file.unlink() async def _get_sync_queue_status(self, device_id: str) -> Dict[str, int]: """Get sync queue status summary""" queue_items = await self._get_sync_queue_items(device_id) status_counts = { 'pending': 0, 'downloading': 0, 'completed': 0, 'failed': 0, 'total': len(queue_items) } for item in queue_items: status_counts[item.status] = status_counts.get(item.status, 0) + 1 return status_counts async def _update_storage_usage(self, device_id: str): """Update storage usage information""" # This would update device storage information # For now, just recalculate await self._get_storage_usage(device_id) async def _get_playlist_tracks(self, user_id: int, playlist_id: str) -> List[Dict[str, Any]]: """Get tracks in playlist""" # This would query the database for playlist tracks # Simplified for now return [] async def _estimate_sync_time_remaining(self, device_id: str) -> Optional[int]: """Estimate time remaining for sync completion""" queue_items = await self._get_sync_queue_items(device_id) pending_items = [item for item in queue_items if item.status in ['pending', 'downloading']] if not pending_items: return None # Estimate based on average download time avg_time_per_track = 30 # seconds return len(pending_items) * avg_time_per_track # Global service instance mobile_offline_service = MobileOfflineService()