# swingmusic/services/beat_matching.py
"""Beat, key and energy analysis used to plan DJ-style transitions."""
import logging
from dataclasses import dataclass
from typing import Dict, List, Tuple, Optional

import librosa
import numpy as np

logger = logging.getLogger(__name__)


@dataclass
class BeatInfo:
    """Beat information for a track"""
    tempo: float  # estimated tempo in beats per minute
    beat_frames: np.ndarray  # beat positions as analysis-frame indices
    beat_times: np.ndarray  # the same beat positions converted to seconds
    downbeats: np.ndarray  # subset of beat_frames treated as downbeats
    key: str  # Camelot code such as '8B'
    energy_curve: np.ndarray  # RMS energy of the audio between beats


@dataclass
class BeatMatch:
    """Beat matching result between two tracks"""
    compatibility_score: float  # weighted tempo/key/energy score
    tempo_ratio: float  # track 2 tempo divided by track 1 tempo
    key_compatibility: str  # display string "<key1> → <key2>"
    optimal_mix_point: float  # position in track 1 as a 0-1 fraction
    transition_type: str  # e.g. "perfect_blend", "crossfade", "cut"
    energy_match: float  # energy-curve similarity in the 0-1 range


class BeatMatcher:
    """Advanced beat matching engine for DJ transitions"""

    # Pitch classes in chroma-bin order (bin 0 is C).
    _PITCH_CLASSES = ('C', 'C#', 'D', 'D#', 'E', 'F',
                      'F#', 'G', 'G#', 'A', 'A#', 'B')

    # Musical key -> Camelot wheel code. A minor key shares its number with
    # its relative major (Am/8A <-> C/8B).
    _CAMELOT_KEYS = {
        'C': '8B', 'C#': '3B', 'D': '10B', 'D#': '5B',
        'E': '12B', 'F': '7B', 'F#': '2B', 'G': '9B',
        'G#': '4B', 'A': '11B', 'A#': '6B', 'B': '1B',
        'Cm': '5A', 'C#m': '12A', 'Dm': '7A', 'D#m': '2A',
        'Em': '9A', 'Fm': '4A', 'F#m': '11A', 'Gm': '6A',
        'G#m': '1A', 'Am': '8A', 'A#m': '3A', 'Bm': '10A',
    }

    # Mix points (fraction of track 1) for types that do not inspect beats.
    _FIXED_MIX_POINTS = {
        'perfect_blend': 0.85,
        'smooth_transition': 0.75,
        'crossfade': 0.9,
    }

    # Transition length in beats per transition type (anything else: 1).
    _TRANSITION_BEATS = {
        'perfect_blend': 16,
        'smooth_transition': 8,
        'crossfade': 4,
        'harmonic_mix': 8,
    }

    # Tempo assumed when a track has no usable tempo estimate.
    _FALLBACK_TEMPO = 120.0

    def __init__(self):
        self.sample_rate = 22050
        self.hop_length = 512
        self.key_compatibility_matrix = self._build_key_compatibility_matrix()

    def _build_key_compatibility_matrix(self) -> Dict[str, Dict[str, float]]:
        """Build harmonic key compatibility matrix (Camelot wheel).

        Every key ``<n><mode>`` scores 1.0 with itself, 0.9 with its two
        neighbours in the same mode, and 0.8 with the other-mode keys
        numbered ``n`` and ``n - 1`` (numbers wrap around 1..12).

        NOTE(review): the resulting table is not symmetric -- e.g. '1A' lists
        '12B' but '12B' does not list '1A'. Kept as originally specified;
        confirm whether that direction-dependence is intended.
        """
        def wrap(number: int) -> int:
            # Keep wheel positions inside 1..12.
            return (number - 1) % 12 + 1

        matrix: Dict[str, Dict[str, float]] = {}
        for mode, other_mode in (('A', 'B'), ('B', 'A')):
            for number in range(1, 13):
                previous, following = wrap(number - 1), wrap(number + 1)
                matrix[f'{number}{mode}'] = {
                    f'{number}{mode}': 1.0,
                    f'{previous}{mode}': 0.9,
                    f'{following}{mode}': 0.9,
                    f'{number}{other_mode}': 0.8,
                    f'{previous}{other_mode}': 0.8,
                }
        return matrix

    async def analyze_beats(self, audio_path: str) -> BeatInfo:
        """Analyze beats and tempo from audio file.

        Args:
            audio_path: Path of the audio file to load.

        Returns:
            A ``BeatInfo`` with tempo, beat positions, downbeats, key and
            per-beat energy.

        Raises:
            Exception: Any failure is logged and re-raised unchanged.
        """
        try:
            y, sr = librosa.load(audio_path, sr=self.sample_rate)

            tempo, beat_frames = librosa.beat.beat_track(
                y=y, sr=sr, hop_length=self.hop_length
            )
            beat_times = librosa.frames_to_time(
                beat_frames, sr=sr, hop_length=self.hop_length
            )

            downbeats = self._detect_downbeats(y, sr, beat_frames)
            key = await self._extract_key(y, sr)
            energy_curve = self._calculate_energy_curve(y, beat_frames)

            return BeatInfo(
                # beat_track may return tempo as a scalar or as an array;
                # np.ravel makes the float conversion work for both.
                tempo=float(np.ravel(tempo)[0]),
                beat_frames=beat_frames,
                beat_times=beat_times,
                downbeats=downbeats,
                key=key,
                energy_curve=energy_curve
            )
        except Exception as e:
            logger.error(f"Beat analysis failed for {audio_path}: {e}")
            raise

    def _detect_downbeats(self, y: np.ndarray, sr: int, beat_frames: np.ndarray) -> np.ndarray:
        """Detect downbeats using harmonic and percussive separation.

        A beat counts as a downbeat when its combined (harmonic + percussive)
        onset strength is strictly larger than that of both neighbouring
        beats. When no such beat exists, every fourth beat is returned
        (assumes 4/4 time).
        """
        # One HPSS pass yields both components.
        y_harmonic, y_percussive = librosa.effects.hpss(y)

        harmonic_onset = librosa.onset.onset_strength(
            y=y_harmonic, sr=sr, hop_length=self.hop_length
        )
        percussive_onset = librosa.onset.onset_strength(
            y=y_percussive, sr=sr, hop_length=self.hop_length
        )
        combined_onset = harmonic_onset + percussive_onset

        beat_frames = np.asarray(beat_frames, dtype=int)
        if beat_frames.size == 0 or combined_onset.size == 0:
            return beat_frames[::4]

        # Clamp so a beat reported at the very end cannot index past the
        # onset envelope.
        safe_frames = np.minimum(beat_frames, combined_onset.size - 1)
        strength = combined_onset[safe_frames]

        # Local maxima among interior beats (first and last are never peaks).
        interior = strength[1:-1]
        peaks = np.flatnonzero(
            (interior > strength[:-2]) & (interior > strength[2:])
        ) + 1

        return beat_frames[peaks] if peaks.size else beat_frames[::4]

    async def _extract_key(self, y: np.ndarray, sr: int) -> str:
        """Extract musical key using chroma features.

        Simplified detector: the pitch class with the highest mean chroma
        energy is taken as the tonic of a major key, so only the 'B'
        (major) Camelot codes are ever produced by this method.
        """
        chroma = librosa.feature.chroma_stft(y=y, sr=sr, hop_length=self.hop_length)
        chroma_mean = np.mean(chroma, axis=1)

        strongest_bin = int(np.argmax(chroma_mean))
        tonic = self._PITCH_CLASSES[strongest_bin % len(self._PITCH_CLASSES)]

        return self._CAMELOT_KEYS.get(tonic, '8B')  # Default to C major

    def _calculate_energy_curve(self, y: np.ndarray, beat_frames: np.ndarray) -> np.ndarray:
        """Calculate energy curve for each beat.

        Returns one RMS value per beat: the segment for a beat runs from
        that beat to the next one, and the final beat runs to the end of
        the audio. Empty segments contribute 0.0.
        """
        beat_frames = np.asarray(beat_frames, dtype=int)
        if beat_frames.size == 0:
            return np.array([])

        # beat_frames are frame indices; y is indexed in samples.
        beat_samples = beat_frames * self.hop_length
        boundaries = np.minimum(np.append(beat_samples, len(y)), len(y))

        energy_curve = []
        for start, end in zip(boundaries[:-1], boundaries[1:]):
            segment = y[start:end]
            if segment.size == 0:
                energy_curve.append(0.0)
            else:
                energy_curve.append(float(np.sqrt(np.mean(segment ** 2))))

        return np.array(energy_curve)

    async def calculate_beat_match(self, track1_beats: BeatInfo, track2_beats: BeatInfo) -> BeatMatch:
        """Calculate beat matching compatibility between two tracks.

        The overall score weights tempo (0.4), key (0.3) and energy (0.3).
        If either track has no positive tempo, ``tempo_ratio`` is reported
        as 1.0 (no adjustment) and tempo compatibility as the lowest score.

        Raises:
            Exception: Any failure is logged and re-raised unchanged.
        """
        try:
            if track1_beats.tempo > 0 and track2_beats.tempo > 0:
                tempo_ratio = track2_beats.tempo / track1_beats.tempo
                tempo_compatibility = self._calculate_tempo_compatibility(tempo_ratio)
            else:
                # No usable tempo (e.g. silence): avoid dividing by zero.
                tempo_ratio = 1.0
                tempo_compatibility = 0.2

            key_compatibility_score = self._calculate_key_compatibility(
                track1_beats.key, track2_beats.key
            )
            energy_match = self._calculate_energy_compatibility(
                track1_beats.energy_curve, track2_beats.energy_curve
            )

            compatibility_score = (
                tempo_compatibility * 0.4 +
                key_compatibility_score * 0.3 +
                energy_match * 0.3
            )

            transition_type = self._determine_transition_type(
                tempo_compatibility, key_compatibility_score, energy_match
            )
            optimal_mix_point = self._find_optimal_mix_point(
                track1_beats, track2_beats, transition_type
            )

            return BeatMatch(
                compatibility_score=compatibility_score,
                tempo_ratio=tempo_ratio,
                key_compatibility=f"{track1_beats.key} → {track2_beats.key}",
                optimal_mix_point=optimal_mix_point,
                transition_type=transition_type,
                energy_match=energy_match
            )
        except Exception as e:
            logger.error(f"Beat matching calculation failed: {e}")
            raise

    def _calculate_tempo_compatibility(self, tempo_ratio: float) -> float:
        """Calculate tempo compatibility score.

        Returns 1.0 within 2% of equal tempo, 0.8 within the half/double
        tempo windows, 0.7 within 10%, 0.5 within 20%, otherwise 0.2.
        """
        if 0.98 <= tempo_ratio <= 1.02:
            return 1.0
        if 0.49 <= tempo_ratio <= 0.51 or 1.98 <= tempo_ratio <= 2.02:
            return 0.8
        if 0.9 <= tempo_ratio <= 1.1:
            return 0.7
        if 0.8 <= tempo_ratio <= 1.2:
            return 0.5
        return 0.2

    def _calculate_key_compatibility(self, key1: str, key2: str) -> float:
        """Calculate harmonic key compatibility.

        Looks ``key2`` up in the row of ``key1``; pairs absent from the
        matrix (including unknown keys) score 0.1.
        """
        return self.key_compatibility_matrix.get(key1, {}).get(key2, 0.1)

    def _calculate_energy_compatibility(self, energy1: np.ndarray, energy2: np.ndarray) -> float:
        """Calculate energy curve compatibility.

        Pearson correlation of the two curves over their common length,
        mapped from [-1, 1] to [0, 1]. Returns the neutral value 0.5 when
        the correlation is undefined (fewer than two common points, a flat
        curve, or non-finite values).
        """
        common = min(len(energy1), len(energy2))
        if common < 2:
            return 0.5

        first = np.asarray(energy1[:common], dtype=float)
        second = np.asarray(energy2[:common], dtype=float)

        # Correlation with a constant series is undefined; answer directly
        # instead of letting NumPy emit warnings and NaN.
        if np.ptp(first) == 0 or np.ptp(second) == 0:
            return 0.5

        correlation = np.corrcoef(first, second)[0, 1]
        if not np.isfinite(correlation):
            return 0.5

        # Clip guards against rounding slightly outside [-1, 1].
        return float(np.clip((correlation + 1) / 2, 0.0, 1.0))

    def _determine_transition_type(self, tempo_comp: float, key_comp: float, energy_comp: float) -> str:
        """Determine optimal transition type.

        Uses the unweighted mean of the three scores; below 0.4 the choice
        falls back to "harmonic_mix" when the keys alone are compatible
        (>= 0.7) and to "cut" otherwise.
        """
        overall_score = (tempo_comp + key_comp + energy_comp) / 3

        if overall_score >= 0.8:
            return "perfect_blend"
        if overall_score >= 0.6:
            return "smooth_transition"
        if overall_score >= 0.4:
            return "crossfade"
        if key_comp >= 0.7:
            return "harmonic_mix"
        return "cut"

    def _find_optimal_mix_point(self, track1_beats: BeatInfo, track2_beats: BeatInfo,
                                transition_type: str) -> float:
        """Find optimal mix point in track 1 (0-1, where 1 is end).

        "harmonic_mix" places the point relative to the last downbeat,
        capped at 0.9, or at 0.8 when no beat data is available. Other
        known types use fixed points; anything else is a cut at 0.95.
        """
        if transition_type == "harmonic_mix":
            downbeats = track1_beats.downbeats
            beat_frames = track1_beats.beat_frames
            if len(downbeats) > 0 and len(beat_frames) > 0 and beat_frames[-1] > 0:
                # Both values are frame indices, so the ratio is a 0-1
                # position; the last beat stands in for the track end.
                last_downbeat_ratio = float(downbeats[-1]) / float(beat_frames[-1])
                return min(last_downbeat_ratio + 0.1, 0.9)
            return 0.8

        return self._FIXED_MIX_POINTS.get(transition_type, 0.95)

    async def create_beat_sync_transition(self, track1_path: str, track2_path: str,
                                          beat_match: BeatMatch) -> Dict:
        """Create beat-synchronized transition parameters.

        Args:
            track1_path: Audio file of the outgoing track.
            track2_path: Audio file of the incoming track.
            beat_match: Result of ``calculate_beat_match`` for the pair.

        Returns:
            Dict with 'start_point' (0-1 fraction of track 1), 'duration'
            (seconds), 'tempo_adjustment', 'energy_ramp', and
            'beat_markers'. 'sync_point' is a 0-1 fraction of track 1: the
            middle of the transition, with track length approximated by
            the time of the last detected beat.

        Raises:
            Exception: Any failure is logged and re-raised unchanged.
        """
        try:
            track1_beats = await self.analyze_beats(track1_path)
            track2_beats = await self.analyze_beats(track2_path)

            transition_start = beat_match.optimal_mix_point
            transition_duration = self._calculate_transition_duration(
                beat_match.transition_type, track1_beats.tempo
            )

            # Inverse of the tempo ratio; a non-positive ratio means the
            # tempo is unknown, so leave playback speed untouched.
            if beat_match.tempo_ratio > 0:
                tempo_adjustment = 1.0 / beat_match.tempo_ratio
            else:
                tempo_adjustment = 1.0

            # Express half the duration (seconds) as a fraction of the track
            # so it can be added to the fractional start point.
            beat_times = track1_beats.beat_times
            track_seconds = float(beat_times[-1]) if len(beat_times) > 0 else 0.0
            if track_seconds > 0:
                half_span = (transition_duration / 2) / track_seconds
            else:
                half_span = 0.0
            sync_point = min(transition_start + half_span, 1.0)

            return {
                'transition_type': beat_match.transition_type,
                'start_point': transition_start,
                'duration': transition_duration,
                'tempo_adjustment': tempo_adjustment,
                # Carries the "<key1> → <key2>" display string.
                'key_change_needed': beat_match.key_compatibility,
                'energy_ramp': self._calculate_energy_ramp(
                    track1_beats.energy_curve,
                    track2_beats.energy_curve,
                    transition_start,
                    transition_duration,
                    tempo=track1_beats.tempo
                ),
                'beat_markers': {
                    'track1_last_beat': transition_start,
                    'track2_first_beat': 0.0,
                    'sync_point': sync_point
                }
            }
        except Exception as e:
            logger.error(f"Beat sync transition creation failed: {e}")
            raise

    def _calculate_transition_duration(self, transition_type: str, tempo: float) -> float:
        """Calculate transition duration in seconds from type and tempo.

        The length is a fixed number of beats per transition type (16, 8,
        4, 8, or 1 for a cut). A non-positive or NaN tempo falls back to
        120 BPM instead of dividing by zero.
        """
        if not tempo > 0:
            tempo = self._FALLBACK_TEMPO

        beat_duration = 60.0 / tempo
        return beat_duration * self._TRANSITION_BEATS.get(transition_type, 1)

    def _calculate_energy_ramp(self, energy1: np.ndarray, energy2: np.ndarray,
                               start_point: float, duration: float,
                               tempo: Optional[float] = None) -> np.ndarray:
        """Calculate energy ramp for smooth transition.

        Args:
            energy1: Per-beat energy of the outgoing track.
            energy2: Per-beat energy of the incoming track.
            start_point: Transition start as a 0-1 fraction of track 1.
            duration: Transition length in seconds.
            tempo: Tempo of track 1 in BPM, used to convert ``duration``
                into a number of beats. When omitted, the previous
                behaviour (``duration * len(energy1)`` entries) is kept.

        Returns:
            A linear ramp of at least 10 points from the energy at the end
            of the transition window to the first energy value of track 2;
            ``[0.5]`` when the start point is beyond the curve.
        """
        total_beats = len(energy1)
        start_idx = int(total_beats * start_point)
        if start_idx >= total_beats:
            return np.array([0.5])  # Default energy

        if tempo is not None and tempo > 0:
            # energy1 has one entry per beat: seconds -> beats.
            window = max(int(round(duration * tempo / 60.0)), 1)
        else:
            window = int(duration * total_beats)
        end_idx = min(start_idx + window, total_beats)

        track1_energy = energy1[start_idx:end_idx]
        track2_start_energy = energy2[0] if len(energy2) > 0 else 0.5

        if len(track1_energy) == 0:
            return np.linspace(0.5, track2_start_energy, 10)

        ramp_points = max(len(track1_energy), 10)
        return np.linspace(track1_energy[-1], track2_start_energy, ramp_points)