"""
Advanced Audio Quality Management Service

This service provides comprehensive audio quality control including:
- Adaptive quality streaming based on network conditions
- Multi-format support with intelligent transcoding
- Audio enhancement features (EQ, spatial audio, loudness normalization)
- Quality comparison and analysis tools
- Device-specific optimization
"""

import asyncio
import functools
import logging
import json
import os
import subprocess
import tempfile
from typing import Dict, List, Optional, Tuple, Any, Union
from dataclasses import dataclass, asdict
from enum import Enum
from pathlib import Path

import aiofiles
from sqlalchemy.orm import Session

from swingmusic.db import db
from swingmusic.config import USER_DATA_DIR
from swingmusic.utils.network_monitor import NetworkMonitor
from swingmusic.utils.device_detector import DeviceDetector

logger = logging.getLogger(__name__)


class AudioFormat(Enum):
    """Supported audio formats"""

    FLAC = "flac"
    ALAC = "alac"
    WAV = "wav"
    AIFF = "aiff"
    MP3_320 = "mp3_320"
    MP3_256 = "mp3_256"
    MP3_192 = "mp3_192"
    MP3_128 = "mp3_128"
    AAC_256 = "aac_256"
    AAC_192 = "aac_192"
    AAC_128 = "aac_128"
    OGG_VORBIS = "ogg_vorbis"
    OGG_OPUS = "ogg_opus"


class QualityLevel(Enum):
    """Audio quality levels"""

    LOSSLESS = "lossless"
    HIGH = "high"
    MEDIUM = "medium"
    LOW = "low"
    DATA_SAVER = "data_saver"


class SampleRate(Enum):
    """Supported sample rates"""

    RATE_44_1 = "44.1kHz"
    RATE_48 = "48kHz"
    RATE_96 = "96kHz"
    RATE_192 = "192kHz"


class BitDepth(Enum):
    """Supported bit depths"""

    BIT_16 = "16bit"
    BIT_24 = "24bit"
    BIT_32 = "32bit"


class SpatialAudioFormat(Enum):
    """Spatial audio formats"""

    NONE = "none"
    STEREO = "stereo"
    BINAURAL = "binaural"
    DOLBY_ATMOS = "dolby_atmos"
    SONY_360 = "sony_360"
    AMBISONIC = "ambisonic"


# Locations probed for the FFmpeg executable, in order. The bare name relies
# on PATH resolution; the absolute entries cover common install prefixes.
_FFMPEG_CANDIDATES: Tuple[str, ...] = (
    "ffmpeg",
    "/usr/bin/ffmpeg",
    "/usr/local/bin/ffmpeg",
    "/opt/homebrew/bin/ffmpeg",
)

# FFmpeg encoder arguments and output file extension for every AudioFormat.
# The extension matters: FFmpeg picks the output muxer from it.
# NOTE(review): the 128k Opus bitrate is a chosen default; the enum member
# carries no bitrate of its own.
_FORMAT_ENCODERS: Dict[AudioFormat, Tuple[Tuple[str, ...], str]] = {
    AudioFormat.FLAC: (("-c:a", "flac", "-compression_level", "8"), "flac"),
    AudioFormat.ALAC: (("-c:a", "alac"), "m4a"),
    AudioFormat.WAV: (("-c:a", "pcm_s16le"), "wav"),
    AudioFormat.AIFF: (("-c:a", "pcm_s16be"), "aiff"),
    AudioFormat.MP3_320: (("-c:a", "libmp3lame", "-b:a", "320k"), "mp3"),
    AudioFormat.MP3_256: (("-c:a", "libmp3lame", "-b:a", "256k"), "mp3"),
    AudioFormat.MP3_192: (("-c:a", "libmp3lame", "-b:a", "192k"), "mp3"),
    AudioFormat.MP3_128: (("-c:a", "libmp3lame", "-b:a", "128k"), "mp3"),
    AudioFormat.AAC_256: (("-c:a", "aac", "-b:a", "256k"), "m4a"),
    AudioFormat.AAC_192: (("-c:a", "aac", "-b:a", "192k"), "m4a"),
    AudioFormat.AAC_128: (("-c:a", "aac", "-b:a", "128k"), "m4a"),
    AudioFormat.OGG_VORBIS: (("-c:a", "libvorbis", "-b:a", "256k"), "ogg"),
    AudioFormat.OGG_OPUS: (("-c:a", "libopus", "-b:a", "128k"), "opus"),
}

# Used when the requested format is not in the table above.
_DEFAULT_ENCODER: Tuple[Tuple[str, ...], str] = (("-c:a", "flac"), "flac")

