mirror of
https://github.com/Dvorinka/swingmusic-extended.git
synced 2026-06-03 20:13:02 +00:00
38f1981283
- Move all backend files from swingmusic/ to root level - Backend files now display directly on GitHub repository page - Keep client applications as submodules (swingmusic-android, swingmusic-desktop, swingmusic-webclient) - Update README to reflect new structure (no cd swingmusic needed) - Cleaner, more professional GitHub repository layout Files moved to root: - src/ (main source code) - pyproject.toml, requirements.txt, run.py - swingmusic.spec, uv.lock, version.txt - services/ Result: GitHub shows backend files directly while maintaining organized structure
608 lines
22 KiB
Python
608 lines
22 KiB
Python
# swingmusic/services/real_time_audio.py
|
|
import numpy as np
|
|
import librosa
|
|
import sounddevice as sd
|
|
from typing import Dict, List, Callable, Optional, Tuple
|
|
from dataclasses import dataclass
|
|
from threading import Thread, Event
|
|
import queue
|
|
import logging
|
|
from scipy import signal
|
|
from scipy.io import wavfile
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
@dataclass
class AudioConfig:
    """Settings shared by every audio component defined in this module."""

    # Sampling rate in Hz; handed to the audio streams and to librosa.
    sample_rate: int = 44100

    # Base length, in frames, used to size the processor's scratch buffers.
    buffer_size: int = 1024

    # Number of interleaved channels per frame.
    channels: int = 2

    # Sample format name passed to the audio streams.
    dtype: str = 'float32'

    # Frames delivered per stream callback.
    block_size: int = 512

    # Analysis hop, in samples, used for beat tracking.
    hop_length: int = 256
|
|
|
|
@dataclass
class AudioFeatures:
    """Feature snapshot computed for a single chunk of audio.

    NOTE: three fields are NumPy arrays, so the dataclass-generated
    ``__eq__`` is not safe to use on two distinct instances (comparing
    multi-element arrays has no single truth value); compare fields instead.
    """

    # Root-mean-square level of the mono signal.
    rms_energy: float

    # Zero-crossing rate, averaged over the chunk.
    zero_crossing_rate: float

    # Spectral centroid, averaged over the chunk.
    spectral_centroid: float

    # Spectral bandwidth, averaged over the chunk.
    spectral_bandwidth: float

    # Spectral roll-off, averaged over the chunk.
    spectral_rolloff: float

    # MFCC matrix (coefficients x frames).
    mfcc: np.ndarray

    # Chromagram (pitch classes x frames).
    chroma: np.ndarray

    # Estimated tempo in beats per minute.
    tempo: float

    # Position inside the current beat.
    beat_phase: float

    # Per-pitch-class strength (chroma averaged over frames).
    key_strength: np.ndarray
|
|
|
|
@dataclass
class AudioEvent:
    """Payload delivered to registered audio event callbacks."""

    # Wall-clock time (seconds) at which the event was created.
    timestamp: float

    # Features extracted from ``audio_data``.
    features: AudioFeatures

    # The audio chunk the features were computed from.
    audio_data: np.ndarray

    # Event category label, e.g. 'audio_features'.
    event_type: str
