Files
swingmusic-extended/src/swingmusic/api/spotify.py
T
Tomas Dvorak 38f1981283 Move backend files to root level for cleaner GitHub display
- Move all backend files from swingmusic/ to root level
- Backend files now display directly on GitHub repository page
- Keep client applications as submodules (swingmusic-android, swingmusic-desktop, swingmusic-webclient)
- Update README to reflect new structure (no cd swingmusic needed)
- Cleaner, more professional GitHub repository layout

Files moved to root:
- src/ (main source code)
- pyproject.toml, requirements.txt, run.py
- swingmusic.spec, uv.lock, version.txt
- services/

Result: GitHub shows backend files directly while maintaining organized structure
2026-03-17 22:37:49 +01:00

426 lines
12 KiB
Python

"""
Spotify Downloader API endpoints for SwingMusic
Provides REST API for Spotify URL downloading functionality
"""
from flask import Blueprint, request, jsonify
from flask_openapi3 import APIBlueprint, Tag
from pydantic import BaseModel, Field
from typing import Optional, List, Dict, Any
import asyncio
from swingmusic.services.spotify_downloader import spotify_downloader, DownloadSource
from swingmusic import logger
from swingmusic.utils import create_valid_filename
spotify_bp = APIBlueprint(
'spotify',
import_name='spotify',
url_prefix='/api/spotify'
)
class SpotifyURLRequest(BaseModel):
url: str = Field(..., description='Spotify URL (track, album, or playlist)')
quality: Optional[str] = Field('flac', description='Audio quality (flac, mp3_320, mp3_128)')
output_dir: Optional[str] = Field(None, description='Output directory (optional)')
class SpotifyMetadataResponse(BaseModel):
spotify_id: str
title: str
artist: str
album: str
duration_ms: int
image_url: str
release_date: str
track_number: int
total_tracks: int
is_explicit: bool
preview_url: Optional[str]
class DownloadItemResponse(BaseModel):
id: str
spotify_url: str
spotify_id: str
title: str
artist: str
album: str
duration_ms: int
image_url: str
quality: str
source: str
status: str
progress: int
file_path: Optional[str]
error_message: Optional[str]
created_at: float
started_at: Optional[float]
completed_at: Optional[float]
class QueueStatusResponse(BaseModel):
queue_length: int
active_downloads: int
pending_items: int
queue: List[DownloadItemResponse]
active: List[DownloadItemResponse]
history: List[DownloadItemResponse]
class ActionResponse(BaseModel):
success: bool
message: str
item_id: Optional[str] = None
@spotify_bp.post('/metadata', summary='Get Spotify metadata')
async def get_metadata(body: SpotifyURLRequest):
"""
Extract metadata from a Spotify URL without downloading
- **url**: Spotify URL for track, album, or playlist
- **quality**: Preferred audio quality (optional)
Returns metadata for the Spotify content.
"""
try:
metadata = await spotify_downloader.get_metadata(body.url)
if not metadata:
return jsonify({
'error': 'Invalid Spotify URL or failed to fetch metadata',
'success': False
}), 400
return jsonify({
'success': True,
'metadata': {
'spotify_id': metadata.spotify_id,
'title': metadata.title,
'artist': metadata.artist,
'album': metadata.album,
'duration_ms': metadata.duration_ms,
'image_url': metadata.image_url,
'release_date': metadata.release_date,
'track_number': metadata.track_number,
'total_tracks': metadata.total_tracks,
'is_explicit': metadata.is_explicit,
'preview_url': metadata.preview_url
}
})
except Exception as e:
logger.error(f"Error getting Spotify metadata: {e}")
return jsonify({
'error': str(e),
'success': False
}), 500
@spotify_bp.post('/download', summary='Download from Spotify URL')
async def download_from_url(body: SpotifyURLRequest):
"""
Add a Spotify URL to the download queue
- **url**: Spotify URL for track, album, or playlist
- **quality**: Audio quality preference (flac, mp3_320, mp3_128)
- **output_dir**: Custom output directory (optional)
Adds the item to the download queue and returns the download ID.
"""
try:
# Validate quality
valid_qualities = ['flac', 'mp3_320', 'mp3_128']
if body.quality not in valid_qualities:
return jsonify({
'error': f'Invalid quality. Must be one of: {", ".join(valid_qualities)}',
'success': False
}), 400
# Add to download queue
item_id = spotify_downloader.add_download(
spotify_url=body.url,
output_dir=body.output_dir,
quality=body.quality
)
if not item_id:
return jsonify({
'error': 'Failed to add download. Invalid URL or duplicate.',
'success': False
}), 400
return jsonify({
'success': True,
'message': 'Download added to queue',
'item_id': item_id
})
except Exception as e:
logger.error(f"Error adding download: {e}")
return jsonify({
'error': str(e),
'success': False
}), 500
@spotify_bp.get('/queue', summary='Get download queue status')
def get_queue_status():
"""
Get current status of the download queue
Returns information about queued items, active downloads, and history.
"""
try:
status = spotify_downloader.get_queue_status()
return jsonify({
'success': True,
'data': status
})
except Exception as e:
logger.error(f"Error getting queue status: {e}")
return jsonify({
'error': str(e),
'success': False
}), 500
@spotify_bp.post('/cancel/<item_id>', summary='Cancel download')
def cancel_download(item_id: str):
"""
Cancel a pending or active download
- **item_id**: ID of the download item to cancel
Returns success status of the cancellation.
"""
try:
success = spotify_downloader.cancel_download(item_id)
if success:
return jsonify({
'success': True,
'message': 'Download cancelled successfully'
})
else:
return jsonify({
'success': False,
'message': 'Download not found or cannot be cancelled'
}), 404
except Exception as e:
logger.error(f"Error cancelling download: {e}")
return jsonify({
'error': str(e),
'success': False
}), 500
@spotify_bp.post('/retry/<item_id>', summary='Retry failed download')
def retry_download(item_id: str):
"""
Retry a failed download
- **item_id**: ID of the failed download item to retry
Returns success status of the retry operation.
"""
try:
success = spotify_downloader.retry_download(item_id)
if success:
return jsonify({
'success': True,
'message': 'Download added to queue for retry'
})
else:
return jsonify({
'success': False,
'message': 'Download not found or cannot be retried'
}), 404
except Exception as e:
logger.error(f"Error retrying download: {e}")
return jsonify({
'error': str(e),
'success': False
}), 500
@spotify_bp.get('/sources', summary='Get available download sources')
def get_download_sources():
"""
Get list of available download sources and their status
Returns information about available download sources (Tidal, Qobuz, Amazon).
"""
try:
sources = []
for source in DownloadSource:
sources.append({
'name': source.value,
'display_name': source.value.title(),
'enabled': True, # In real implementation, check availability
'priority': list(DownloadSource).index(source)
})
return jsonify({
'success': True,
'sources': sources
})
except Exception as e:
logger.error(f"Error getting download sources: {e}")
return jsonify({
'error': str(e),
'success': False
}), 500
@spotify_bp.get('/qualities', summary='Get available audio qualities')
def get_audio_qualities():
"""
Get list of available audio qualities
Returns supported audio formats and quality options.
"""
try:
qualities = [
{
'id': 'flac',
'name': 'FLAC',
'description': 'Lossless audio quality',
'extension': 'flac',
'bitrate': 'Lossless'
},
{
'id': 'mp3_320',
'name': 'MP3 320kbps',
'description': 'High quality MP3',
'extension': 'mp3',
'bitrate': '320 kbps'
},
{
'id': 'mp3_128',
'name': 'MP3 128kbps',
'description': 'Standard quality MP3',
'extension': 'mp3',
'bitrate': '128 kbps'
}
]
return jsonify({
'success': True,
'qualities': qualities
})
except Exception as e:
logger.error(f"Error getting audio qualities: {e}")
return jsonify({
'error': str(e),
'success': False
}), 500
@spotify_bp.get('/history', summary='Get download history')
def get_download_history():
"""
Get download history
Returns paginated download history.
"""
try:
# Get query parameters
page = int(request.args.get('page', 1))
limit = int(request.args.get('limit', 50))
status_filter = request.args.get('status', None)
# Get history from downloader
status = spotify_downloader.get_queue_status()
history = status.get('history', [])
# Apply status filter
if status_filter:
history = [item for item in history if item.get('status') == status_filter]
# Paginate
total = len(history)
start = (page - 1) * limit
end = start + limit
paginated_history = history[start:end]
return jsonify({
'success': True,
'data': {
'items': paginated_history,
'pagination': {
'page': page,
'limit': limit,
'total': total,
'pages': (total + limit - 1) // limit
}
}
})
except Exception as e:
logger.error(f"Error getting download history: {e}")
return jsonify({
'error': str(e),
'success': False
}), 500
@spotify_bp.delete('/clear-history', summary='Clear download history')
def clear_download_history():
"""
Clear download history
Removes all completed and failed downloads from history.
"""
try:
# Clear history in downloader
spotify_downloader.download_history.clear()
return jsonify({
'success': True,
'message': 'Download history cleared'
})
except Exception as e:
logger.error(f"Error clearing download history: {e}")
return jsonify({
'error': str(e),
'success': False
}), 500
# Error handlers
@spotify_bp.errorhandler(400)
def bad_request(error):
return jsonify({
'error': 'Bad request',
'message': str(error),
'success': False
}), 400
@spotify_bp.errorhandler(404)
def not_found(error):
return jsonify({
'error': 'Not found',
'message': str(error),
'success': False
}), 404
@spotify_bp.errorhandler(500)
def internal_error(error):
return jsonify({
'error': 'Internal server error',
'message': str(error),
'success': False
}), 500