# Output sample rates passed to "-ar". 44.1 kHz is deliberately absent: in
# that case no "-ar" is emitted and FFmpeg keeps the source rate.
_SAMPLE_RATE_HZ: Dict[SampleRate, str] = {
    SampleRate.RATE_48: "48000",
    SampleRate.RATE_96: "96000",
    SampleRate.RATE_192: "192000",
}

# Raw PCM containers select their bit depth through the codec name.
_PCM_CODECS: Dict[Tuple[AudioFormat, BitDepth], str] = {
    (AudioFormat.WAV, BitDepth.BIT_24): "pcm_s24le",
    (AudioFormat.WAV, BitDepth.BIT_32): "pcm_s32le",
    (AudioFormat.AIFF, BitDepth.BIT_24): "pcm_s24be",
    (AudioFormat.AIFF, BitDepth.BIT_32): "pcm_s32be",
}

# Compressed lossless encoders select their bit depth through "-sample_fmt".
# FFmpeg has no "s24" sample format; 24-bit audio travels in s32 samples.
# NOTE(review): FFmpeg's FLAC/ALAC encoders are expected to store s32 input
# as 24-bit, so BIT_24 and BIT_32 map to the same flag - confirm against the
# FFmpeg build in use.
_HIGH_DEPTH_SAMPLE_FMT: Dict[AudioFormat, str] = {
    AudioFormat.FLAC: "s32",
    AudioFormat.ALAC: "s32p",
}

# Codec names (as reported by ffprobe) that are lossless.
_LOSSLESS_CODECS = frozenset({"flac", "alac"})


@functools.lru_cache(maxsize=1)
def _locate_ffmpeg() -> Optional[str]:
    """Return the first candidate for which ``ffmpeg -version`` exits 0.

    The result is cached so the probe subprocesses run once per process
    instead of once per service object.

    Returns:
        The working executable name/path, or None when no candidate works.
    """
    for candidate in _FFMPEG_CANDIDATES:
        try:
            result = subprocess.run(
                [candidate, "-version"], capture_output=True, check=False
            )
        except (subprocess.SubprocessError, OSError):
            # OSError covers a missing binary as well as one that exists but
            # cannot be executed (e.g. PermissionError).
            continue
        if result.returncode == 0:
            return candidate
    return None


def _extension_for(audio_format: Any) -> str:
    """Return the output file extension (without dot) for *audio_format*."""
    return _FORMAT_ENCODERS.get(audio_format, _DEFAULT_ENCODER)[1]


@dataclass
class AudioQualitySettings:
    """Comprehensive audio quality settings"""

    # Streaming quality
    streaming_quality: QualityLevel = QualityLevel.HIGH
    adaptive_quality: bool = True
    network_aware_quality: bool = True
    device_specific_quality: bool = True

    # Download quality
    download_format: AudioFormat = AudioFormat.FLAC
    download_bitrate: Optional[int] = None  # For lossy formats
    download_sample_rate: SampleRate = SampleRate.RATE_44_1
    download_bit_depth: BitDepth = BitDepth.BIT_16

    # Advanced audio settings
    enable_dolby_atmos: bool = False
    enable_360_audio: bool = False
    spatial_audio_format: SpatialAudioFormat = SpatialAudioFormat.STEREO

    # Audio enhancements
    enable_adaptive_eq: bool = True
    enable_spatial_audio_processing: bool = False
    enable_loudness_normalization: bool = True
    target_loudness: float = -14.0  # LUFS

    # Processing settings
    enable_crossfade: bool = False
    crossfade_duration: float = 2.0
    enable_gapless_playback: bool = True
    enable_replaygain: bool = True

    # Quality preferences
    prioritize_fidelity: bool = True
    prioritize_file_size: bool = False
    prioritize_compatibility: bool = False

    # Advanced options
    # Maps an FFmpeg flag to its value; a True boolean emits the bare flag.
    custom_ffmpeg_params: Optional[Dict[str, Any]] = None
    enable_experimental_codecs: bool = False
    cache_transcoded_files: bool = True


@dataclass
class AudioAnalysis:
    """Audio analysis results"""

    file_path: str
    format: str
    duration: float
    sample_rate: int
    bit_depth: int
    bitrate: int  # as reported by ffprobe's format "bit_rate" field
    channels: int
    codec: str

    # Audio characteristics
    dynamic_range: float  # dB
    peak_level: float  # dB
    rms_level: float  # dB
    loudness: float  # LUFS

    # Frequency analysis
    frequency_response: Dict[str, float]
    spectral_centroid: float
    spectral_rolloff: float

    # Quality metrics
    signal_to_noise_ratio: float
    total_harmonic_distortion: float

    # Metadata
    detected_genre: Optional[str] = None
    acoustic_features: Optional[Dict[str, float]] = None