|
|
|
|
class RealTimeAudioProcessor:
    """Real-time audio processing engine for DJ features.

    Chunks handed to :meth:`process_audio_chunk` are queued, converted into
    :class:`AudioFeatures` on a worker thread, wrapped in an
    :class:`AudioEvent` and delivered to every registered callback on a
    second thread.
    """

    # Tempo (BPM) assumed whenever no usable tempo estimate is available.
    _FALLBACK_TEMPO = 120.0

    def __init__(self, config: Optional[AudioConfig] = None, max_queue_size: int = 0):
        """Create a stopped processor.

        Args:
            config: Audio settings; a default ``AudioConfig`` is used if omitted.
            max_queue_size: Upper bound for the chunk and event queues. ``0``
                (the default) keeps them unbounded, as before; with a positive
                value, chunks/events are dropped once a queue is full instead
                of accumulating without limit when processing falls behind.
        """
        self.config = config or AudioConfig()
        self.is_running = False
        self.audio_queue = queue.Queue(maxsize=max_queue_size)
        self.feature_queue = queue.Queue(maxsize=max_queue_size)
        self.event_callbacks: List[Callable] = []

        # Audio processing components
        self.beat_tracker = BeatTracker(self.config)
        self.key_detector = KeyDetector(self.config)
        self.effects_processor = EffectsProcessor(self.config)

        # Threading
        self.processing_thread = None
        self.callback_thread = None
        self.stop_event = Event()

        # Audio buffers
        self.input_buffer = np.zeros((self.config.buffer_size * 4, self.config.channels))
        self.output_buffer = np.zeros((self.config.buffer_size * 4, self.config.channels))
        self.buffer_index = 0

    def add_event_callback(self, callback: Callable[[AudioEvent], None]):
        """Register ``callback`` to receive every future ``AudioEvent``."""
        self.event_callbacks.append(callback)

    def remove_event_callback(self, callback: Callable[[AudioEvent], None]):
        """Unregister ``callback``; unknown callbacks are ignored."""
        if callback in self.event_callbacks:
            self.event_callbacks.remove(callback)

    def start_processing(self):
        """Start the feature-extraction and callback threads (no-op if running)."""
        if self.is_running:
            logger.warning("Audio processing already running")
            return

        self.is_running = True
        self.stop_event.clear()

        # Daemon threads so a forgotten stop_processing() cannot block exit.
        self.processing_thread = Thread(target=self._processing_loop, daemon=True)
        self.callback_thread = Thread(target=self._callback_loop, daemon=True)

        self.processing_thread.start()
        self.callback_thread.start()

        logger.info("Real-time audio processing started")

    def stop_processing(self):
        """Signal both worker threads to stop and wait briefly for them."""
        if not self.is_running:
            return

        self.is_running = False
        self.stop_event.set()

        from threading import current_thread

        # A callback may call stop_processing(); a thread cannot join itself.
        this_thread = current_thread()
        for worker in (self.processing_thread, self.callback_thread):
            if worker is not None and worker is not this_thread:
                worker.join(timeout=1.0)

        logger.info("Real-time audio processing stopped")

    def process_audio_chunk(self, audio_data: np.ndarray):
        """Queue ``audio_data`` for analysis; ignored while not running.

        The array is queued by reference, so the caller must not modify it
        afterwards. If the queue is bounded and full, the chunk is dropped.
        """
        if not self.is_running:
            return

        try:
            self.audio_queue.put(audio_data, block=False)
        except queue.Full:
            logger.warning("Audio queue full, dropping chunk")

    def _processing_loop(self):
        """Worker loop: pull chunks, extract features, emit events."""
        while self.is_running and not self.stop_event.is_set():
            try:
                # Short timeout so the stop flags are re-checked regularly.
                audio_data = self.audio_queue.get(timeout=0.1)
            except queue.Empty:
                continue

            try:
                features = self._extract_features(audio_data)
                event = AudioEvent(
                    timestamp=self._get_timestamp(),
                    features=features,
                    audio_data=audio_data,
                    event_type='audio_features'
                )
                self.feature_queue.put(event, block=False)
            except queue.Full:
                logger.warning("Feature queue full, dropping event")
            except Exception as e:
                # Keep the worker alive; one bad chunk must not stop analysis.
                logger.error("Audio processing error: %s", e)

    def _callback_loop(self):
        """Worker loop: deliver queued events to the registered callbacks."""
        while self.is_running and not self.stop_event.is_set():
            try:
                event = self.feature_queue.get(timeout=0.1)
            except queue.Empty:
                continue

            try:
                # Iterate over a snapshot: callbacks may be added or removed
                # from other threads (or by a callback) while we dispatch.
                for callback in tuple(self.event_callbacks):
                    try:
                        callback(event)
                    except Exception as e:
                        logger.error("Callback error: %s", e)
            except Exception as e:
                logger.error("Callback loop error: %s", e)

    @staticmethod
    def _to_mono(audio_data: np.ndarray) -> np.ndarray:
        """Return a 1-D mono signal from 1-D or (frames, channels) audio."""
        samples = np.asarray(audio_data)
        if samples.ndim >= 2 and samples.shape[1] > 1:
            return np.mean(samples, axis=1)
        return samples.flatten()

    @staticmethod
    def _default_features() -> AudioFeatures:
        """Return the neutral feature set used when extraction is impossible."""
        return AudioFeatures(
            rms_energy=0.0, zero_crossing_rate=0.0, spectral_centroid=0.0,
            spectral_bandwidth=0.0, spectral_rolloff=0.0, mfcc=np.zeros((13, 1)),
            chroma=np.zeros((12, 1)), tempo=120.0, beat_phase=0.0,
            key_strength=np.zeros(12)
        )

    def _extract_features(self, audio_data: np.ndarray) -> AudioFeatures:
        """Extract features from one chunk.

        Accepts 1-D (mono) or 2-D (frames, channels) audio. Never raises: an
        empty chunk or any extraction failure yields the default feature set.
        """
        try:
            audio_mono = self._to_mono(audio_data)
            if audio_mono.size == 0:
                return self._default_features()

            sr = self.config.sample_rate

            # Basic features
            rms_energy = np.sqrt(np.mean(audio_mono ** 2))
            zero_crossing_rate = librosa.feature.zero_crossing_rate(audio_mono)[0]

            # Spectral features
            spectral_centroids = librosa.feature.spectral_centroid(y=audio_mono, sr=sr)[0]
            spectral_bandwidth = librosa.feature.spectral_bandwidth(y=audio_mono, sr=sr)[0]
            spectral_rolloff = librosa.feature.spectral_rolloff(y=audio_mono, sr=sr)[0]

            # MFCC
            mfcc = librosa.feature.mfcc(y=audio_mono, sr=sr, n_mfcc=13)

            # Chroma
            chroma = librosa.feature.chroma_stft(y=audio_mono, sr=sr)

            # Tempo and beat tracking
            tempo, beats = librosa.beat.beat_track(
                y=audio_mono, sr=sr, hop_length=self.config.hop_length
            )
            # Depending on the librosa version the tempo is a scalar or a
            # 1-element array; normalise it to a plain float.
            tempo_bpm = float(np.atleast_1d(tempo)[0])
            beat_phase = self._calculate_beat_phase(beats, len(audio_mono), tempo_bpm)

            # Key strength
            key_strength = np.mean(chroma, axis=1)

            return AudioFeatures(
                rms_energy=float(rms_energy),
                zero_crossing_rate=float(np.mean(zero_crossing_rate)),
                spectral_centroid=float(np.mean(spectral_centroids)),
                spectral_bandwidth=float(np.mean(spectral_bandwidth)),
                spectral_rolloff=float(np.mean(spectral_rolloff)),
                mfcc=mfcc,
                chroma=chroma,
                tempo=tempo_bpm,
                beat_phase=float(beat_phase),
                key_strength=key_strength
            )

        except Exception as e:
            logger.error("Feature extraction error: %s", e)
            return self._default_features()

    def _calculate_beat_phase(self, beats: np.ndarray, audio_length: int,
                              tempo: float = 120.0) -> float:
        """Return the position within the current beat, in ``[0, 1)``.

        Args:
            beats: Beat positions as frame indices (frames of ``hop_length``).
            audio_length: Length of the analysed signal in samples.
            tempo: Tempo in BPM used for the beat duration; a non-positive
                value falls back to 120 BPM.

        Returns:
            ``0.0`` when no beat lies before the end of the signal, otherwise
            the fraction of a beat elapsed since the most recent one.
        """
        beats = np.asarray(beats)
        if len(beats) == 0:
            return 0.0

        # The end of the chunk, in frames, is treated as "now".
        current_frame = audio_length // self.config.hop_length
        recent_beats = beats[beats < current_frame]
        if len(recent_beats) == 0:
            return 0.0

        bpm = tempo if tempo and tempo > 0 else self._FALLBACK_TEMPO
        beat_duration = 60.0 / bpm

        frames_since_beat = current_frame - recent_beats[-1]
        time_since_beat = frames_since_beat * self.config.hop_length / self.config.sample_rate

        return float((time_since_beat % beat_duration) / beat_duration)

    def _get_timestamp(self) -> float:
        """Return the current wall-clock time in seconds since the epoch."""
        import time
        return time.time()

    def apply_real_time_effect(self, audio_data: np.ndarray, effect_type: str,
                               params: Dict) -> np.ndarray:
        """Run ``audio_data`` through the named effect of the effects processor."""
        return self.effects_processor.process(audio_data, effect_type, params)
