mirror of
https://github.com/Dvorinka/swingmusic-extended.git
synced 2026-06-03 20:13:02 +00:00
38f1981283
- Move all backend files from swingmusic/ to root level - Backend files now display directly on GitHub repository page - Keep client applications as submodules (swingmusic-android, swingmusic-desktop, swingmusic-webclient) - Update README to reflect new structure (no cd swingmusic needed) - Cleaner, more professional GitHub repository layout Files moved to root: - src/ (main source code) - pyproject.toml, requirements.txt, run.py - swingmusic.spec, uv.lock, version.txt - services/ Result: GitHub shows backend files directly while maintaining organized structure
382 lines
16 KiB
Python
382 lines
16 KiB
Python
# swingmusic/services/beat_matching.py
|
|
import numpy as np
|
|
import librosa
|
|
from typing import Dict, List, Tuple, Optional
|
|
from dataclasses import dataclass
|
|
import logging
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
@dataclass
|
|
class BeatInfo:
|
|
"""Beat information for a track"""
|
|
tempo: float
|
|
beat_frames: np.ndarray
|
|
beat_times: np.ndarray
|
|
downbeats: np.ndarray
|
|
key: str
|
|
energy_curve: np.ndarray
|
|
|
|
@dataclass
|
|
class BeatMatch:
|
|
"""Beat matching result between two tracks"""
|
|
compatibility_score: float
|
|
tempo_ratio: float
|
|
key_compatibility: str
|
|
optimal_mix_point: float
|
|
transition_type: str
|
|
energy_match: float
|
|
|
|
class BeatMatcher:
|
|
"""Advanced beat matching engine for DJ transitions"""
|
|
|
|
def __init__(self):
|
|
self.sample_rate = 22050
|
|
self.hop_length = 512
|
|
self.key_compatibility_matrix = self._build_key_compatibility_matrix()
|
|
|
|
def _build_key_compatibility_matrix(self) -> Dict[str, Dict[str, float]]:
|
|
"""Build harmonic key compatibility matrix"""
|
|
# Camelot wheel compatibility
|
|
compatibility = {
|
|
'1A': {'1A': 1.0, '12A': 0.9, '2A': 0.9, '1B': 0.8, '12B': 0.8},
|
|
'2A': {'2A': 1.0, '1A': 0.9, '3A': 0.9, '2B': 0.8, '1B': 0.8},
|
|
'3A': {'3A': 1.0, '2A': 0.9, '4A': 0.9, '3B': 0.8, '2B': 0.8},
|
|
'4A': {'4A': 1.0, '3A': 0.9, '5A': 0.9, '4B': 0.8, '3B': 0.8},
|
|
'5A': {'5A': 1.0, '4A': 0.9, '6A': 0.9, '5B': 0.8, '4B': 0.8},
|
|
'6A': {'6A': 1.0, '5A': 0.9, '7A': 0.9, '6B': 0.8, '5B': 0.8},
|
|
'7A': {'7A': 1.0, '6A': 0.9, '8A': 0.9, '7B': 0.8, '6B': 0.8},
|
|
'8A': {'8A': 1.0, '7A': 0.9, '9A': 0.9, '8B': 0.8, '7B': 0.8},
|
|
'9A': {'9A': 1.0, '8A': 0.9, '10A': 0.9, '9B': 0.8, '8B': 0.8},
|
|
'10A': {'10A': 1.0, '9A': 0.9, '11A': 0.9, '10B': 0.8, '9B': 0.8},
|
|
'11A': {'11A': 1.0, '10A': 0.9, '12A': 0.9, '11B': 0.8, '10B': 0.8},
|
|
'12A': {'12A': 1.0, '11A': 0.9, '1A': 0.9, '12B': 0.8, '11B': 0.8},
|
|
'1B': {'1B': 1.0, '12B': 0.9, '2B': 0.9, '1A': 0.8, '12A': 0.8},
|
|
'2B': {'2B': 1.0, '1B': 0.9, '3B': 0.9, '2A': 0.8, '1A': 0.8},
|
|
'3B': {'3B': 1.0, '2B': 0.9, '4B': 0.9, '3A': 0.8, '2A': 0.8},
|
|
'4B': {'4B': 1.0, '3B': 0.9, '5B': 0.9, '4A': 0.8, '3A': 0.8},
|
|
'5B': {'5B': 1.0, '4B': 0.9, '6B': 0.9, '5A': 0.8, '4A': 0.8},
|
|
'6B': {'6B': 1.0, '5B': 0.9, '7B': 0.9, '6A': 0.8, '5A': 0.8},
|
|
'7B': {'7B': 1.0, '6B': 0.9, '8B': 0.9, '7A': 0.8, '6A': 0.8},
|
|
'8B': {'8B': 1.0, '7B': 0.9, '9B': 0.9, '8A': 0.8, '7A': 0.8},
|
|
'9B': {'9B': 1.0, '8B': 0.9, '10B': 0.9, '9A': 0.8, '8A': 0.8},
|
|
'10B': {'10B': 1.0, '9B': 0.9, '11B': 0.9, '10A': 0.8, '9A': 0.8},
|
|
'11B': {'11B': 1.0, '10B': 0.9, '12B': 0.9, '11A': 0.8, '10A': 0.8},
|
|
'12B': {'12B': 1.0, '11B': 0.9, '1B': 0.9, '12A': 0.8, '11A': 0.8}
|
|
}
|
|
return compatibility
|
|
|
|
async def analyze_beats(self, audio_path: str) -> BeatInfo:
|
|
"""Analyze beats and tempo from audio file"""
|
|
try:
|
|
# Load audio
|
|
y, sr = librosa.load(audio_path, sr=self.sample_rate)
|
|
|
|
# Extract tempo and beat frames
|
|
tempo, beat_frames = librosa.beat.beat_track(
|
|
y=y, sr=sr, hop_length=self.hop_length
|
|
)
|
|
|
|
# Convert frames to time
|
|
beat_times = librosa.frames_to_time(
|
|
beat_frames, sr=sr, hop_length=self.hop_length
|
|
)
|
|
|
|
# Detect downbeats (first beat of each measure)
|
|
downbeats = self._detect_downbeats(y, sr, beat_frames)
|
|
|
|
# Extract key
|
|
key = await self._extract_key(y, sr)
|
|
|
|
# Calculate energy curve
|
|
energy_curve = self._calculate_energy_curve(y, beat_frames)
|
|
|
|
return BeatInfo(
|
|
tempo=float(tempo),
|
|
beat_frames=beat_frames,
|
|
beat_times=beat_times,
|
|
downbeats=downbeats,
|
|
key=key,
|
|
energy_curve=energy_curve
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Beat analysis failed for {audio_path}: {e}")
|
|
raise
|
|
|
|
def _detect_downbeats(self, y: np.ndarray, sr: int, beat_frames: np.ndarray) -> np.ndarray:
|
|
"""Detect downbeats using harmonic and percussive separation"""
|
|
# Separate harmonic and percussive
|
|
y_harmonic = librosa.effects.harmonic(y)
|
|
y_percussive = librosa.effects.percussive(y)
|
|
|
|
# Calculate onset strength
|
|
harmonic_onset = librosa.onset.onset_strength(
|
|
y=y_harmonic, sr=sr, hop_length=self.hop_length
|
|
)
|
|
percussive_onset = librosa.onset.onset_strength(
|
|
y=y_percussive, sr=sr, hop_length=self.hop_length
|
|
)
|
|
|
|
# Combine onset strengths
|
|
combined_onset = harmonic_onset + percussive_onset
|
|
|
|
# Find peaks at beat positions
|
|
beat_onset_strength = combined_onset[beat_frames]
|
|
|
|
# Detect downbeats (peaks in onset strength)
|
|
downbeat_indices = []
|
|
for i in range(1, len(beat_onset_strength) - 1):
|
|
if (beat_onset_strength[i] > beat_onset_strength[i-1] and
|
|
beat_onset_strength[i] > beat_onset_strength[i+1]):
|
|
downbeat_indices.append(i)
|
|
|
|
return beat_frames[downbeat_indices] if downbeat_indices else beat_frames[::4] # Assume 4/4 time
|
|
|
|
async def _extract_key(self, y: np.ndarray, sr: int) -> str:
|
|
"""Extract musical key using chroma features"""
|
|
# Extract chroma features
|
|
chroma = librosa.feature.chroma_stft(y=y, sr=sr, hop_length=self.hop_length)
|
|
|
|
# Average chroma across time
|
|
chroma_mean = np.mean(chroma, axis=1)
|
|
|
|
# Determine key profile (simplified)
|
|
# Map to Camelot notation
|
|
key_profiles = {
|
|
'C': '8B', 'C#': '3B', 'D': '10B', 'D#': '5B',
|
|
'E': '12B', 'F': '7B', 'F#': '2B', 'G': '9B',
|
|
'G#': '4B', 'A': '11B', 'A#': '6B', 'B': '1B',
|
|
'Cm': '8A', 'C#m': '3A', 'Dm': '10A', 'D#m': '5A',
|
|
'Em': '12A', 'Fm': '7A', 'F#m': '2A', 'Gm': '9A',
|
|
'G#m': '4A', 'Am': '11A', 'A#m': '6A', 'Bm': '1A'
|
|
}
|
|
|
|
# Find best matching key (simplified - would need more sophisticated analysis)
|
|
max_chroma_idx = np.argmax(chroma_mean)
|
|
key_names = list(key_profiles.keys())
|
|
detected_key = key_names[max_chroma_idx % len(key_names)]
|
|
|
|
return key_profiles.get(detected_key, '8A') # Default to C major
|
|
|
|
def _calculate_energy_curve(self, y: np.ndarray, beat_frames: np.ndarray) -> np.ndarray:
|
|
"""Calculate energy curve for each beat"""
|
|
energy_curve = []
|
|
|
|
for i in range(len(beat_frames) - 1):
|
|
start_frame = beat_frames[i]
|
|
end_frame = beat_frames[i + 1] if i + 1 < len(beat_frames) else len(y)
|
|
|
|
# Calculate RMS energy for this beat
|
|
beat_segment = y[start_frame:end_frame]
|
|
energy = np.sqrt(np.mean(beat_segment ** 2))
|
|
energy_curve.append(energy)
|
|
|
|
return np.array(energy_curve)
|
|
|
|
async def calculate_beat_match(self, track1_beats: BeatInfo, track2_beats: BeatInfo) -> BeatMatch:
|
|
"""Calculate beat matching compatibility between two tracks"""
|
|
try:
|
|
# Tempo compatibility
|
|
tempo_ratio = track2_beats.tempo / track1_beats.tempo
|
|
tempo_compatibility = self._calculate_tempo_compatibility(tempo_ratio)
|
|
|
|
# Key compatibility
|
|
key_compatibility_score = self._calculate_key_compatibility(
|
|
track1_beats.key, track2_beats.key
|
|
)
|
|
|
|
# Energy matching
|
|
energy_match = self._calculate_energy_compatibility(
|
|
track1_beats.energy_curve, track2_beats.energy_curve
|
|
)
|
|
|
|
# Overall compatibility
|
|
compatibility_score = (
|
|
tempo_compatibility * 0.4 +
|
|
key_compatibility_score * 0.3 +
|
|
energy_match * 0.3
|
|
)
|
|
|
|
# Determine optimal transition type
|
|
transition_type = self._determine_transition_type(
|
|
tempo_compatibility, key_compatibility_score, energy_match
|
|
)
|
|
|
|
# Find optimal mix point
|
|
optimal_mix_point = self._find_optimal_mix_point(
|
|
track1_beats, track2_beats, transition_type
|
|
)
|
|
|
|
return BeatMatch(
|
|
compatibility_score=compatibility_score,
|
|
tempo_ratio=tempo_ratio,
|
|
key_compatibility=f"{track1_beats.key} → {track2_beats.key}",
|
|
optimal_mix_point=optimal_mix_point,
|
|
transition_type=transition_type,
|
|
energy_match=energy_match
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Beat matching calculation failed: {e}")
|
|
raise
|
|
|
|
def _calculate_tempo_compatibility(self, tempo_ratio: float) -> float:
|
|
"""Calculate tempo compatibility score"""
|
|
# Perfect match (same tempo)
|
|
if 0.98 <= tempo_ratio <= 1.02:
|
|
return 1.0
|
|
|
|
# Harmonic ratios (2x, 0.5x)
|
|
if 0.49 <= tempo_ratio <= 0.51 or 1.98 <= tempo_ratio <= 2.02:
|
|
return 0.8
|
|
|
|
# Close match (within 10%)
|
|
if 0.9 <= tempo_ratio <= 1.1:
|
|
return 0.7
|
|
|
|
# Acceptable range (within 20%)
|
|
if 0.8 <= tempo_ratio <= 1.2:
|
|
return 0.5
|
|
|
|
# Poor compatibility
|
|
return 0.2
|
|
|
|
def _calculate_key_compatibility(self, key1: str, key2: str) -> float:
|
|
"""Calculate harmonic key compatibility"""
|
|
if key1 in self.key_compatibility_matrix and key2 in self.key_compatibility_matrix[key1]:
|
|
return self.key_compatibility_matrix[key1][key2]
|
|
return 0.1 # Very low compatibility for unknown keys
|
|
|
|
def _calculate_energy_compatibility(self, energy1: np.ndarray, energy2: np.ndarray) -> float:
|
|
"""Calculate energy curve compatibility"""
|
|
if len(energy1) == 0 or len(energy2) == 0:
|
|
return 0.5
|
|
|
|
# Normalize energy curves
|
|
energy1_norm = (energy1 - np.min(energy1)) / (np.max(energy1) - np.min(energy1))
|
|
energy2_norm = (energy2 - np.min(energy2)) / (np.max(energy2) - np.min(energy2))
|
|
|
|
# Calculate correlation
|
|
min_length = min(len(energy1_norm), len(energy2_norm))
|
|
correlation = np.corrcoef(energy1_norm[:min_length], energy2_norm[:min_length])[0, 1]
|
|
|
|
if np.isnan(correlation):
|
|
return 0.5
|
|
|
|
return (correlation + 1) / 2 # Normalize to 0-1 range
|
|
|
|
def _determine_transition_type(self, tempo_comp: float, key_comp: float, energy_comp: float) -> str:
|
|
"""Determine optimal transition type"""
|
|
overall_score = (tempo_comp + key_comp + energy_comp) / 3
|
|
|
|
if overall_score >= 0.8:
|
|
return "perfect_blend"
|
|
elif overall_score >= 0.6:
|
|
return "smooth_transition"
|
|
elif overall_score >= 0.4:
|
|
return "crossfade"
|
|
elif key_comp >= 0.7:
|
|
return "harmonic_mix"
|
|
else:
|
|
return "cut"
|
|
|
|
def _find_optimal_mix_point(self, track1_beats: BeatInfo, track2_beats: BeatInfo,
|
|
transition_type: str) -> float:
|
|
"""Find optimal mix point in track 1 (0-1, where 1 is end)"""
|
|
if transition_type == "perfect_blend":
|
|
# Mix near the end for perfect blend
|
|
return 0.85
|
|
elif transition_type == "smooth_transition":
|
|
# Earlier mix point for smooth transition
|
|
return 0.75
|
|
elif transition_type == "crossfade":
|
|
# Standard crossfade point
|
|
return 0.9
|
|
elif transition_type == "harmonic_mix":
|
|
# Mix at downbeat for harmonic compatibility
|
|
if len(track1_beats.downbeats) > 0:
|
|
last_downbeat_ratio = track1_beats.downbeats[-1] / len(track1_beats.beat_frames)
|
|
return min(last_downbeat_ratio + 0.1, 0.9)
|
|
return 0.8
|
|
else: # cut
|
|
# Quick cut near the end
|
|
return 0.95
|
|
|
|
async def create_beat_sync_transition(self, track1_path: str, track2_path: str,
|
|
beat_match: BeatMatch) -> Dict:
|
|
"""Create beat-synchronized transition parameters"""
|
|
try:
|
|
# Analyze both tracks
|
|
track1_beats = await self.analyze_beats(track1_path)
|
|
track2_beats = await self.analyze_beats(track2_path)
|
|
|
|
# Calculate transition parameters
|
|
transition_start = beat_match.optimal_mix_point
|
|
transition_duration = self._calculate_transition_duration(
|
|
beat_match.transition_type, track1_beats.tempo
|
|
)
|
|
|
|
# Calculate tempo adjustment
|
|
tempo_adjustment = 1.0 / beat_match.tempo_ratio if beat_match.tempo_ratio != 1.0 else 1.0
|
|
|
|
return {
|
|
'transition_type': beat_match.transition_type,
|
|
'start_point': transition_start,
|
|
'duration': transition_duration,
|
|
'tempo_adjustment': tempo_adjustment,
|
|
'key_change_needed': beat_match.key_compatibility,
|
|
'energy_ramp': self._calculate_energy_ramp(
|
|
track1_beats.energy_curve, track2_beats.energy_curve,
|
|
transition_start, transition_duration
|
|
),
|
|
'beat_markers': {
|
|
'track1_last_beat': transition_start,
|
|
'track2_first_beat': 0.0,
|
|
'sync_point': transition_start + transition_duration / 2
|
|
}
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Beat sync transition creation failed: {e}")
|
|
raise
|
|
|
|
def _calculate_transition_duration(self, transition_type: str, tempo: float) -> float:
|
|
"""Calculate transition duration based on type and tempo"""
|
|
beat_duration = 60.0 / tempo
|
|
|
|
if transition_type == "perfect_blend":
|
|
return beat_duration * 16 # 16 bars
|
|
elif transition_type == "smooth_transition":
|
|
return beat_duration * 8 # 8 bars
|
|
elif transition_type == "crossfade":
|
|
return beat_duration * 4 # 4 bars
|
|
elif transition_type == "harmonic_mix":
|
|
return beat_duration * 8 # 8 bars for harmonic transition
|
|
else: # cut
|
|
return beat_duration * 1 # 1 bar quick cut
|
|
|
|
def _calculate_energy_ramp(self, energy1: np.ndarray, energy2: np.ndarray,
|
|
start_point: float, duration: float) -> np.ndarray:
|
|
"""Calculate energy ramp for smooth transition"""
|
|
# Get energy values around transition point
|
|
start_idx = int(len(energy1) * start_point)
|
|
end_idx = min(start_idx + int(duration * len(energy1)), len(energy1))
|
|
|
|
if start_idx >= len(energy1):
|
|
return np.array([0.5]) # Default energy
|
|
|
|
track1_energy = energy1[start_idx:end_idx] if start_idx < len(energy1) else np.array([energy1[-1]])
|
|
|
|
# Create smooth ramp from track1 energy to track2 energy
|
|
track2_start_energy = energy2[0] if len(energy2) > 0 else 0.5
|
|
|
|
if len(track1_energy) == 0:
|
|
return np.linspace(0.5, track2_start_energy, 10)
|
|
|
|
ramp_points = max(len(track1_energy), 10)
|
|
energy_ramp = np.linspace(track1_energy[-1] if len(track1_energy) > 0 else 0.5,
|
|
track2_start_energy, ramp_points)
|
|
|
|
return energy_ramp
|