@dataclass
class QualityComparison:
    """Quality comparison between different formats"""

    original_file: str
    formats: Dict[str, Dict[str, Any]]

    # Comparison metrics
    size_difference: Dict[str, float]  # Percentage
    quality_score: Dict[str, float]  # 0-100
    transparency_score: Dict[str, float]  # 0-100

    # Recommendations
    recommended_format: str
    recommended_reason: str


class AudioTranscoder:
    """Audio transcoding with FFmpeg"""

    def __init__(self):
        self.ffmpeg_path = self._find_ffmpeg()
        self.temp_dir = Path(tempfile.gettempdir()) / "swingmusic_transcode"
        self.temp_dir.mkdir(exist_ok=True)

    def _find_ffmpeg(self) -> str:
        """Find FFmpeg executable

        Raises:
            RuntimeError: if none of the known locations yields a working
                FFmpeg.
        """
        path = _locate_ffmpeg()
        if path is None:
            raise RuntimeError("FFmpeg not found. Please install FFmpeg.")
        return path

    async def transcode(self, input_path: str, output_path: str,
                        settings: AudioQualitySettings) -> bool:
        """Transcode audio file according to settings

        Args:
            input_path: Source audio file.
            output_path: Destination file; overwritten if it exists.
            settings: Supplies format, sample rate, bit depth and any custom
                FFmpeg parameters.

        Returns:
            True when FFmpeg exits with status 0, False on a non-zero exit or
            on any exception (which is logged, not raised).
        """
        try:
            # Build FFmpeg command
            cmd = self._build_transcode_command(input_path, output_path, settings)

            # Execute transcoding
            process = await asyncio.create_subprocess_exec(
                *cmd,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE
            )
            _stdout, stderr = await process.communicate()

            if process.returncode != 0:
                # errors="replace": FFmpeg may echo file names that are not
                # valid UTF-8; the log line must not fail on them.
                logger.error(f"FFmpeg error: {stderr.decode(errors='replace')}")
                return False
            return True
        except Exception as e:
            logger.error(f"Transcoding error: {e}")
            return False

    def _build_transcode_command(self, input_path: str, output_path: str,
                                 settings: AudioQualitySettings) -> List[str]:
        """Build FFmpeg command for transcoding

        Every AudioFormat member has an encoder entry; an unrecognised format
        falls back to FLAC. Bit depth is applied only where it is meaningful:
        through the PCM codec name for WAV/AIFF and through ``-sample_fmt``
        for FLAC/ALAC. It is ignored for lossy codecs.

        Returns:
            The argument vector, ending in ``-y <output_path>``.
        """
        cmd = [self.ffmpeg_path, "-i", input_path]
        audio_format = settings.download_format
        bit_depth = settings.download_bit_depth

        # Audio codec settings
        known_format = audio_format in _FORMAT_ENCODERS
        codec_args = list(
            _FORMAT_ENCODERS.get(audio_format, _DEFAULT_ENCODER)[0]
        )
        pcm_codec = _PCM_CODECS.get((audio_format, bit_depth))
        if pcm_codec is not None:
            codec_args[1] = pcm_codec
        cmd.extend(codec_args)

        # Sample rate
        sample_rate = _SAMPLE_RATE_HZ.get(settings.download_sample_rate)
        if sample_rate is not None:
            cmd.extend(["-ar", sample_rate])

        # Bit depth
        if bit_depth in (BitDepth.BIT_24, BitDepth.BIT_32):
            if known_format:
                sample_fmt = _HIGH_DEPTH_SAMPLE_FMT.get(audio_format)
            else:
                # Unknown formats were encoded as FLAC above.
                sample_fmt = _HIGH_DEPTH_SAMPLE_FMT[AudioFormat.FLAC]
            if sample_fmt is not None:
                cmd.extend(["-sample_fmt", sample_fmt])

        # Custom FFmpeg parameters
        if settings.custom_ffmpeg_params:
            for key, value in settings.custom_ffmpeg_params.items():
                if isinstance(value, bool):
                    # Boolean flags are emitted bare, and only when True.
                    if value:
                        cmd.append(key)
                else:
                    cmd.extend([key, str(value)])

        # Output settings
        cmd.extend(["-y", output_path])  # -y to overwrite
        return cmd