|
|
|
|
class BeatTracker:
    """Real-time beat tracking with a median-smoothed tempo estimate."""

    def __init__(self, config: AudioConfig):
        """Store the audio settings and start with an empty tempo history."""
        self.config = config
        self.tempo_history = []
        self.max_history = 10

    @staticmethod
    def _to_mono(audio_data: np.ndarray) -> np.ndarray:
        """Return a 1-D mono signal from 1-D or (frames, channels) audio."""
        samples = np.asarray(audio_data)
        if samples.ndim >= 2 and samples.shape[1] > 1:
            return np.mean(samples, axis=1)
        return samples.flatten()

    def _smooth_tempo(self, tempo: float) -> float:
        """Record ``tempo`` and return the median of the recent estimates."""
        self.tempo_history.append(tempo)

        # Keep only the newest ``max_history`` estimates.
        excess = len(self.tempo_history) - self.max_history
        if excess > 0:
            del self.tempo_history[:excess]

        # The history is empty only when max_history is 0.
        if not self.tempo_history:
            return tempo
        return float(np.median(self.tempo_history))

    def track_beat(self, audio_data: np.ndarray) -> Tuple[float, np.ndarray]:
        """Track beats in a chunk of audio.

        Args:
            audio_data: 1-D mono samples or a (frames, channels) array.

        Returns:
            ``(tempo, beats)``: the median tempo (BPM) over the recent history
            and the beat frame indices reported by librosa for this chunk.
            On any failure ``(120.0, empty array)`` is returned.
        """
        try:
            audio_mono = self._to_mono(audio_data)

            tempo, beats = librosa.beat.beat_track(
                y=audio_mono, sr=self.config.sample_rate, hop_length=self.config.hop_length
            )

            # Depending on the librosa version the tempo is a scalar or a
            # 1-element array; store plain floats so the history stays uniform.
            tempo_bpm = float(np.atleast_1d(tempo)[0])

            return self._smooth_tempo(tempo_bpm), beats

        except Exception as e:
            logger.error("Beat tracking error: %s", e)
            return 120.0, np.array([])
