Files
Tomas Dvorak 38f1981283 Move backend files to root level for cleaner GitHub display
- Move all backend files from swingmusic/ to root level
- Backend files now display directly on GitHub repository page
- Keep client applications as submodules (swingmusic-android, swingmusic-desktop, swingmusic-webclient)
- Update README to reflect new structure (no cd swingmusic needed)
- Cleaner, more professional GitHub repository layout

Files moved to root:
- src/ (main source code)
- pyproject.toml, requirements.txt, run.py
- swingmusic.spec, uv.lock, version.txt
- services/

Result: GitHub shows backend files directly while maintaining organized structure
2026-03-17 22:37:49 +01:00

382 lines
16 KiB
Python

# swingmusic/services/beat_matching.py
import numpy as np
import librosa
from typing import Dict, List, Tuple, Optional
from dataclasses import dataclass
import logging
logger = logging.getLogger(__name__)
@dataclass
class BeatInfo:
"""Beat information for a track"""
tempo: float
beat_frames: np.ndarray
beat_times: np.ndarray
downbeats: np.ndarray
key: str
energy_curve: np.ndarray
@dataclass
class BeatMatch:
"""Beat matching result between two tracks"""
compatibility_score: float
tempo_ratio: float
key_compatibility: str
optimal_mix_point: float
transition_type: str
energy_match: float
class BeatMatcher:
"""Advanced beat matching engine for DJ transitions"""
def __init__(self):
self.sample_rate = 22050
self.hop_length = 512
self.key_compatibility_matrix = self._build_key_compatibility_matrix()
def _build_key_compatibility_matrix(self) -> Dict[str, Dict[str, float]]:
"""Build harmonic key compatibility matrix"""
# Camelot wheel compatibility
compatibility = {
'1A': {'1A': 1.0, '12A': 0.9, '2A': 0.9, '1B': 0.8, '12B': 0.8},
'2A': {'2A': 1.0, '1A': 0.9, '3A': 0.9, '2B': 0.8, '1B': 0.8},
'3A': {'3A': 1.0, '2A': 0.9, '4A': 0.9, '3B': 0.8, '2B': 0.8},
'4A': {'4A': 1.0, '3A': 0.9, '5A': 0.9, '4B': 0.8, '3B': 0.8},
'5A': {'5A': 1.0, '4A': 0.9, '6A': 0.9, '5B': 0.8, '4B': 0.8},
'6A': {'6A': 1.0, '5A': 0.9, '7A': 0.9, '6B': 0.8, '5B': 0.8},
'7A': {'7A': 1.0, '6A': 0.9, '8A': 0.9, '7B': 0.8, '6B': 0.8},
'8A': {'8A': 1.0, '7A': 0.9, '9A': 0.9, '8B': 0.8, '7B': 0.8},
'9A': {'9A': 1.0, '8A': 0.9, '10A': 0.9, '9B': 0.8, '8B': 0.8},
'10A': {'10A': 1.0, '9A': 0.9, '11A': 0.9, '10B': 0.8, '9B': 0.8},
'11A': {'11A': 1.0, '10A': 0.9, '12A': 0.9, '11B': 0.8, '10B': 0.8},
'12A': {'12A': 1.0, '11A': 0.9, '1A': 0.9, '12B': 0.8, '11B': 0.8},
'1B': {'1B': 1.0, '12B': 0.9, '2B': 0.9, '1A': 0.8, '12A': 0.8},
'2B': {'2B': 1.0, '1B': 0.9, '3B': 0.9, '2A': 0.8, '1A': 0.8},
'3B': {'3B': 1.0, '2B': 0.9, '4B': 0.9, '3A': 0.8, '2A': 0.8},
'4B': {'4B': 1.0, '3B': 0.9, '5B': 0.9, '4A': 0.8, '3A': 0.8},
'5B': {'5B': 1.0, '4B': 0.9, '6B': 0.9, '5A': 0.8, '4A': 0.8},
'6B': {'6B': 1.0, '5B': 0.9, '7B': 0.9, '6A': 0.8, '5A': 0.8},
'7B': {'7B': 1.0, '6B': 0.9, '8B': 0.9, '7A': 0.8, '6A': 0.8},
'8B': {'8B': 1.0, '7B': 0.9, '9B': 0.9, '8A': 0.8, '7A': 0.8},
'9B': {'9B': 1.0, '8B': 0.9, '10B': 0.9, '9A': 0.8, '8A': 0.8},
'10B': {'10B': 1.0, '9B': 0.9, '11B': 0.9, '10A': 0.8, '9A': 0.8},
'11B': {'11B': 1.0, '10B': 0.9, '12B': 0.9, '11A': 0.8, '10A': 0.8},
'12B': {'12B': 1.0, '11B': 0.9, '1B': 0.9, '12A': 0.8, '11A': 0.8}
}
return compatibility
async def analyze_beats(self, audio_path: str) -> BeatInfo:
"""Analyze beats and tempo from audio file"""
try:
# Load audio
y, sr = librosa.load(audio_path, sr=self.sample_rate)
# Extract tempo and beat frames
tempo, beat_frames = librosa.beat.beat_track(
y=y, sr=sr, hop_length=self.hop_length
)
# Convert frames to time
beat_times = librosa.frames_to_time(
beat_frames, sr=sr, hop_length=self.hop_length
)
# Detect downbeats (first beat of each measure)
downbeats = self._detect_downbeats(y, sr, beat_frames)
# Extract key
key = await self._extract_key(y, sr)
# Calculate energy curve
energy_curve = self._calculate_energy_curve(y, beat_frames)
return BeatInfo(
tempo=float(tempo),
beat_frames=beat_frames,
beat_times=beat_times,
downbeats=downbeats,
key=key,
energy_curve=energy_curve
)
except Exception as e:
logger.error(f"Beat analysis failed for {audio_path}: {e}")
raise
def _detect_downbeats(self, y: np.ndarray, sr: int, beat_frames: np.ndarray) -> np.ndarray:
"""Detect downbeats using harmonic and percussive separation"""
# Separate harmonic and percussive
y_harmonic = librosa.effects.harmonic(y)
y_percussive = librosa.effects.percussive(y)
# Calculate onset strength
harmonic_onset = librosa.onset.onset_strength(
y=y_harmonic, sr=sr, hop_length=self.hop_length
)
percussive_onset = librosa.onset.onset_strength(
y=y_percussive, sr=sr, hop_length=self.hop_length
)
# Combine onset strengths
combined_onset = harmonic_onset + percussive_onset
# Find peaks at beat positions
beat_onset_strength = combined_onset[beat_frames]
# Detect downbeats (peaks in onset strength)
downbeat_indices = []
for i in range(1, len(beat_onset_strength) - 1):
if (beat_onset_strength[i] > beat_onset_strength[i-1] and
beat_onset_strength[i] > beat_onset_strength[i+1]):
downbeat_indices.append(i)
return beat_frames[downbeat_indices] if downbeat_indices else beat_frames[::4] # Assume 4/4 time
async def _extract_key(self, y: np.ndarray, sr: int) -> str:
"""Extract musical key using chroma features"""
# Extract chroma features
chroma = librosa.feature.chroma_stft(y=y, sr=sr, hop_length=self.hop_length)
# Average chroma across time
chroma_mean = np.mean(chroma, axis=1)
# Determine key profile (simplified)
# Map to Camelot notation
key_profiles = {
'C': '8B', 'C#': '3B', 'D': '10B', 'D#': '5B',
'E': '12B', 'F': '7B', 'F#': '2B', 'G': '9B',
'G#': '4B', 'A': '11B', 'A#': '6B', 'B': '1B',
'Cm': '8A', 'C#m': '3A', 'Dm': '10A', 'D#m': '5A',
'Em': '12A', 'Fm': '7A', 'F#m': '2A', 'Gm': '9A',
'G#m': '4A', 'Am': '11A', 'A#m': '6A', 'Bm': '1A'
}
# Find best matching key (simplified - would need more sophisticated analysis)
max_chroma_idx = np.argmax(chroma_mean)
key_names = list(key_profiles.keys())
detected_key = key_names[max_chroma_idx % len(key_names)]
return key_profiles.get(detected_key, '8A') # Default to C major
def _calculate_energy_curve(self, y: np.ndarray, beat_frames: np.ndarray) -> np.ndarray:
"""Calculate energy curve for each beat"""
energy_curve = []
for i in range(len(beat_frames) - 1):
start_frame = beat_frames[i]
end_frame = beat_frames[i + 1] if i + 1 < len(beat_frames) else len(y)
# Calculate RMS energy for this beat
beat_segment = y[start_frame:end_frame]
energy = np.sqrt(np.mean(beat_segment ** 2))
energy_curve.append(energy)
return np.array(energy_curve)
async def calculate_beat_match(self, track1_beats: BeatInfo, track2_beats: BeatInfo) -> BeatMatch:
"""Calculate beat matching compatibility between two tracks"""
try:
# Tempo compatibility
tempo_ratio = track2_beats.tempo / track1_beats.tempo
tempo_compatibility = self._calculate_tempo_compatibility(tempo_ratio)
# Key compatibility
key_compatibility_score = self._calculate_key_compatibility(
track1_beats.key, track2_beats.key
)
# Energy matching
energy_match = self._calculate_energy_compatibility(
track1_beats.energy_curve, track2_beats.energy_curve
)
# Overall compatibility
compatibility_score = (
tempo_compatibility * 0.4 +
key_compatibility_score * 0.3 +
energy_match * 0.3
)
# Determine optimal transition type
transition_type = self._determine_transition_type(
tempo_compatibility, key_compatibility_score, energy_match
)
# Find optimal mix point
optimal_mix_point = self._find_optimal_mix_point(
track1_beats, track2_beats, transition_type
)
return BeatMatch(
compatibility_score=compatibility_score,
tempo_ratio=tempo_ratio,
key_compatibility=f"{track1_beats.key}{track2_beats.key}",
optimal_mix_point=optimal_mix_point,
transition_type=transition_type,
energy_match=energy_match
)
except Exception as e:
logger.error(f"Beat matching calculation failed: {e}")
raise
def _calculate_tempo_compatibility(self, tempo_ratio: float) -> float:
"""Calculate tempo compatibility score"""
# Perfect match (same tempo)
if 0.98 <= tempo_ratio <= 1.02:
return 1.0
# Harmonic ratios (2x, 0.5x)
if 0.49 <= tempo_ratio <= 0.51 or 1.98 <= tempo_ratio <= 2.02:
return 0.8
# Close match (within 10%)
if 0.9 <= tempo_ratio <= 1.1:
return 0.7
# Acceptable range (within 20%)
if 0.8 <= tempo_ratio <= 1.2:
return 0.5
# Poor compatibility
return 0.2
def _calculate_key_compatibility(self, key1: str, key2: str) -> float:
"""Calculate harmonic key compatibility"""
if key1 in self.key_compatibility_matrix and key2 in self.key_compatibility_matrix[key1]:
return self.key_compatibility_matrix[key1][key2]
return 0.1 # Very low compatibility for unknown keys
def _calculate_energy_compatibility(self, energy1: np.ndarray, energy2: np.ndarray) -> float:
"""Calculate energy curve compatibility"""
if len(energy1) == 0 or len(energy2) == 0:
return 0.5
# Normalize energy curves
energy1_norm = (energy1 - np.min(energy1)) / (np.max(energy1) - np.min(energy1))
energy2_norm = (energy2 - np.min(energy2)) / (np.max(energy2) - np.min(energy2))
# Calculate correlation
min_length = min(len(energy1_norm), len(energy2_norm))
correlation = np.corrcoef(energy1_norm[:min_length], energy2_norm[:min_length])[0, 1]
if np.isnan(correlation):
return 0.5
return (correlation + 1) / 2 # Normalize to 0-1 range
def _determine_transition_type(self, tempo_comp: float, key_comp: float, energy_comp: float) -> str:
"""Determine optimal transition type"""
overall_score = (tempo_comp + key_comp + energy_comp) / 3
if overall_score >= 0.8:
return "perfect_blend"
elif overall_score >= 0.6:
return "smooth_transition"
elif overall_score >= 0.4:
return "crossfade"
elif key_comp >= 0.7:
return "harmonic_mix"
else:
return "cut"
def _find_optimal_mix_point(self, track1_beats: BeatInfo, track2_beats: BeatInfo,
transition_type: str) -> float:
"""Find optimal mix point in track 1 (0-1, where 1 is end)"""
if transition_type == "perfect_blend":
# Mix near the end for perfect blend
return 0.85
elif transition_type == "smooth_transition":
# Earlier mix point for smooth transition
return 0.75
elif transition_type == "crossfade":
# Standard crossfade point
return 0.9
elif transition_type == "harmonic_mix":
# Mix at downbeat for harmonic compatibility
if len(track1_beats.downbeats) > 0:
last_downbeat_ratio = track1_beats.downbeats[-1] / len(track1_beats.beat_frames)
return min(last_downbeat_ratio + 0.1, 0.9)
return 0.8
else: # cut
# Quick cut near the end
return 0.95
async def create_beat_sync_transition(self, track1_path: str, track2_path: str,
beat_match: BeatMatch) -> Dict:
"""Create beat-synchronized transition parameters"""
try:
# Analyze both tracks
track1_beats = await self.analyze_beats(track1_path)
track2_beats = await self.analyze_beats(track2_path)
# Calculate transition parameters
transition_start = beat_match.optimal_mix_point
transition_duration = self._calculate_transition_duration(
beat_match.transition_type, track1_beats.tempo
)
# Calculate tempo adjustment
tempo_adjustment = 1.0 / beat_match.tempo_ratio if beat_match.tempo_ratio != 1.0 else 1.0
return {
'transition_type': beat_match.transition_type,
'start_point': transition_start,
'duration': transition_duration,
'tempo_adjustment': tempo_adjustment,
'key_change_needed': beat_match.key_compatibility,
'energy_ramp': self._calculate_energy_ramp(
track1_beats.energy_curve, track2_beats.energy_curve,
transition_start, transition_duration
),
'beat_markers': {
'track1_last_beat': transition_start,
'track2_first_beat': 0.0,
'sync_point': transition_start + transition_duration / 2
}
}
except Exception as e:
logger.error(f"Beat sync transition creation failed: {e}")
raise
def _calculate_transition_duration(self, transition_type: str, tempo: float) -> float:
"""Calculate transition duration based on type and tempo"""
beat_duration = 60.0 / tempo
if transition_type == "perfect_blend":
return beat_duration * 16 # 16 bars
elif transition_type == "smooth_transition":
return beat_duration * 8 # 8 bars
elif transition_type == "crossfade":
return beat_duration * 4 # 4 bars
elif transition_type == "harmonic_mix":
return beat_duration * 8 # 8 bars for harmonic transition
else: # cut
return beat_duration * 1 # 1 bar quick cut
def _calculate_energy_ramp(self, energy1: np.ndarray, energy2: np.ndarray,
start_point: float, duration: float) -> np.ndarray:
"""Calculate energy ramp for smooth transition"""
# Get energy values around transition point
start_idx = int(len(energy1) * start_point)
end_idx = min(start_idx + int(duration * len(energy1)), len(energy1))
if start_idx >= len(energy1):
return np.array([0.5]) # Default energy
track1_energy = energy1[start_idx:end_idx] if start_idx < len(energy1) else np.array([energy1[-1]])
# Create smooth ramp from track1 energy to track2 energy
track2_start_energy = energy2[0] if len(energy2) > 0 else 0.5
if len(track1_energy) == 0:
return np.linspace(0.5, track2_start_energy, 10)
ramp_points = max(len(track1_energy), 10)
energy_ramp = np.linspace(track1_energy[-1] if len(track1_energy) > 0 else 0.5,
track2_start_energy, ramp_points)
return energy_ramp