class AudioAnalyzer:
    """Audio analysis using FFmpeg and audio processing libraries"""

    def __init__(self):
        self.ffmpeg_path = self._find_ffmpeg()
        self.ffprobe_path = self._find_ffprobe()

    def _find_ffmpeg(self) -> str:
        """Find FFmpeg executable

        Raises:
            RuntimeError: if none of the known locations yields a working
                FFmpeg.
        """
        path = _locate_ffmpeg()
        if path is None:
            raise RuntimeError("FFmpeg not found")
        return path

    def _find_ffprobe(self) -> str:
        """Return the ffprobe executable to use.

        When FFmpeg was found under an explicit directory, an ``ffprobe``
        sitting next to it is preferred; otherwise the bare name is returned
        and resolved through PATH.
        """
        ffmpeg = Path(self.ffmpeg_path)
        if ffmpeg.parent != Path("."):
            sibling = ffmpeg.with_name("ffprobe")
            if sibling.exists():
                return str(sibling)
        return "ffprobe"

    async def analyze_file(self, file_path: str) -> AudioAnalysis:
        """Comprehensive audio file analysis

        Runs ffprobe with JSON output and builds an AudioAnalysis from the
        first audio stream and the container info, then fills in the
        advanced metrics.

        Raises:
            RuntimeError: if ffprobe exits with a non-zero status.
            ValueError: if the file has no audio stream.
        """
        try:
            # Get basic info with FFprobe
            probe_cmd = [
                self.ffprobe_path, "-v", "quiet", "-print_format", "json",
                "-show_format", "-show_streams", file_path
            ]
            process = await asyncio.create_subprocess_exec(
                *probe_cmd,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE
            )
            stdout, stderr = await process.communicate()

            if process.returncode != 0:
                raise RuntimeError(
                    f"FFprobe error: {stderr.decode(errors='replace')}"
                )

            probe_data = json.loads(stdout.decode())

            # Extract audio stream info
            audio_stream = next(
                (
                    stream
                    for stream in probe_data.get("streams", [])
                    if stream.get("codec_type") == "audio"
                ),
                None,
            )
            if not audio_stream:
                raise ValueError("No audio stream found")

            format_info = probe_data.get("format", {})

            # Create analysis object
            analysis = AudioAnalysis(
                file_path=file_path,
                format=format_info.get("format_name", "unknown"),
                duration=float(format_info.get("duration", 0)),
                sample_rate=int(audio_stream.get("sample_rate", 44100)),
                bit_depth=self._extract_bit_depth(audio_stream),
                bitrate=int(format_info.get("bit_rate", 0)),
                channels=int(audio_stream.get("channels", 2)),
                codec=audio_stream.get("codec_name", "unknown"),
                # Audio characteristics (simplified for now)
                dynamic_range=0.0,
                peak_level=0.0,
                rms_level=0.0,
                loudness=0.0,
                frequency_response={},
                spectral_centroid=0.0,
                spectral_rolloff=0.0,
                signal_to_noise_ratio=0.0,
                total_harmonic_distortion=0.0
            )

            # Perform advanced analysis
            await self._perform_advanced_analysis(analysis)
            return analysis
        except Exception as e:
            logger.error(f"Audio analysis error: {e}")
            raise

    def _extract_bit_depth(self, stream: Dict) -> int:
        """Extract bit depth from stream info

        Checks ``bits_per_sample`` first, then ``bits_per_raw_sample``.
        NOTE(review): the second field is consulted because ffprobe is
        expected to report 0 in ``bits_per_sample`` for compressed lossless
        codecs such as FLAC - confirm against real probe output.

        Returns:
            The first positive value found, otherwise 16 as the default
            assumption.
        """
        for key in ("bits_per_sample", "bits_per_raw_sample"):
            try:
                bits = int(stream.get(key))
            except (TypeError, ValueError):
                # Field missing or not numeric.
                continue
            if bits > 0:
                return bits
        return 16  # Default assumption

    async def _perform_advanced_analysis(self, analysis: AudioAnalysis):
        """Perform advanced audio analysis

        Currently writes fixed placeholder values into *analysis*; no signal
        processing is performed on the file.
        """
        try:
            # This would integrate with audio processing libraries
            # For now, we'll provide placeholder values
            # In a real implementation, you would use:
            # - librosa for audio analysis
            # - pydub for basic processing
            # - numpy for mathematical operations
            analysis.dynamic_range = 15.0  # Placeholder
            analysis.peak_level = -1.0  # Placeholder
            analysis.rms_level = -12.0  # Placeholder
            analysis.loudness = -14.0  # Placeholder (LUFS target)

            # Frequency bands (Hz)
            analysis.frequency_response = {
                "20": 0.0,
                "60": 0.0,
                "250": 0.0,
                "1000": 0.0,
                "4000": 0.0,
                "16000": 0.0,
                "20000": 0.0
            }
            analysis.spectral_centroid = 2000.0
            analysis.spectral_rolloff = 18000.0
            analysis.signal_to_noise_ratio = 60.0
            analysis.total_harmonic_distortion = 0.01
        except Exception as e:
            logger.error(f"Advanced analysis error: {e}")


