""" Spotify Metadata Client for SwingMusic Handles fetching metadata from Spotify API for catalog browsing and downloads """ import os import json import time import base64 import requests from typing import Dict, List, Optional, Any, Tuple from dataclasses import dataclass from urllib.parse import urlencode from swingmusic.logger import log as logger @dataclass class SpotifyTrack: """Spotify track metadata""" id: str name: str artists: List[Dict[str, Any]] album: Dict[str, Any] duration_ms: int popularity: int preview_url: Optional[str] explicit: bool external_urls: Dict[str, str] track_number: int disc_number: int available_markets: List[str] @dataclass class SpotifyAlbum: """Spotify album metadata""" id: str name: str artists: List[Dict[str, Any]] release_date: str total_tracks: int popularity: int images: List[Dict[str, str]] external_urls: Dict[str, str] available_markets: List[str] album_type: str # album, single, compilation @dataclass class SpotifyArtist: """Spotify artist metadata""" id: str name: str popularity: int followers: Dict[str, int] genres: List[str] images: List[Dict[str, str]] external_urls: Dict[str, str] @dataclass class SpotifyPlaylist: """Spotify playlist metadata""" id: str name: str description: Optional[str] owner: Dict[str, Any] public: bool collaborative: bool tracks: Dict[str, Any] # Contains href, total, limit images: List[Dict[str, str]] external_urls: Dict[str, str] class SpotifyMetadataClient: """Client for accessing Spotify Web API for metadata""" def __init__(self): self.client_id = os.getenv('SPOTIFY_CLIENT_ID', '') self.client_secret = os.getenv('SPOTIFY_CLIENT_SECRET', '') self.access_token = None self.token_expires_at = 0 self.base_url = 'https://api.spotify.com/v1' self.rate_limit_remaining = 0 self.rate_limit_reset = 0 # Fallback to demo/public endpoints for development self.use_demo_mode = not (self.client_id and self.client_secret) if self.use_demo_mode: logger.warning("Spotify client credentials not configured, using demo mode") def _get_access_token(self) -> Optional[str]: """Get or refresh Spotify access token""" if self.use_demo_mode: return "demo_token" # Check if current token is still valid if self.access_token and time.time() < self.token_expires_at: return self.access_token try: # Request new token auth_string = base64.b64encode( f"{self.client_id}:{self.client_secret}".encode('utf-8') ).decode('utf-8') response = requests.post( 'https://accounts.spotify.com/api/token', headers={ 'Authorization': f'Basic {auth_string}', 'Content-Type': 'application/x-www-form-urlencoded' }, data='grant_type=client_credentials' ) if response.status_code == 200: data = response.json() self.access_token = data['access_token'] self.token_expires_at = time.time() + data['expires_in'] - 60 # 1 minute buffer logger.info("Successfully obtained Spotify access token") return self.access_token else: logger.error(f"Failed to get Spotify token: {response.status_code} {response.text}") return None except Exception as e: logger.error(f"Error getting Spotify access token: {e}") return None def _make_request(self, endpoint: str, params: Dict[str, Any] = None) -> Optional[Dict[str, Any]]: """Make authenticated request to Spotify API""" if self.use_demo_mode: return self._demo_response(endpoint, params) token = self._get_access_token() if not token: return None # Check rate limiting if self.rate_limit_remaining <= 0 and time.time() < self.rate_limit_reset: wait_time = self.rate_limit_reset - time.time() logger.warning(f"Rate limited, waiting {wait_time:.2f} seconds") time.sleep(wait_time) try: url = f"{self.base_url}/{endpoint.lstrip('/')}" if params: url += f"?{urlencode(params)}" response = requests.get( url, headers={ 'Authorization': f'Bearer {token}', 'Content-Type': 'application/json' } ) # Update rate limit info self.rate_limit_remaining = int(response.headers.get('X-RateLimit-Remaining', 0)) self.rate_limit_reset = int(response.headers.get('X-RateLimit-Reset', 0)) if response.status_code == 200: return response.json() elif response.status_code == 429: retry_after = int(response.headers.get('Retry-After', 5)) logger.warning(f"Rate limited, retrying after {retry_after} seconds") time.sleep(retry_after) return self._make_request(endpoint, params) elif response.status_code == 401: # Token expired, refresh and retry self.access_token = None return self._make_request(endpoint, params) else: logger.error(f"Spotify API error: {response.status_code} {response.text}") return None except Exception as e: logger.error(f"Error making Spotify API request: {e}") return None def _demo_response(self, endpoint: str, params: Dict[str, Any] = None) -> Optional[Dict[str, Any]]: """Generate demo responses for development""" logger.info(f"Demo mode response for: {endpoint}") if 'tracks' in endpoint: track_id = endpoint.split('/')[-1] if '/' in endpoint else 'demo_track' return { 'id': track_id, 'name': f'Demo Track {track_id}', 'artists': [{'id': 'demo_artist', 'name': 'Demo Artist'}], 'album': { 'id': 'demo_album', 'name': 'Demo Album', 'images': [{'url': 'https://via.placeholder.com/300'}] }, 'duration_ms': 180000, 'popularity': 75, 'preview_url': None, 'explicit': False, 'external_urls': {'spotify': f'https://open.spotify.com/track/{track_id}'}, 'track_number': 1, 'disc_number': 1, 'available_markets': ['US', 'GB', 'DE'] } elif 'albums' in endpoint: album_id = endpoint.split('/')[-1] if '/' in endpoint else 'demo_album' return { 'id': album_id, 'name': f'Demo Album {album_id}', 'artists': [{'id': 'demo_artist', 'name': 'Demo Artist'}], 'release_date': '2024-01-01', 'total_tracks': 10, 'popularity': 70, 'images': [{'url': 'https://via.placeholder.com/300'}], 'external_urls': {'spotify': f'https://open.spotify.com/album/{album_id}'}, 'available_markets': ['US', 'GB', 'DE'], 'album_type': 'album', 'tracks': { 'items': [ { 'id': f'demo_track_{i}', 'name': f'Demo Track {i+1}', 'duration_ms': 180000, 'track_number': i+1, 'explicit': False } for i in range(10) ] } } elif 'artists' in endpoint: if 'albums' in endpoint: return { 'items': [ { 'id': f'demo_album_{i}', 'name': f'Demo Album {i+1}', 'release_date': '2024-01-01', 'total_tracks': 10, 'images': [{'url': 'https://via.placeholder.com/300'}], 'album_type': 'album' } for i in range(5) ] } elif 'top-tracks' in endpoint: return { 'tracks': [ { 'id': f'demo_track_{i}', 'name': f'Demo Track {i+1}', 'artists': [{'id': 'demo_artist', 'name': 'Demo Artist'}], 'album': { 'id': 'demo_album', 'name': 'Demo Album', 'images': [{'url': 'https://via.placeholder.com/300'}] }, 'duration_ms': 180000, 'popularity': 80 - i, 'preview_url': None, 'explicit': False, 'external_urls': {'spotify': f'https://open.spotify.com/track/demo_track_{i}'}, 'track_number': i+1 } for i in range(15) ] } else: artist_id = endpoint.split('/')[-1] if '/' in endpoint else 'demo_artist' return { 'id': artist_id, 'name': f'Demo Artist {artist_id}', 'popularity': 75, 'followers': {'total': 1000000}, 'genres': ['Demo Genre', 'Test Genre'], 'images': [{'url': 'https://via.placeholder.com/300'}], 'external_urls': {'spotify': f'https://open.spotify.com/artist/{artist_id}'} } elif 'search' in endpoint: query = params.get('q', '') if params else '' return { 'tracks': { 'items': [ { 'id': f'search_track_{i}', 'name': f'{query} Track {i+1}', 'artists': [{'id': 'search_artist', 'name': f'{query} Artist'}], 'album': { 'id': 'search_album', 'name': f'{query} Album', 'images': [{'url': 'https://via.placeholder.com/300'}] }, 'duration_ms': 180000, 'popularity': 70 - i, 'explicit': False } for i in range(min(params.get('limit', 20) if params else 20, 20)) ], 'total': 100 }, 'albums': { 'items': [ { 'id': f'search_album_{i}', 'name': f'{query} Album {i+1}', 'artists': [{'id': 'search_artist', 'name': f'{query} Artist'}], 'release_date': '2024-01-01', 'total_tracks': 10, 'images': [{'url': 'https://via.placeholder.com/300'}], 'album_type': 'album' } for i in range(min(params.get('limit', 20) if params else 20, 20)) ], 'total': 50 }, 'artists': { 'items': [ { 'id': f'search_artist_{i}', 'name': f'{query} Artist {i+1}', 'popularity': 70 - i, 'followers': {'total': 100000 * (i+1)}, 'genres': ['Search Genre'], 'images': [{'url': 'https://via.placeholder.com/300'}] } for i in range(min(params.get('limit', 20) if params else 20, 20)) ], 'total': 25 } } return None def get_track(self, track_id: str) -> Optional[SpotifyTrack]: """Get track by ID""" data = self._make_request(f'tracks/{track_id}') if not data: return None return SpotifyTrack( id=data['id'], name=data['name'], artists=data['artists'], album=data['album'], duration_ms=data['duration_ms'], popularity=data['popularity'], preview_url=data.get('preview_url'), explicit=data['explicit'], external_urls=data['external_urls'], track_number=data['track_number'], disc_number=data.get('disc_number', 1), available_markets=data.get('available_markets', []) ) def get_album(self, album_id: str) -> Optional[SpotifyAlbum]: """Get album by ID""" data = self._make_request(f'albums/{album_id}') if not data: return None return SpotifyAlbum( id=data['id'], name=data['name'], artists=data['artists'], release_date=data['release_date'], total_tracks=data['total_tracks'], popularity=data.get('popularity', 0), images=data['images'], external_urls=data['external_urls'], available_markets=data.get('available_markets', []), album_type=data['album_type'] ) def get_album_tracks(self, album_id: str, limit: int = 50, offset: int = 0) -> List[SpotifyTrack]: """Get tracks from album""" data = self._make_request(f'albums/{album_id}/tracks', { 'limit': limit, 'offset': offset }) if not data or 'items' not in data: return [] tracks = [] for item in data['items']: # Get full track details for each track track = self.get_track(item['id']) if track: tracks.append(track) return tracks def get_artist(self, artist_id: str) -> Optional[SpotifyArtist]: """Get artist by ID""" data = self._make_request(f'artists/{artist_id}') if not data: return None return SpotifyArtist( id=data['id'], name=data['name'], popularity=data['popularity'], followers=data['followers'], genres=data['genres'], images=data['images'], external_urls=data['external_urls'] ) def get_artist_albums(self, artist_id: str, limit: int = 20, include_groups: str = 'album,single') -> List[SpotifyAlbum]: """Get artist albums""" data = self._make_request(f'artists/{artist_id}/albums', { 'limit': limit, 'include_groups': include_groups }) if not data or 'items' not in data: return [] albums = [] for item in data['items']: album = SpotifyAlbum( id=item['id'], name=item['name'], artists=item['artists'], release_date=item['release_date'], total_tracks=item['total_tracks'], popularity=item.get('popularity', 0), images=item['images'], external_urls=item['external_urls'], available_markets=item.get('available_markets', []), album_type=item['album_type'] ) albums.append(album) return albums def get_artist_top_tracks(self, artist_id: str, market: str = 'US') -> List[SpotifyTrack]: """Get artist's top tracks""" data = self._make_request(f'artists/{artist_id}/top-tracks', { 'market': market }) if not data or 'tracks' not in data: return [] tracks = [] for item in data['tracks']: track = SpotifyTrack( id=item['id'], name=item['name'], artists=item['artists'], album=item['album'], duration_ms=item['duration_ms'], popularity=item['popularity'], preview_url=item.get('preview_url'), explicit=item['explicit'], external_urls=item['external_urls'], track_number=item.get('track_number', 1), disc_number=item.get('disc_number', 1), available_markets=item.get('available_markets', []) ) tracks.append(track) return tracks def get_related_artists(self, artist_id: str) -> List[SpotifyArtist]: """Get related artists""" data = self._make_request(f'artists/{artist_id}/related-artists') if not data or 'artists' not in data: return [] artists = [] for item in data['artists']: artist = SpotifyArtist( id=item['id'], name=item['name'], popularity=item['popularity'], followers=item['followers'], genres=item['genres'], images=item['images'], external_urls=item['external_urls'] ) artists.append(artist) return artists def search(self, query: str, search_type: str = 'track', limit: int = 20, offset: int = 0, market: str = 'US') -> Dict[str, List]: """Search for content""" types = search_type if search_type in ['track', 'album', 'artist', 'playlist'] else 'track' data = self._make_request('search', { 'q': query, 'type': types, 'limit': limit, 'offset': offset, 'market': market }) if not data: return {'tracks': [], 'albums': [], 'artists': [], 'playlists': []} result = {'tracks': [], 'albums': [], 'artists': [], 'playlists': []} # Process tracks if 'tracks' in data and 'items' in data['tracks']: for item in data['tracks']['items']: track = SpotifyTrack( id=item['id'], name=item['name'], artists=item['artists'], album=item['album'], duration_ms=item['duration_ms'], popularity=item['popularity'], preview_url=item.get('preview_url'), explicit=item['explicit'], external_urls=item['external_urls'], track_number=item.get('track_number', 1), disc_number=item.get('disc_number', 1), available_markets=item.get('available_markets', []) ) result['tracks'].append(track) # Process albums if 'albums' in data and 'items' in data['albums']: for item in data['albums']['items']: album = SpotifyAlbum( id=item['id'], name=item['name'], artists=item['artists'], release_date=item['release_date'], total_tracks=item['total_tracks'], popularity=item.get('popularity', 0), images=item['images'], external_urls=item['external_urls'], available_markets=item.get('available_markets', []), album_type=item['album_type'] ) result['albums'].append(album) # Process artists if 'artists' in data and 'items' in data['artists']: for item in data['artists']['items']: artist = SpotifyArtist( id=item['id'], name=item['name'], popularity=item['popularity'], followers=item['followers'], genres=item['genres'], images=item['images'], external_urls=item['external_urls'] ) result['artists'].append(artist) # Process playlists if 'playlists' in data and 'items' in data['playlists']: for item in data['playlists']['items']: playlist = SpotifyPlaylist( id=item['id'], name=item['name'], description=item.get('description'), owner=item['owner'], public=item.get('public', False), collaborative=item.get('collaborative', False), tracks=item['tracks'], images=item.get('images', []), external_urls=item['external_urls'] ) result['playlists'].append(playlist) return result # Global instance spotify_metadata_client = SpotifyMetadataClient()