|
|
|
|
class KeyDetector:
    """Real-time key detection with majority voting over recent chunks."""

    # Pitch-class names in chroma bin order.
    _KEY_NAMES = ('C', 'C#', 'D', 'D#', 'E', 'F', 'F#', 'G', 'G#', 'A', 'A#', 'B')

    def __init__(self, config: AudioConfig):
        """Store the audio settings and start with an empty key history."""
        self.config = config
        self.key_history = []
        self.max_history = 5

    @staticmethod
    def _to_mono(audio_data: np.ndarray) -> np.ndarray:
        """Return a 1-D mono signal from 1-D or (frames, channels) audio."""
        samples = np.asarray(audio_data)
        if samples.ndim >= 2 and samples.shape[1] > 1:
            return np.mean(samples, axis=1)
        return samples.flatten()

    def _smooth_key(self, detected_key: str, confidence: float) -> Tuple[str, float]:
        """Record a detection and return the most frequent recent key.

        Returns the key that occurs most often in the history (the one seen
        earliest wins a tie) together with the mean confidence of its entries.
        """
        self.key_history.append((detected_key, confidence))

        # Keep only the newest ``max_history`` detections.
        excess = len(self.key_history) - self.max_history
        if excess > 0:
            del self.key_history[:excess]

        # The history is empty only when max_history is 0.
        if not self.key_history:
            return detected_key, confidence

        keys = [key for key, _ in self.key_history]
        # dict.fromkeys de-duplicates while keeping order, which makes the
        # tie-break deterministic (unlike iterating a set of strings).
        most_common_key = max(dict.fromkeys(keys), key=keys.count)
        votes = [conf for key, conf in self.key_history if key == most_common_key]
        return most_common_key, float(np.mean(votes))

    def detect_key(self, audio_data: np.ndarray) -> Tuple[str, float]:
        """Detect the dominant pitch class of a chunk of audio.

        Args:
            audio_data: 1-D mono samples or a (frames, channels) array.

        Returns:
            ``(key, confidence)`` after smoothing over the recent history;
            confidence is the winning bin's share of the total chroma energy.
            On any failure ``('C', 0.0)`` is returned.
        """
        try:
            audio_mono = self._to_mono(audio_data)

            chroma = librosa.feature.chroma_stft(
                y=audio_mono, sr=self.config.sample_rate
            )
            chroma_mean = np.mean(chroma, axis=1)

            # Simple key detection: the strongest pitch class wins.
            detected_key = self._KEY_NAMES[int(np.argmax(chroma_mean))]

            total = float(np.sum(chroma_mean))
            confidence = float(np.max(chroma_mean)) / total if total > 0 else 0.0

            return self._smooth_key(detected_key, confidence)

        except Exception as e:
            logger.error("Key detection error: %s", e)
            return 'C', 0.0