class AdaptiveQualityManager:
    """Adaptive quality management based on conditions"""

    def __init__(self):
        self.network_monitor = NetworkMonitor()
        self.device_detector = DeviceDetector()
        self.quality_profiles = self._load_quality_profiles()

    def _load_quality_profiles(self) -> Dict[str, Dict]:
        """Load quality profiles for different conditions"""
        return {
            "excellent_network": {
                "streaming": QualityLevel.LOSSLESS,
                "download": AudioFormat.FLAC,
                "bitrate": None
            },
            "good_network": {
                "streaming": QualityLevel.HIGH,
                "download": AudioFormat.MP3_320,
                "bitrate": 320
            },
            "fair_network": {
                "streaming": QualityLevel.MEDIUM,
                "download": AudioFormat.MP3_256,
                "bitrate": 256
            },
            "poor_network": {
                "streaming": QualityLevel.LOW,
                "download": AudioFormat.MP3_128,
                "bitrate": 128
            },
            "data_saver": {
                "streaming": QualityLevel.DATA_SAVER,
                "download": AudioFormat.MP3_128,
                "bitrate": 128
            },
            "mobile_device": {
                "streaming": QualityLevel.MEDIUM,
                "download": AudioFormat.AAC_256,
                "bitrate": 256
            },
            "high_end_device": {
                "streaming": QualityLevel.LOSSLESS,
                "download": AudioFormat.FLAC,
                "bitrate": None
            },
            "battery_saver": {
                "streaming": QualityLevel.LOW,
                "download": AudioFormat.MP3_192,
                "bitrate": 192
            }
        }

    async def get_optimal_quality(self, user_settings: AudioQualitySettings,
                                  context: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Get optimal quality settings based on current conditions

        Returns:
            A copy of the selected profile dict with the keys "streaming",
            "download" and "bitrate".
        """
        context = context or {}

        # Get current conditions
        network_speed = await self.network_monitor.get_current_speed()
        device_info = self.device_detector.get_device_info()
        battery_level = device_info.get("battery_level", 100)

        # Determine quality profile
        return self._determine_quality_profile(
            network_speed, device_info, battery_level, user_settings, context
        )

    def _determine_quality_profile(self, network_speed: float, device_info: Dict,
                                   battery_level: float,
                                   user_settings: AudioQualitySettings,
                                   context: Dict) -> Dict[str, Any]:
        """Determine the best quality profile

        The network profile is the baseline. Device and battery profiles are
        added only when they actually apply, and the most restrictive
        candidate wins. Using the network profile as the baseline, rather
        than a fixed "good_network", is what makes lossless reachable on an
        excellent connection.
        """
        # Network-based selection
        if user_settings.network_aware_quality:
            if network_speed > 10.0:  # Mbps
                network_profile = "excellent_network"
            elif network_speed > 5.0:
                network_profile = "good_network"
            elif network_speed > 2.0:
                network_profile = "fair_network"
            elif network_speed > 0.5:
                network_profile = "poor_network"
            else:
                network_profile = "data_saver"
        else:
            network_profile = "good_network"  # Default

        candidates = [network_profile]

        # Device-based selection
        if user_settings.device_specific_quality:
            device_type = device_info.get("type", "desktop")
            if device_type == "mobile":
                candidates.append("mobile_device")
            elif device_type == "high_end":
                candidates.append("high_end_device")

        # Battery-based selection
        # The None guard covers a device that reports the key without a value.
        low_battery = battery_level is not None and battery_level < 20
        if low_battery and context.get("battery_saver", False):
            candidates.append("battery_saver")

        # Select the most restrictive profile
        selected_profile = self.quality_profiles.get(
            network_profile, self.quality_profiles["good_network"]
        )
        for profile_name in candidates[1:]:
            profile = self.quality_profiles.get(profile_name)
            if profile and self._is_more_restrictive(profile, selected_profile):
                selected_profile = profile

        return selected_profile.copy()

    def _is_more_restrictive(self, profile1: Dict, profile2: Dict) -> bool:
        """Check if profile1 is more restrictive than profile2

        Compares the "streaming" quality levels; an unknown or missing level
        ranks the same as MEDIUM.
        """
        quality_order = {
            QualityLevel.LOSSLESS: 4,
            QualityLevel.HIGH: 3,
            QualityLevel.MEDIUM: 2,
            QualityLevel.LOW: 1,
            QualityLevel.DATA_SAVER: 0
        }
        q1 = quality_order.get(profile1.get("streaming"), 2)
        q2 = quality_order.get(profile2.get("streaming"), 2)
        return q1 < q2


class AudioEnhancementService:
    """Audio enhancement processing"""

    def __init__(self):
        self.transcoder = AudioTranscoder()
        self.analyzer = AudioAnalyzer()

    async def apply_enhancements(self, input_path: str, output_path: str,
                                 settings: AudioQualitySettings) -> bool:
        """Apply audio enhancements to a file

        Returns:
            True when FFmpeg exits with status 0, False on a non-zero exit or
            on any exception (which is logged, not raised).
        """
        try:
            # Analyze the input file
            analysis = await self.analyzer.analyze_file(input_path)

            # Build enhancement command
            cmd = self._build_enhancement_command(
                input_path, output_path, settings, analysis
            )

            # Apply enhancements
            process = await asyncio.create_subprocess_exec(
                *cmd,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE
            )
            _stdout, stderr = await process.communicate()

            if process.returncode != 0:
                logger.error(f"Enhancement error: {stderr.decode(errors='replace')}")
                return False
            return True
        except Exception as e:
            logger.error(f"Audio enhancement error: {e}")
            return False

    def _build_enhancement_command(self, input_path: str, output_path: str,
                                   settings: AudioQualitySettings,
                                   analysis: AudioAnalysis) -> List[str]:
        """Build FFmpeg command for audio enhancements

        Enabled filters are chained into a single ``-af`` argument. The
        *analysis* argument is accepted but not consulted yet.
        """
        cmd = [self.transcoder.ffmpeg_path, "-i", input_path]

        # Audio filters
        filters = []

        # Loudness normalization
        if settings.enable_loudness_normalization:
            filters.append(f"loudnorm=I={settings.target_loudness}")

        # Adaptive EQ (simplified)
        if settings.enable_adaptive_eq:
            # This would be more sophisticated in a real implementation
            # analyzing the frequency response and applying appropriate EQ
            filters.append("equalizer=f=1000:width_type=h:width=100:g=2")

        # Spatial audio processing
        if settings.enable_spatial_audio_processing:
            if settings.spatial_audio_format == SpatialAudioFormat.BINAURAL:
                filters.append("bs2b")
            elif settings.spatial_audio_format == SpatialAudioFormat.AMBISONIC:
                filters.append("surround")

        # Combine filters
        if filters:
            cmd.extend(["-af", ",".join(filters)])

        # Output codec (preserve quality)
        # NOTE(review): the codec is always 16-bit PCM, so output_path
        # presumably needs a PCM-capable container such as .wav - confirm
        # with callers.
        cmd.extend(["-c:a", "pcm_s16le"])

        # Output
        cmd.extend(["-y", output_path])
        return cmd


class AudioQualityManager:
    """
    Main audio quality management service

    This service coordinates all audio quality operations including:
    - Adaptive quality streaming
    - Audio transcoding and enhancement
    - Quality analysis and comparison
    - User preference management
    """

    def __init__(self):
        self.transcoder = AudioTranscoder()
        self.analyzer = AudioAnalyzer()
        self.adaptive_manager = AdaptiveQualityManager()
        self.enhancement_service = AudioEnhancementService()

        # Cache for analysis results: path -> (file signature, analysis)
        self._analysis_cache = {}
        self._quality_cache = {}

    async def get_optimal_streaming_quality(self, user_id: int,
                                            context: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Get optimal streaming quality for user

        Returns:
            The profile chosen by the adaptive manager, or a medium-quality
            fallback dict when anything fails.
        """
        try:
            # Get user settings
            user_settings = await self._get_user_settings(user_id)

            # Get optimal quality based on conditions
            optimal = await self.adaptive_manager.get_optimal_quality(
                user_settings, context
            )
            return optimal
        except Exception as e:
            logger.error(f"Error getting optimal quality: {e}")
            return {"streaming": "medium", "download": "mp3_256", "bitrate": 256}

    @staticmethod
    def _coerce_format(value: Any) -> AudioFormat:
        """Turn a profile's "download" entry into an AudioFormat.

        Accepts an AudioFormat member or its string value (the fallback
        profile uses strings); anything else becomes MP3_256.
        """
        if isinstance(value, AudioFormat):
            return value
        try:
            return AudioFormat(value)
        except ValueError:
            return AudioFormat.MP3_256

    async def transcode_for_streaming(self, input_path: str, user_id: int,
                                      context: Optional[Dict[str, Any]] = None) -> Optional[str]:
        """Transcode file for optimal streaming

        The output is written to ``USER_DATA_DIR/transcoded`` with an
        extension that matches the chosen codec, so FFmpeg selects a
        compatible container.

        Returns:
            Path of the transcoded file, or None when transcoding fails.
        """
        try:
            # Get optimal quality
            quality_settings = await self.get_optimal_streaming_quality(user_id, context)
            target_format = self._coerce_format(quality_settings.get("download"))

            # Create output path
            output_dir = Path(USER_DATA_DIR) / "transcoded"
            output_dir.mkdir(parents=True, exist_ok=True)
            input_file = Path(input_path)
            extension = _extension_for(target_format)
            output_file = output_dir / f"{input_file.stem}_transcoded.{extension}"

            # Build settings for transcoding
            settings = AudioQualitySettings()
            settings.download_format = target_format

            # Transcode
            success = await self.transcoder.transcode(
                str(input_file), str(output_file), settings
            )
            return str(output_file) if success else None
        except Exception as e:
            logger.error(f"Transcoding error: {e}")
            return None

    @staticmethod
    def _file_signature(file_path: str) -> Optional[Tuple[int, int]]:
        """Return (mtime_ns, size) for *file_path*, or None if stat fails."""
        try:
            info = os.stat(file_path)
        except OSError:
            return None
        return (info.st_mtime_ns, info.st_size)

    async def analyze_audio_file(self, file_path: str) -> AudioAnalysis:
        """Analyze audio file

        Results are cached per path together with the file's modification
        time and size, so a changed or replaced file is analysed again
        instead of returning stale data.

        Raises:
            Exception: whatever the analyzer raises; it is logged first.
        """
        signature = self._file_signature(file_path)

        # Check cache first
        cached = self._analysis_cache.get(file_path)
        if cached is not None and signature is not None and cached[0] == signature:
            return cached[1]

        try:
            analysis = await self.analyzer.analyze_file(file_path)
            if signature is not None:
                self._analysis_cache[file_path] = (signature, analysis)
            return analysis
        except Exception as e:
            logger.error(f"Analysis error: {e}")
            raise

    async def compare_quality_formats(self, original_path: str,
                                      formats: List[AudioFormat]) -> QualityComparison:
        """Compare quality across different formats

        Each requested format is transcoded to a temporary file, analysed
        and scored against the original. A format that fails is logged and
        skipped. Temporary files are always removed, so the "file_path"
        stored per format is informational only.

        Raises:
            Exception: if the original cannot be analysed or stat'ed.
        """
        try:
            original_analysis = await self.analyze_audio_file(original_path)

            comparison = QualityComparison(
                original_file=original_path,
                formats={},
                size_difference={},
                quality_score={},
                transparency_score={},
                recommended_format="flac",
                recommended_reason="Best quality for archival"
            )

            original_size = Path(original_path).stat().st_size

            for format_type in formats:
                temp_file = None
                try:
                    # Transcode to format
                    temp_file = await self._transcode_to_format(original_path, format_type)
                    if not temp_file:
                        continue

                    # Analyze transcoded file. The cache is bypassed: the
                    # temp file is deleted below and must not leave an entry.
                    transcoded_analysis = await self.analyzer.analyze_file(temp_file)

                    # Calculate metrics
                    transcoded_size = Path(temp_file).stat().st_size
                    if original_size:
                        size_diff = ((transcoded_size - original_size) / original_size) * 100
                    else:
                        size_diff = 0.0

                    quality_score = self._calculate_quality_score(
                        original_analysis, transcoded_analysis
                    )
                    transparency_score = self._calculate_transparency_score(
                        original_analysis, transcoded_analysis
                    )

                    comparison.formats[format_type.value] = {
                        "analysis": asdict(transcoded_analysis),
                        "file_size": transcoded_size,
                        "file_path": temp_file
                    }
                    comparison.size_difference[format_type.value] = size_diff
                    comparison.quality_score[format_type.value] = quality_score
                    comparison.transparency_score[format_type.value] = transparency_score
                except Exception as e:
                    logger.error(f"Error comparing format {format_type}: {e}")
                    continue
                finally:
                    # Clean up temp file, including after a failed analysis
                    if temp_file:
                        try:
                            os.unlink(temp_file)
                        except OSError as cleanup_error:
                            logger.warning(
                                "Could not remove temp file %s: %s",
                                temp_file, cleanup_error
                            )

            # Determine recommendation
            comparison.recommended_format, comparison.recommended_reason = \
                self._determine_best_format(comparison)

            return comparison
        except Exception as e:
            logger.error(f"Quality comparison error: {e}")
            raise

    async def _transcode_to_format(self, input_path: str,
                                   format_type: AudioFormat) -> Optional[str]:
        """Transcode file to specific format for comparison

        The output is a uniquely named file under the system temp directory,
        so concurrent comparisons cannot overwrite each other. The caller is
        responsible for deleting the returned file.

        Returns:
            Path of the transcoded temp file, or None on failure.
        """
        temp_name = None
        try:
            temp_dir = Path(tempfile.gettempdir()) / "swingmusic_compare"
            temp_dir.mkdir(exist_ok=True)

            input_file = Path(input_path)
            handle, temp_name = tempfile.mkstemp(
                prefix=f"{input_file.stem}_compare_",
                suffix=f".{_extension_for(format_type)}",
                dir=temp_dir,
            )
            # Only the reserved name is needed; FFmpeg overwrites it (-y).
            os.close(handle)

            settings = AudioQualitySettings()
            settings.download_format = format_type

            success = await self.transcoder.transcode(
                str(input_file), temp_name, settings
            )
            if success:
                return temp_name
        except Exception as e:
            logger.error(f"Format transcoding error: {e}")

        # Failure path: do not leave the reserved temp file behind.
        if temp_name:
            try:
                os.unlink(temp_name)
            except OSError:
                pass
        return None

    def _calculate_quality_score(self, original: AudioAnalysis,
                                 transcoded: AudioAnalysis) -> float:
        """Calculate quality score (0-100)

        Starts at 100 and subtracts up to 30 / 20 / 10 points in proportion
        to the relative loss of bitrate / sample rate / bit depth.

        Returns:
            The clamped score, or 50.0 if the calculation raises.
        """
        try:
            # Simplified quality calculation
            # In a real implementation, this would be more sophisticated
            score = 100.0

            # Penalize quality loss
            if transcoded.bitrate < original.bitrate:
                score -= (original.bitrate - transcoded.bitrate) / original.bitrate * 30

            # Penalize sample rate reduction
            if transcoded.sample_rate < original.sample_rate:
                score -= (original.sample_rate - transcoded.sample_rate) / original.sample_rate * 20

            # Penalize bit depth reduction
            if transcoded.bit_depth < original.bit_depth:
                score -= (original.bit_depth - transcoded.bit_depth) / original.bit_depth * 10

            return max(0, min(100, score))
        except Exception:
            return 50.0  # Default score

    def _calculate_transparency_score(self, original: AudioAnalysis,
                                      transcoded: AudioAnalysis) -> float:
        """Calculate transparency score (0-100)

        Lossless output is recognised by codec name (falling back to the
        container name). Lossy output is tiered by bitrate in kbit/s.
        NOTE(review): ``bitrate`` is assumed to be in bit/s, as ffprobe is
        expected to report it, hence the division by 1000 - confirm.

        Returns:
            The score, or 50.0 if the calculation raises.
        """
        try:
            # Simplified transparency calculation
            # In a real implementation, this would use ABX testing or perceptual models
            same_container = transcoded.format == original.format

            # Lossless formats get high transparency
            codec = (transcoded.codec or "").lower()
            is_lossless = (
                codec in _LOSSLESS_CODECS
                or codec.startswith("pcm_")
                or transcoded.format in ["flac", "alac", "wav"]
            )
            if is_lossless:
                return 100.0 if same_container else 95.0

            # Same container only counts as transparent when no bitrate was
            # given up (e.g. MP3 320 -> MP3 128 is not transparent).
            if same_container and transcoded.bitrate >= original.bitrate:
                return 100.0

            # High bitrate lossy formats
            kbps = transcoded.bitrate / 1000
            if kbps >= 320:
                return 85.0
            elif kbps >= 256:
                return 75.0
            elif kbps >= 192:
                return 60.0
            else:
                return 40.0
        except Exception:
            return 50.0

    def _determine_best_format(self, comparison: QualityComparison) -> Tuple[str, str]:
        """Determine the best format recommendation

        Each format's quality score is reduced by 10 points per 100 % of
        absolute size difference; the highest positive result wins. With no
        scores, or none above zero, FLAC is recommended.
        """
        try:
            best_format = "flac"
            best_reason = "Best quality for archival purposes"

            # Consider user priorities
            scores = comparison.quality_score
            if scores:
                # Find format with best balance of quality and size
                best_score = 0
                for format_name, score in scores.items():
                    size_penalty = abs(comparison.size_difference.get(format_name, 0)) / 100
                    combined_score = score - size_penalty * 10
                    if combined_score > best_score:
                        best_score = combined_score
                        best_format = format_name
                        best_reason = f"Best balance of quality ({score:.1f}) and file size"

            return best_format, best_reason
        except Exception:
            return "flac", "Best quality for archival purposes"

    async def _get_user_settings(self, user_id: int) -> AudioQualitySettings:
        """Get user's audio quality settings

        Currently opens a DB session and returns default settings; no query
        is issued yet.
        """
        try:
            with db.session():
                # This would query user_settings table
                # For now, return defaults
                return AudioQualitySettings()
        except Exception as e:
            logger.error(f"Error getting user settings: {e}")
            return AudioQualitySettings()

    async def update_user_settings(self, user_id: int,
                                   settings: AudioQualitySettings) -> bool:
        """Update user's audio quality settings

        Currently opens a DB session and logs; nothing is persisted yet.

        Returns:
            True unless opening the session raises.
        """
        try:
            with db.session():
                # This would update user_settings table
                logger.info(f"Updated audio settings for user {user_id}")
                return True
        except Exception as e:
            logger.error(f"Error updating user settings: {e}")
            return False

    def clear_cache(self):
        """Clear analysis and quality cache"""
        self._analysis_cache.clear()
        self._quality_cache.clear()


# Singleton instance
audio_quality_manager = AudioQualityManager()