|
|
|
|
class EffectsProcessor:
    """Real-time audio effects processor.

    Every effect takes audio with time along axis 0 (1-D mono or
    ``(frames, channels)``) plus a parameter dict, and returns a new array.
    """

    def __init__(self, config: AudioConfig):
        """Store the audio settings (only ``sample_rate`` is used here)."""
        self.config = config

    def process(self, audio_data: np.ndarray, effect_type: str, params: Dict) -> np.ndarray:
        """Apply the effect named ``effect_type``.

        Unknown effect names, and any error raised by an effect, result in
        ``audio_data`` being returned unchanged (errors are logged).
        """
        try:
            effects = {
                'reverb': self._apply_reverb,
                'delay': self._apply_delay,
                'filter': self._apply_filter,
                'eq': self._apply_eq,
                'compressor': self._apply_compressor,
                'distortion': self._apply_distortion,
            }
            effect = effects.get(effect_type)
            if effect is None:
                return audio_data
            return effect(audio_data, params)

        except Exception as e:
            logger.error("Effect processing error: %s", e)
            return audio_data

    def _apply_reverb(self, audio_data: np.ndarray, params: Dict) -> np.ndarray:
        """Mix in a single attenuated echo (``delay_time``, ``decay``, ``mix``).

        The input is returned unchanged when the delay is not positive or is
        at least as long as the chunk.
        """
        delay_time = params.get('delay_time', 0.03)
        decay = params.get('decay', 0.5)
        mix = params.get('mix', 0.3)

        delay_samples = int(delay_time * self.config.sample_rate)

        # A zero delay would make ``audio_data[:-0]`` an empty slice.
        if delay_samples <= 0 or delay_samples >= len(audio_data):
            return audio_data

        delayed = np.zeros_like(audio_data)
        delayed[delay_samples:] = audio_data[:-delay_samples] * decay

        return audio_data * (1 - mix) + delayed * mix

    def _apply_delay(self, audio_data: np.ndarray, params: Dict) -> np.ndarray:
        """Mix in a feedback echo (``delay_time``, ``feedback``, ``mix``).

        The input is returned unchanged when the delay is not positive or is
        at least as long as the chunk.
        """
        delay_time = params.get('delay_time', 0.25)
        feedback = params.get('feedback', 0.4)
        mix = params.get('mix', 0.3)

        delay_samples = int(delay_time * self.config.sample_rate)

        if delay_samples <= 0 or delay_samples >= len(audio_data):
            return audio_data

        total = len(audio_data)
        delayed = np.zeros_like(audio_data)
        delayed[delay_samples:] = audio_data[:-delay_samples]

        # Feedback: sample i depends only on sample i - delay_samples, so the
        # recursion can be advanced one delay-length block at a time.
        for start in range(delay_samples, total, delay_samples):
            stop = min(start + delay_samples, total)
            delayed[start:stop] += (
                delayed[start - delay_samples:stop - delay_samples] * feedback
            )

        return audio_data * (1 - mix) + delayed * mix

    def _apply_filter(self, audio_data: np.ndarray, params: Dict) -> np.ndarray:
        """Apply a zero-phase Butterworth filter along the time axis.

        ``params['type']`` selects 'lowpass', 'highpass' (both use ``cutoff``)
        or 'bandpass' (uses ``low``/``high``); other types return the input
        unchanged. Frequencies are in Hz; ``order`` is the filter order.
        Invalid frequencies or too-short input raise ``ValueError`` (from
        SciPy), which :meth:`process` turns into a pass-through.
        """
        filter_type = params.get('type', 'lowpass')
        cutoff = params.get('cutoff', 1000)
        order = params.get('order', 4)

        nyquist = self.config.sample_rate / 2

        if filter_type == 'lowpass':
            b, a = signal.butter(order, cutoff / nyquist, btype='low')
        elif filter_type == 'highpass':
            b, a = signal.butter(order, cutoff / nyquist, btype='high')
        elif filter_type == 'bandpass':
            low = params.get('low', 500) / nyquist
            high = params.get('high', 2000) / nyquist
            b, a = signal.butter(order, [low, high], btype='band')
        else:
            return audio_data

        # axis=0 filters every channel at once and also accepts 1-D audio.
        filtered = signal.filtfilt(b, a, audio_data, axis=0)

        # Keep the caller's sample format (filtfilt computes in float64).
        return filtered.astype(audio_data.dtype, copy=False)

    def _apply_eq(self, audio_data: np.ndarray, params: Dict) -> np.ndarray:
        """Apply the simplified 3-band EQ.

        NOTE: this is a placeholder, not a real equaliser: the three gains
        (``low_gain``, ``mid_gain``, ``high_gain``, in dB) are converted to
        linear factors and their average is applied to the whole signal.
        """
        band_gains_db = (
            params.get('low_gain', 0),
            params.get('mid_gain', 0),
            params.get('high_gain', 0),
        )

        # dB -> linear amplitude factor
        linear_gains = [10 ** (gain_db / 20) for gain_db in band_gains_db]

        result = audio_data.copy()
        result *= sum(linear_gains) / 3

        return result

    def _apply_compressor(self, audio_data: np.ndarray, params: Dict) -> np.ndarray:
        """Apply a simple per-channel compressor.

        Samples whose magnitude exceeds ``threshold`` are scaled so that the
        excess is divided by ``ratio``; the gain curve is smoothed with the
        ``attack``/``release`` time constants (seconds).

        Raises:
            ValueError: if ``ratio`` is not positive.
        """
        threshold = params.get('threshold', 0.7)
        ratio = params.get('ratio', 4)
        attack = params.get('attack', 0.003)
        release = params.get('release', 0.1)

        if ratio <= 0:
            raise ValueError("compressor ratio must be positive")

        result = audio_data.copy()
        if result.size == 0:
            return result

        # View the copy as (frames, channels) so mono 1-D input works too;
        # writes through the view land in ``result``.
        channels = result.reshape(len(result), -1)
        slope = 1 - 1 / ratio

        for ch in range(channels.shape[1]):
            envelope = np.abs(channels[:, ch])

            # Unity gain everywhere except above the threshold. The division
            # is only evaluated where the envelope is non-zero.
            gain_reduction = np.ones(envelope.shape, dtype=float)
            over = (envelope > threshold) & (envelope > 0)
            gain_reduction[over] = (
                1 - (envelope[over] - threshold) * slope / envelope[over]
            )

            gain_reduction = self._smooth_gain(gain_reduction, attack, release)

            channels[:, ch] *= gain_reduction

        return result

    def _apply_distortion(self, audio_data: np.ndarray, params: Dict) -> np.ndarray:
        """Apply tanh soft-clipping (``drive``) blended with the dry signal (``mix``)."""
        drive = params.get('drive', 5)
        mix = params.get('mix', 0.5)

        distorted = np.tanh(audio_data * drive)

        return audio_data * (1 - mix) + distorted * mix

    def _smoothing_coeff(self, time_constant: float) -> float:
        """Return the one-pole coefficient for ``time_constant`` seconds.

        A non-positive time constant means "no smoothing" (coefficient 0).
        """
        samples = time_constant * self.config.sample_rate
        if samples <= 0:
            return 0.0
        return float(np.exp(-1.0 / samples))

    def _smooth_gain(self, gain_reduction: np.ndarray, attack: float, release: float) -> np.ndarray:
        """Smooth a 1-D gain curve with separate attack and release times.

        Falling gain (more reduction) is followed with the attack coefficient,
        rising gain with the release coefficient. Empty input yields an empty
        array.
        """
        smoothed = np.zeros_like(gain_reduction)
        if len(gain_reduction) == 0:
            return smoothed

        smoothed[0] = gain_reduction[0]

        attack_coeff = self._smoothing_coeff(attack)
        release_coeff = self._smoothing_coeff(release)

        # The branch depends on the previous output, so this stays a loop.
        for i in range(1, len(gain_reduction)):
            target = gain_reduction[i]
            previous = smoothed[i - 1]
            coeff = attack_coeff if target < previous else release_coeff
            smoothed[i] = coeff * previous + (1 - coeff) * target

        return smoothed
|
|
|
|
class AudioStreamManager:
    """Manage the sounddevice input/output streams feeding a processor."""

    def __init__(self, processor: RealTimeAudioProcessor):
        """Bind the manager to ``processor``; no stream is opened yet."""
        self.processor = processor
        self.input_stream = None
        self.output_stream = None

    def _stream_kwargs(self, device_id: Optional[int]) -> Dict:
        """Return the stream settings taken from the processor's config."""
        config = self.processor.config
        return {
            'samplerate': config.sample_rate,
            'channels': config.channels,
            'dtype': config.dtype,
            'blocksize': config.block_size,
            'device': device_id,
        }

    def start_input_stream(self, device_id: Optional[int] = None):
        """Open and start the input stream on ``device_id`` (default device if None).

        An input stream that is already open is stopped and closed first, so
        it is not leaked. Any failure is logged and re-raised, and no
        half-opened stream is kept.
        """
        stream = None
        try:
            self.stop_input_stream()
            stream = sd.InputStream(
                callback=self._input_callback, **self._stream_kwargs(device_id)
            )
            stream.start()
        except Exception as e:
            if stream is not None:
                stream.close()
            logger.error("Failed to start input stream: %s", e)
            raise

        self.input_stream = stream
        logger.info("Audio input stream started")

    def stop_input_stream(self):
        """Stop and close the input stream, if one is open."""
        stream, self.input_stream = self.input_stream, None
        if stream is None:
            return

        try:
            stream.stop()
        finally:
            # Release the device even if stopping failed.
            stream.close()
        logger.info("Audio input stream stopped")

    def start_output_stream(self, device_id: Optional[int] = None):
        """Open and start the output stream on ``device_id`` (default device if None).

        An output stream that is already open is stopped and closed first, so
        it is not leaked. Any failure is logged and re-raised, and no
        half-opened stream is kept.
        """
        stream = None
        try:
            self.stop_output_stream()
            stream = sd.OutputStream(
                callback=self._output_callback, **self._stream_kwargs(device_id)
            )
            stream.start()
        except Exception as e:
            if stream is not None:
                stream.close()
            logger.error("Failed to start output stream: %s", e)
            raise

        self.output_stream = stream
        logger.info("Audio output stream started")

    def stop_output_stream(self):
        """Stop and close the output stream, if one is open."""
        stream, self.output_stream = self.output_stream, None
        if stream is None:
            return

        try:
            stream.stop()
        finally:
            # Release the device even if stopping failed.
            stream.close()
        logger.info("Audio output stream stopped")

    def _input_callback(self, indata, frames, time, status):
        """Stream callback: forward the captured block to the processor."""
        if status:
            logger.warning("Input stream status: %s", status)

        # The buffer behind ``indata`` is only valid during this call and is
        # reused for the next block, so queue a private copy.
        self.processor.process_audio_chunk(np.array(indata, copy=True))

    def _output_callback(self, outdata, frames, time, status):
        """Stream callback: fill the output block (currently with silence)."""
        if status:
            logger.warning("Output stream status: %s", status)

        # No audio source is wired up yet, so emit silence.
        outdata.fill(0)
|