Move server code to this repo (#95)

move server code to this repo
This commit is contained in:
Mungai Njoroge
2023-01-13 20:01:52 +03:00
committed by GitHub
parent dd257e919d
commit 198957bcae
318 changed files with 6259 additions and 16797 deletions
View File
+30
View File
@@ -0,0 +1,30 @@
"""
This module combines all API blueprints into a single Flask app instance.
"""
from flask import Flask
from flask_cors import CORS
from app.api import album, artist, favorites, folder, playlist, search, track
from app.imgserver import imgbp as imgserver
def create_api():
"""
Creates the Flask instance, registers modules and registers all the API blueprints.
"""
app = Flask(__name__, static_url_path="")
CORS(app)
with app.app_context():
app.register_blueprint(album.albumbp)
app.register_blueprint(artist.artistbp)
app.register_blueprint(track.trackbp)
app.register_blueprint(search.searchbp)
app.register_blueprint(folder.folderbp)
app.register_blueprint(playlist.playlistbp)
app.register_blueprint(favorites.favbp)
app.register_blueprint(imgserver)
return app
+151
View File
@@ -0,0 +1,151 @@
"""
Contains all the album routes.
"""
from dataclasses import asdict
from flask import Blueprint, request
from app import utils
from app.db.sqlite.albums import SQLiteAlbumMethods as adb
from app.db.sqlite.favorite import SQLiteFavoriteMethods as favdb
from app.db.store import Store
from app.models import FavType, Track
get_album_by_id = adb.get_album_by_id
get_albums_by_albumartist = adb.get_albums_by_albumartist
check_is_fav = favdb.check_is_favorite
albumbp = Blueprint("album", __name__, url_prefix="")
@albumbp.route("/album", methods=["POST"])
def get_album():
"""Returns all the tracks in the given album."""
data = request.get_json()
error_msg = {"msg": "No hash provided"}
if data is None:
return error_msg, 400
try:
albumhash = data["hash"]
except KeyError:
return error_msg, 400
error_msg = {"error": "Album not created yet."}
album = Store.get_album_by_hash(albumhash)
if album is None:
return error_msg, 204
tracks = Store.get_tracks_by_albumhash(albumhash)
if tracks is None:
return error_msg, 404
if len(tracks) == 0:
return error_msg, 204
def get_album_genres(tracks: list[Track]):
genres = set()
for track in tracks:
if track.genre is not None:
genres.update(track.genre)
return list(genres)
album.genres = get_album_genres(tracks)
tracks = utils.remove_duplicates(tracks)
album.count = len(tracks)
for track in tracks:
if track.date != "Unknown":
album.date = track.date
break
try:
album.duration = sum((t.duration for t in tracks))
except AttributeError:
album.duration = 0
if (
album.count == 1
and tracks[0].title == album.title
# and tracks[0].track == 1
# and tracks[0].disc == 1
):
album.is_single = True
else:
album.check_type()
album.is_favorite = check_is_fav(albumhash, FavType.album)
return {"tracks": tracks, "info": album}
@albumbp.route("/album/<albumhash>/tracks", methods=["GET"])
def get_album_tracks(albumhash: str):
"""
Returns all the tracks in the given album.
"""
tracks = Store.get_tracks_by_albumhash(albumhash)
tracks = [asdict(t) for t in tracks]
for t in tracks:
track = str(t["track"]).zfill(3)
t["pos"] = int(f"{t['disc']}{track}")
tracks = sorted(tracks, key=lambda t: t["pos"])
return {"tracks": tracks}
@albumbp.route("/album/from-artist", methods=["POST"])
def get_artist_albums():
data = request.get_json()
if data is None:
return {"msg": "No albumartist provided"}
albumartists: str = data["albumartists"] # type: ignore
limit: int = data.get("limit")
exclude: str = data.get("exclude")
albumartists: list[str] = albumartists.split(",") # type: ignore
albums = [
{
"artisthash": a,
"albums": Store.get_albums_by_albumartist(a, limit, exclude=exclude),
}
for a in albumartists
]
albums = [a for a in albums if len(a["albums"]) > 0]
return {"data": albums}
# @album_bp.route("/album/bio", methods=["POST"])
# def get_album_bio():
# """Returns the album bio for the given album."""
# data = request.get_json()
# album_hash = data["hash"]
# err_msg = {"bio": "No bio found"}
# album = instances.album_instance.find_album_by_hash(album_hash)
# if album is None:
# return err_msg, 404
# bio = FetchAlbumBio(album["title"], album["artist"])()
# if bio is None:
# return err_msg, 404
# return {"bio": bio}
+323
View File
@@ -0,0 +1,323 @@
"""
Contains all the artist(s) routes.
"""
from collections import deque
from flask import Blueprint, request
from app.db.store import Store
from app.models import Album, FavType, Track
from app.utils import remove_duplicates
from app.db.sqlite.favorite import SQLiteFavoriteMethods as favdb
artistbp = Blueprint("artist", __name__, url_prefix="/")
class CacheEntry:
"""
The cache entry class for the artists cache.
"""
def __init__(
self, artisthash: str, albumhashes: set[str], tracks: list[Track]
) -> None:
self.albums: list[Album] = []
self.tracks: list[Track] = []
self.artisthash: str = artisthash
self.albumhashes: set[str] = albumhashes
if len(tracks) > 0:
self.tracks: list[Track] = tracks
self.type_checked = False
self.albums_fetched = False
class ArtistsCache:
"""
Holds artist page cache.
"""
artists: deque[CacheEntry] = deque(maxlen=6)
@classmethod
def get_albums_by_artisthash(cls, artisthash: str):
"""
Returns the cached albums for the given artisthash.
"""
for (index, albums) in enumerate(cls.artists):
if albums.artisthash == artisthash:
return (albums.albums, index)
return ([], -1)
@classmethod
def albums_cached(cls, artisthash: str) -> bool:
"""
Returns True if the artist is in the cache.
"""
for entry in cls.artists:
if entry.artisthash == artisthash and len(entry.albums) > 0:
return True
return False
@classmethod
def albums_fetched(cls, artisthash: str):
"""
Checks if the albums have been fetched for the given artisthash.
"""
for entry in cls.artists:
if entry.artisthash == artisthash:
return entry.albums_fetched
@classmethod
def tracks_cached(cls, artisthash: str) -> bool:
"""
Checks if the tracks have been cached for the given artisthash.
"""
for entry in cls.artists:
if entry.artisthash == artisthash and len(entry.tracks) > 0:
return True
return False
@classmethod
def add_entry(cls, artisthash: str, albumhashes: set[str], tracks: list[Track]):
"""
Adds a new entry to the cache.
"""
cls.artists.append(CacheEntry(artisthash, albumhashes, tracks))
@classmethod
def get_tracks(cls, artisthash: str):
"""
Returns the cached tracks for the given artisthash.
"""
entry = [a for a in cls.artists if a.artisthash == artisthash][0]
return entry.tracks
@classmethod
def get_albums(cls, artisthash: str):
"""
Returns the cached albums for the given artisthash.
"""
entry = [a for a in cls.artists if a.artisthash == artisthash][0]
albums = [Store.get_album_by_hash(h) for h in entry.albumhashes]
entry.albums = [album for album in albums if album is not None]
store_albums = Store.get_albums_by_artisthash(artisthash)
all_albums_hash = "-".join([a.albumhash for a in entry.albums])
for album in store_albums:
if album.albumhash not in all_albums_hash:
entry.albums.append(album)
entry.albums_fetched = True
@classmethod
def process_album_type(cls, artisthash: str):
"""
Checks the cached albums type for the given artisthash.
"""
entry = [a for a in cls.artists if a.artisthash == artisthash][0]
for album in entry.albums:
album.check_type()
album_tracks = Store.get_tracks_by_albumhash(album.albumhash)
album_tracks = remove_duplicates(album_tracks)
album.check_is_single(album_tracks)
entry.type_checked = True
def add_albums_to_cache(artisthash: str):
"""
Fetches albums and adds them to the cache.
"""
tracks = Store.get_tracks_by_artist(artisthash)
if len(tracks) == 0:
return False
albumhashes = set(t.albumhash for t in tracks)
ArtistsCache.add_entry(artisthash, albumhashes, [])
return True
# =======================================================
# ===================== ROUTES ==========================
# =======================================================
@artistbp.route("/artist/<artisthash>", methods=["GET"])
def get_artist(artisthash: str):
"""
Get artist data.
"""
limit = request.args.get("limit")
if limit is None:
limit = 6
limit = int(limit)
artist = Store.get_artist_by_hash(artisthash)
if artist is None:
return {"error": "Artist not found"}, 404
tracks_cached = ArtistsCache.tracks_cached(artisthash)
if tracks_cached:
tracks = ArtistsCache.get_tracks(artisthash)
else:
tracks = Store.get_tracks_by_artist(artisthash)
albumhashes = set(t.albumhash for t in tracks)
hashes_from_albums = set(
a.albumhash for a in Store.get_albums_by_artisthash(artisthash)
)
albumhashes = albumhashes.union(hashes_from_albums)
ArtistsCache.add_entry(artisthash, albumhashes, tracks)
tcount = len(tracks)
acount = Store.count_albums_by_artisthash(artisthash)
if acount == 0 and tcount < 10:
limit = tcount
artist.trackcount = tcount
artist.albumcount = acount
artist.duration = sum(t.duration for t in tracks)
artist.is_favorite = favdb.check_is_favorite(artisthash, FavType.artist)
return {"artist": artist, "tracks": tracks[:limit]}
@artistbp.route("/artist/<artisthash>/albums", methods=["GET"])
def get_artist_albums(artisthash: str):
limit = request.args.get("limit")
if limit is None:
limit = 6
return_all = request.args.get("all")
limit = int(limit)
all_albums = []
is_cached = ArtistsCache.albums_cached(artisthash)
if not is_cached:
add_albums_to_cache(artisthash)
albums_fetched = ArtistsCache.albums_fetched(artisthash)
if not albums_fetched:
ArtistsCache.get_albums(artisthash)
all_albums, index = ArtistsCache.get_albums_by_artisthash(artisthash)
if not ArtistsCache.artists[index].type_checked:
ArtistsCache.process_album_type(artisthash)
singles = [a for a in all_albums if a.is_single]
eps = [a for a in all_albums if a.is_EP]
def remove_EPs_and_singles(albums: list[Album]):
albums = [a for a in albums if not a.is_EP]
albums = [a for a in albums if not a.is_single]
return albums
albums = filter(lambda a: artisthash in a.albumartisthash, all_albums)
albums = list(albums)
albums = remove_EPs_and_singles(albums)
appearances = filter(lambda a: artisthash not in a.albumartisthash, all_albums)
appearances = list(appearances)
appearances = remove_EPs_and_singles(appearances)
artist = Store.get_artist_by_hash(artisthash)
if return_all is not None:
limit = len(all_albums)
return {
"artistname": artist.name,
"albums": albums[:limit],
"singles": singles[:limit],
"eps": eps[:limit],
"appearances": appearances[:limit],
}
@artistbp.route("/artist/<artisthash>/tracks", methods=["GET"])
def get_artist_tracks(artisthash: str):
"""
Returns all artists by a given artist.
"""
tracks = Store.get_tracks_by_artist(artisthash)
return {"tracks": tracks}
# artist = Store.get_artist_by_hash(artisthash)
# if artist is None:
# return {"error": "Artist not found"}, 404
# return {"albums": albums[:limit]}
# @artist_bp.route("/artist/<artist>")
# @cache.cached()
# def get_artist_data(artist: str):
# """Returns the artist's data, tracks and albums"""
# artist = urllib.parse.unquote(artist)
# artist_obj = instances.artist_instance.get_artists_by_name(artist)
# def get_artist_tracks():
# songs = instances.tracks_instance.find_songs_by_artist(artist)
# return songs
# artist_songs = get_artist_tracks()
# songs = utils.remove_duplicates(artist_songs)
# def get_artist_albums():
# artist_albums = []
# albums_with_count = []
# albums = instances.tracks_instance.find_songs_by_albumartist(artist)
# for song in albums:
# if song["album"] not in artist_albums:
# artist_albums.append(song["album"])
# for album in artist_albums:
# count = 0
# length = 0
# for song in artist_songs:
# if song["album"] == album:
# count = count + 1
# length = length + song["length"]
# album_ = {"title": album, "count": count, "length": length}
# albums_with_count.append(album_)
# return albums_with_count
# return {
# "artist": artist_obj,
# "songs": songs,
# "albums": get_artist_albums()
# }
+210
View File
@@ -0,0 +1,210 @@
from flask import Blueprint, request
from app.db.sqlite.favorite import SQLiteFavoriteMethods as favdb
from app.db.store import Store
from app.models import FavType
from app.utils import UseBisection
favbp = Blueprint("favorite", __name__, url_prefix="/")
def remove_none(items: list):
return [i for i in items if i is not None]
@favbp.route("/favorite/add", methods=["POST"])
def add_favorite():
"""
Adds a favorite to the database.
"""
data = request.get_json()
if data is None:
return {"error": "No data provided"}, 400
itemhash = data.get("hash")
itemtype = data.get("type")
favdb.insert_one_favorite(itemtype, itemhash)
if itemtype == FavType.track:
Store.add_fav_track(itemhash)
return {"msg": "Added to favorites"}
@favbp.route("/favorite/remove", methods=["POST"])
def remove_favorite():
"""
Removes a favorite from the database.
"""
data = request.get_json()
if data is None:
return {"error": "No data provided"}, 400
itemhash = data.get("hash")
itemtype = data.get("type")
favdb.delete_favorite(itemtype, itemhash)
if itemtype == FavType.track:
Store.remove_fav_track(itemhash)
return {"msg": "Removed from favorites"}
@favbp.route("/albums/favorite")
def get_favorite_albums():
limit = request.args.get("limit")
if limit is None:
limit = 6
limit = int(limit)
albums = favdb.get_fav_albums()
albumhashes = [a[1] for a in albums]
albumhashes.reverse()
src_albums = sorted(Store.albums, key=lambda x: x.albumhash)
fav_albums = UseBisection(src_albums, "albumhash", albumhashes)()
fav_albums = remove_none(fav_albums)
if limit == 0:
limit = len(albums)
return {"albums": fav_albums[:limit]}
@favbp.route("/tracks/favorite")
def get_favorite_tracks():
limit = request.args.get("limit")
if limit is None:
limit = 6
limit = int(limit)
tracks = favdb.get_fav_tracks()
trackhashes = [t[1] for t in tracks]
trackhashes.reverse()
src_tracks = sorted(Store.tracks, key=lambda x: x.trackhash)
tracks = UseBisection(src_tracks, "trackhash", trackhashes)()
tracks = remove_none(tracks)
if limit == 0:
limit = len(tracks)
return {"tracks": tracks[:limit]}
@favbp.route("/artists/favorite")
def get_favorite_artists():
limit = request.args.get("limit")
if limit is None:
limit = 6
limit = int(limit)
artists = favdb.get_fav_artists()
artisthashes = [a[1] for a in artists]
artisthashes.reverse()
src_artists = sorted(Store.artists, key=lambda x: x.artisthash)
artists = UseBisection(src_artists, "artisthash", artisthashes)()
artists = remove_none(artists)
if limit == 0:
limit = len(artists)
return {"artists": artists[:limit]}
@favbp.route("/favorites")
def get_all_favorites():
"""
Returns all the favorites in the database.
"""
track_limit = request.args.get("track_limit")
album_limit = request.args.get("album_limit")
artist_limit = request.args.get("artist_limit")
if track_limit is None:
track_limit = 6
if album_limit is None:
album_limit = 6
if artist_limit is None:
artist_limit = 6
track_limit = int(track_limit)
album_limit = int(album_limit)
artist_limit = int(artist_limit)
favs = favdb.get_all()
favs.reverse()
tracks = []
albums = []
artists = []
for fav in favs:
if (
len(tracks) >= track_limit
and len(albums) >= album_limit
and len(artists) >= artist_limit
):
break
if fav[2] == FavType.track:
tracks.append(fav[1])
elif fav[2] == FavType.album:
albums.append(fav[1])
elif fav[2] == FavType.artist:
artists.append(fav[1])
src_tracks = sorted(Store.tracks, key=lambda x: x.trackhash)
src_albums = sorted(Store.albums, key=lambda x: x.albumhash)
src_artists = sorted(Store.artists, key=lambda x: x.artisthash)
tracks = tracks[:track_limit]
albums = albums[:album_limit]
artists = artists[:artist_limit]
tracks = UseBisection(src_tracks, "trackhash", tracks)()
albums = UseBisection(src_albums, "albumhash", albums)()
artists = UseBisection(src_artists, "artisthash", artists)()
tracks = remove_none(tracks)
albums = remove_none(albums)
artists = remove_none(artists)
return {
"tracks": tracks,
"albums": albums,
"artists": artists,
}
@favbp.route("/favorites/check")
def check_favorite():
"""
Checks if a favorite exists in the database.
"""
itemhash = request.args.get("hash")
itemtype = request.args.get("type")
if itemhash is None:
return {"error": "No hash provided"}, 400
if itemtype is None:
return {"error": "No type provided"}, 400
exists = favdb.check_is_favorite(itemhash, itemtype)
return {"is_favorite": exists}
+32
View File
@@ -0,0 +1,32 @@
"""
Contains all the folder routes.
"""
from flask import Blueprint, request
from app import settings
from app.lib.folderslib import GetFilesAndDirs
folderbp = Blueprint("folder", __name__, url_prefix="/")
@folderbp.route("/folder", methods=["POST"])
def get_folder_tree():
"""
Returns a list of all the folders and tracks in the given folder.
"""
data = request.get_json()
if data is not None:
req_dir: str = data["folder"]
else:
req_dir = settings.HOME_DIR
if req_dir == "$home":
req_dir = settings.HOME_DIR
tracks, folders = GetFilesAndDirs(req_dir)()
return {
"tracks": tracks,
"folders": sorted(folders, key=lambda i: i.name),
}
+226
View File
@@ -0,0 +1,226 @@
"""
All playlist-related routes.
"""
import json
from datetime import datetime
from flask import Blueprint, request
from PIL import UnidentifiedImageError
from app import models, serializer
from app.db.sqlite.playlists import SQLitePlaylistMethods
from app.db.store import Store
from app.lib import playlistlib
from app.utils import create_new_date, remove_duplicates
playlistbp = Blueprint("playlist", __name__, url_prefix="/")
PL = SQLitePlaylistMethods
insert_one_playlist = PL.insert_one_playlist
get_playlist_by_name = PL.get_playlist_by_name
count_playlist_by_name = PL.count_playlist_by_name
get_all_playlists = PL.get_all_playlists
get_playlist_by_id = PL.get_playlist_by_id
tracks_to_playlist = PL.add_tracks_to_playlist
add_artist_to_playlist = PL.add_artist_to_playlist
update_playlist = PL.update_playlist
delete_playlist = PL.delete_playlist
# get_tracks_by_trackhashes = SQLiteTrackMethods.get_tracks_by_trackhashes
@playlistbp.route("/playlists", methods=["GET"])
def send_all_playlists():
"""
Gets all the playlists.
"""
playlists = get_all_playlists()
playlists = list(playlists)
playlists.sort(
key=lambda p: datetime.strptime(p.last_updated, "%Y-%m-%d %H:%M:%S"),
reverse=True,
)
return {"data": playlists}
@playlistbp.route("/playlist/new", methods=["POST"])
def create_playlist():
"""
Creates a new playlist. Accepts POST method with a JSON body.
"""
data = request.get_json()
if data is None:
return {"error": "Playlist name not provided"}, 400
existing_playlist_count = count_playlist_by_name(data["name"])
if existing_playlist_count > 0:
return {"error": "Playlist already exists"}, 409
playlist = {
"artisthashes": json.dumps([]),
"banner_pos": 50,
"has_gif": 0,
"image": None,
"last_updated": create_new_date(),
"name": data["name"],
"trackhashes": json.dumps([]),
}
playlist = insert_one_playlist(playlist)
if playlist is None:
return {"error": "Playlist could not be created"}, 500
return {"playlist": playlist}, 201
@playlistbp.route("/playlist/<playlist_id>/add", methods=["POST"])
def add_track_to_playlist(playlist_id: str):
"""
Takes a playlist ID and a track hash, and adds the track to the playlist
"""
data = request.get_json()
if data is None:
return {"error": "Track hash not provided"}, 400
trackhash = data["track"]
insert_count = tracks_to_playlist(int(playlist_id), [trackhash])
if insert_count == 0:
return {"error": "Track already exists in playlist"}, 409
add_artist_to_playlist(int(playlist_id), trackhash)
return {"msg": "Done"}, 200
@playlistbp.route("/playlist/<playlistid>")
def get_playlist(playlistid: str):
"""
Gets a playlist by id, and if it exists, it gets all the tracks in the playlist and returns them.
"""
playlist = get_playlist_by_id(int(playlistid))
if playlist is None:
return {"msg": "Playlist not found"}, 404
tracks = Store.get_tracks_by_trackhashes(list(playlist.trackhashes))
tracks = remove_duplicates(tracks)
duration = sum(t.duration for t in tracks)
playlist.last_updated = serializer.date_string_to_time_passed(playlist.last_updated)
playlist.duration = duration
return {"info": playlist, "tracks": tracks}
@playlistbp.route("/playlist/<playlistid>/update", methods=["PUT"])
def update_playlist_info(playlistid: str):
if playlistid is None:
return {"error": "Playlist ID not provided"}, 400
db_playlist = get_playlist_by_id(int(playlistid))
if db_playlist is None:
return {"error": "Playlist not found"}, 404
image = None
if "image" in request.files:
image = request.files["image"]
data = request.form
playlist = {
"id": int(playlistid),
"artisthashes": json.dumps([]),
"banner_pos": db_playlist.banner_pos,
"has_gif": 0,
"image": db_playlist.image,
"last_updated": create_new_date(),
"name": str(data.get("name")).strip(),
"trackhashes": json.dumps([]),
}
if image:
try:
playlist["image"] = playlistlib.save_p_image(image, playlistid)
if image.content_type == "image/gif":
playlist["has_gif"] = 1
# reset banner position to center.
playlist["banner_pos"] = 50
PL.update_banner_pos(int(playlistid), 50)
except UnidentifiedImageError:
return {"error": "Failed: Invalid image"}, 400
p_tuple = (*playlist.values(),)
print("banner pos:", playlist["banner_pos"])
update_playlist(int(playlistid), playlist)
playlist = models.Playlist(*p_tuple)
playlist.last_updated = serializer.date_string_to_time_passed(playlist.last_updated)
return {
"data": playlist,
}
# @playlist_bp.route("/playlist/artists", methods=["POST"])
# def get_playlist_artists():
# data = request.get_json()
# pid = data["pid"]
# artists = playlistlib.GetPlaylistArtists(pid)()
# return {"data": artists}
@playlistbp.route("/playlist/delete", methods=["POST"])
def remove_playlist():
"""
Deletes a playlist by ID.
"""
message = {"error": "Playlist ID not provided"}
data = request.get_json()
if data is None:
return message, 400
try:
pid = data["pid"]
except KeyError:
return message, 400
delete_playlist(pid)
return {"msg": "Done"}, 200
@playlistbp.route("/playlist/<pid>/set-image-pos", methods=["POST"])
def update_image_position(pid: int):
data = request.get_json()
message = {"msg": "No data provided"}
if data is None:
return message, 400
try:
pos = data["pos"]
except KeyError:
return message, 400
PL.update_banner_pos(pid, pos)
return {"msg": "Image position saved"}, 200
+218
View File
@@ -0,0 +1,218 @@
"""
Contains all the search routes.
"""
from flask import Blueprint, request
from app import models, utils
from app.db.store import Store
from app.lib import searchlib
searchbp = Blueprint("search", __name__, url_prefix="/")
SEARCH_COUNT = 12
"""The max amount of items to return per request"""
class SearchResults:
"""
Holds all the search results.
"""
query: str = ""
tracks: list[models.Track] = []
albums: list[models.Album] = []
playlists: list[models.Playlist] = []
artists: list[models.Artist] = []
class DoSearch:
"""Class containing the methods that perform searching."""
def __init__(self, query: str) -> None:
"""
:param :str:`query`: the search query.
"""
self.tracks: list[models.Track] = []
self.query = query
SearchResults.query = query
def search_tracks(self):
"""Calls :class:`SearchTracks` which returns the tracks that fuzzily match
the search terms. Then adds them to the `SearchResults` store.
"""
self.tracks = Store.tracks
tracks = searchlib.SearchTracks(self.tracks, self.query)()
if len(tracks) == 0:
return []
tracks = utils.remove_duplicates(tracks)
SearchResults.tracks = tracks
return tracks
def search_artists(self):
"""Calls :class:`SearchArtists` which returns the artists that fuzzily match
the search term. Then adds them to the `SearchResults` store.
"""
# self.artists = utils.Get.get_all_artists()
artists = [a.name for a in Store.artists]
artists = searchlib.SearchArtists(artists, self.query)()
SearchResults.artists = artists
return artists
def search_albums(self):
"""Calls :class:`SearchAlbums` which returns the albums that fuzzily match
the search term. Then adds them to the `SearchResults` store.
"""
# albums = utils.Get.get_all_albums()
albums = Store.albums
albums = searchlib.SearchAlbums(albums, self.query)()
SearchResults.albums = albums
return albums
# def search_playlists(self):
# """Calls :class:`SearchPlaylists` which returns the playlists that fuzzily match
# the search term. Then adds them to the `SearchResults` store.
# """
# playlists = utils.Get.get_all_playlists()
# playlists = [serializer.Playlist(playlist) for playlist in playlists]
# playlists = searchlib.SearchPlaylists(playlists, self.query)()
# SearchResults.playlists = playlists
# return playlists
def search_all(self):
"""Calls all the search methods."""
self.search_tracks()
self.search_albums()
self.search_artists()
# self.search_playlists()
@searchbp.route("/search/tracks", methods=["GET"])
def search_tracks():
"""
Searches for tracks that match the search query.
"""
query = request.args.get("q")
if not query:
return {"error": "No query provided"}, 400
tracks = DoSearch(query).search_tracks()
return {
"tracks": tracks[:SEARCH_COUNT],
"more": len(tracks) > SEARCH_COUNT,
}
@searchbp.route("/search/albums", methods=["GET"])
def search_albums():
"""
Searches for albums.
"""
query = request.args.get("q")
if not query:
return {"error": "No query provided"}, 400
tracks = DoSearch(query).search_albums()
return {
"albums": tracks[:SEARCH_COUNT],
"more": len(tracks) > SEARCH_COUNT,
}
@searchbp.route("/search/artists", methods=["GET"])
def search_artists():
"""
Searches for artists.
"""
query = request.args.get("q")
if not query:
return {"error": "No query provided"}, 400
artists = DoSearch(query).search_artists()
return {
"artists": artists[:SEARCH_COUNT],
"more": len(artists) > SEARCH_COUNT,
}
# @searchbp.route("/search/playlists", methods=["GET"])
# def search_playlists():
# """
# Searches for playlists.
# """
# query = request.args.get("q")
# if not query:
# return {"error": "No query provided"}, 400
# playlists = DoSearch(query).search_playlists()
# return {
# "playlists": playlists[:SEARCH_COUNT],
# "more": len(playlists) > SEARCH_COUNT,
# }
@searchbp.route("/search/top", methods=["GET"])
def get_top_results():
"""
Returns the top results for the search query.
"""
query = request.args.get("q")
if not query:
return {"error": "No query provided"}, 400
DoSearch(query).search_all()
max = 2
return {
"tracks": SearchResults.tracks[:max],
"albums": SearchResults.albums[:max],
"artists": SearchResults.artists[:max],
"playlists": SearchResults.playlists[:max],
}
@searchbp.route("/search/loadmore")
def search_load_more():
"""
Returns more songs, albums or artists from a search query.
"""
s_type = request.args.get("type")
index = int(request.args.get("index") or 0)
if s_type == "tracks":
t = SearchResults.tracks
return {
"tracks": t[index : index + SEARCH_COUNT],
"more": len(t) > index + SEARCH_COUNT,
}
elif s_type == "albums":
a = SearchResults.albums
return {
"albums": a[index : index + SEARCH_COUNT],
"more": len(a) > index + SEARCH_COUNT,
}
elif s_type == "artists":
a = SearchResults.artists
return {
"artists": a[index : index + SEARCH_COUNT],
"more": len(a) > index + SEARCH_COUNT,
}
+33
View File
@@ -0,0 +1,33 @@
"""
Contains all the track routes.
"""
from flask import Blueprint, send_file
from app.db.store import Store
trackbp = Blueprint("track", __name__, url_prefix="/")
@trackbp.route("/file/<trackhash>")
def send_track_file(trackhash: str):
"""
Returns an audio file that matches the passed id to the client.
Falls back to track hash if id is not found.
"""
msg = {"msg": "File Not Found"}
if trackhash is None:
return msg, 404
try:
track = Store.get_tracks_by_trackhashes([trackhash])[0]
except IndexError:
track = None
if track is None:
return msg, 404
audio_type = track.filepath.rsplit(".", maxsplit=1)[-1]
try:
return send_file(track.filepath, mimetype=f"audio/{audio_type}")
except FileNotFoundError:
return msg, 404
+214
View File
@@ -0,0 +1,214 @@
class AlbumMethods:
"""
Lists all the methods that can be found in the Albums class.
"""
def insert_album():
"""
Inserts a new album object into the database.
"""
pass
def get_all_albums():
"""
Returns all the albums in the database.
"""
pass
def get_album_by_id():
"""
Returns a single album matching the passed id.
"""
pass
def get_album_by_name():
"""
Returns a single album matching the passed name.
"""
pass
def get_album_by_artist():
"""
Returns a single album matching the passed artist name.
"""
pass
class ArtistMethods:
"""
Lists all the methods that can be found in the Artists class.
"""
def insert_artist():
"""
Inserts a new artist object into the database.
"""
pass
def get_all_artists():
"""
Returns all the artists in the database.
"""
pass
def get_artist_by_id():
"""
Returns an artist matching the mongo Id.
"""
pass
def get_artists_by_name():
"""
Returns all the artists matching the query.
"""
pass
class PlaylistMethods:
"""
Lists all the methods that can be found in the Playlists class.
"""
def insert_playlist():
"""
Inserts a new playlist object into the database.
"""
pass
def get_all_playlists():
"""
Returns all the playlists in the database.
"""
pass
def get_playlist_by_id():
"""
Returns a single playlist matching the id in the query params.
"""
pass
def add_track_to_playlist():
"""
Adds a track to a playlist.
"""
pass
def get_playlist_by_name():
"""
Returns a single playlist matching the name in the query params.
"""
pass
def update_playlist():
"""
Updates a playlist.
"""
pass
class TrackMethods:
"""
Lists all the methods that can be found in the Tracks class.
"""
def insert_one_track():
"""
Inserts a new track object into the database.
"""
pass
def drop_db():
"""
Drops the entire database.
"""
pass
def get_all_tracks():
"""
Returns all the tracks in the database.
"""
pass
def get_track_by_id():
"""
Returns a single track matching the id in the query params.
"""
pass
def get_track_by_album():
"""
Returns a single track matching the album in the query params.
"""
pass
def search_tracks_by_album():
"""
Returns all the tracks matching the albums in the query params (using regex).
"""
pass
def search_tracks_by_artist():
"""
Returns all the tracks matching the artists in the query params.
"""
pass
def find_track_by_title():
"""
Finds all the tracks matching the title in the query params.
"""
pass
def find_tracks_by_album():
"""
Finds all the tracks matching the album in the query params.
"""
pass
def find_tracks_by_folder():
"""
Finds all the tracks matching the folder in the query params.
"""
pass
def find_tracks_by_artist():
"""
Finds all the tracks matching the artist in the query params.
"""
pass
def find_tracks_by_albumartist():
"""
Finds all the tracks matching the album artist in the query params.
"""
pass
def get_track_by_path():
"""
Returns a single track matching the path in the query params.
"""
pass
def remove_track_by_path():
"""
Removes a track from the database. Returns a boolean indicating success or failure of the operation.
"""
pass
def remove_track_by_id():
"""
Removes a track from the database. Returns a boolean indicating success or failure of the operation.
"""
pass
def find_tracks_by_albumhash():
"""
Returns all the tracks matching the passed hash.
"""
pass
def get_dir_t_count():
"""
Returns a list of all the tracks matching the path in the query params.
"""
pass
+47
View File
@@ -0,0 +1,47 @@
"""
This module contains the functions to interact with the SQLite database.
"""
import sqlite3
from pathlib import Path
from sqlite3 import Connection as SqlConn
from app.settings import APP_DB_PATH
def create_connection(db_file: str) -> SqlConn:
"""
Creates a connection to the specified database.
"""
conn = sqlite3.connect(db_file)
return conn
def get_sqlite_conn():
"""
It opens a connection to the database
:return: A connection to the database.
"""
return create_connection(APP_DB_PATH)
def create_tables(conn: SqlConn, sql_query: str):
"""
Executes the specifiend SQL file to create database tables.
"""
# with open(sql_query, "r", encoding="utf-8") as sql_file:
conn.executescript(sql_query)
def setup_search_db():
"""
Creates the search database.
"""
db = sqlite3.connect(":memory:")
sql_file = "queries/fts5.sql"
current_path = Path(__file__).parent.resolve()
sql_path = current_path.joinpath(sql_file)
with open(sql_path, "r", encoding="utf-8") as sql_file:
db.executescript(sql_file.read())
+125
View File
@@ -0,0 +1,125 @@
from sqlite3 import Cursor
from app.db import AlbumMethods
from .utils import SQLiteManager, tuple_to_album, tuples_to_albums
class SQLiteAlbumMethods(AlbumMethods):
@classmethod
def insert_one_album(cls, cur: Cursor, albumhash: str, colors: str):
"""
Inserts one album into the database
"""
sql = """INSERT INTO albums(
albumhash,
colors
) VALUES(?,?)
"""
cur.execute(sql, (albumhash, colors))
return cur.lastrowid
# @classmethod
# def insert_many_albums(cls, albums: list[dict]):
# """
# Takes a generator of albums, and inserts them into the database
# Parameters
# ----------
# albums : Generator
# Generator
# """
# with SQLiteManager() as cur:
# for album in albums:
# cls.insert_one_album(cur, album["albumhash"], album["colors"])
@classmethod
def get_all_albums(cls):
with SQLiteManager() as cur:
cur.execute("SELECT * FROM albums")
albums = cur.fetchall()
if albums is not None:
return albums
return []
# @staticmethod
# def get_album_by_id(album_id: int):
# conn = get_sqlite_conn()
# cur = conn.cursor()
# cur.execute("SELECT * FROM albums WHERE id=?", (album_id,))
# album = cur.fetchone()
# conn.close()
# if album is None:
# return None
# return tuple_to_album(album)
@staticmethod
def get_album_by_hash(album_hash: str):
with SQLiteManager() as cur:
cur.execute("SELECT * FROM albums WHERE albumhash=?", (album_hash,))
album = cur.fetchone()
if album is not None:
return tuple_to_album(album)
return None
@classmethod
def get_albums_by_hashes(cls, album_hashes: list):
"""
Gets all the albums with the specified hashes. Returns a generator of albums or an empty list.
"""
with SQLiteManager() as cur:
hashes = ",".join("?" * len(album_hashes))
cur.execute(
f"SELECT * FROM albums WHERE albumhash IN ({hashes})", album_hashes
)
albums = cur.fetchall()
if albums is not None:
return tuples_to_albums(albums)
return []
# @staticmethod
# def update_album_colors(album_hash: str, colors: list[str]):
# sql = "UPDATE albums SET colors=? WHERE albumhash=?"
# colors_str = json.dumps(colors)
# with SQLiteManager() as cur:
# cur.execute(sql, (colors_str, album_hash))
@staticmethod
def get_albums_by_albumartist(albumartist: str):
with SQLiteManager() as cur:
cur.execute("SELECT * FROM albums WHERE albumartist=?", (albumartist,))
albums = cur.fetchall()
if albums is not None:
return tuples_to_albums(albums)
return []
@staticmethod
def get_all_albums_raw():
"""
Returns all the albums in the database, as a list of tuples.
"""
with SQLiteManager() as cur:
cur.execute("SELECT * FROM albums")
albums = cur.fetchall()
if albums is not None:
return albums
return []
+36
View File
@@ -0,0 +1,36 @@
"""
Contains methods for reading and writing to the sqlite artists database.
"""
import json
from .utils import SQLiteManager
class SQLiteArtistMethods:
@classmethod
def insert_one_artist(cls, artisthash: str, colors: str | list[str]):
"""
Inserts a single artist into the database.
"""
sql = """INSERT INTO artists(
artisthash,
colors
) VALUES(?,?)
"""
colors = json.dumps(colors)
with SQLiteManager() as cur:
cur.execute(sql, (artisthash, colors))
@classmethod
def get_all_artists(cls):
"""
Get all artists from the database and return a generator of Artist objects
"""
sql = """SELECT * FROM artists"""
with SQLiteManager() as cur:
cur.execute(sql)
for artist in cur.fetchall():
yield artist
+77
View File
@@ -0,0 +1,77 @@
from app.models import FavType
from .utils import SQLiteManager
class SQLiteFavoriteMethods:
"""THis class contains methods for interacting with the favorites table."""
@classmethod
def insert_one_favorite(cls, fav_type: str, fav_hash: str):
"""
Inserts a single favorite into the database.
"""
sql = """INSERT INTO favorites(type, hash) VALUES(?,?)"""
with SQLiteManager(userdata_db=True) as cur:
cur.execute(sql, (fav_type, fav_hash))
@classmethod
def get_all(cls) -> list[tuple]:
"""
Returns a list of all favorites.
"""
sql = """SELECT * FROM favorites"""
with SQLiteManager(userdata_db=True) as cur:
cur.execute(sql)
return cur.fetchall()
@classmethod
def get_favorites(cls, fav_type: str) -> list[tuple]:
"""
Returns a list of favorite tracks.
"""
sql = """SELECT * FROM favorites WHERE type = ?"""
with SQLiteManager(userdata_db=True) as cur:
cur.execute(sql, (fav_type,))
return cur.fetchall()
@classmethod
def get_fav_tracks(cls) -> list[tuple]:
"""
Returns a list of favorite tracks.
"""
return cls.get_favorites(FavType.track)
@classmethod
def get_fav_albums(cls) -> list[tuple]:
"""
Returns a list of favorite albums.
"""
return cls.get_favorites(FavType.album)
@classmethod
def get_fav_artists(cls) -> list[tuple]:
"""
Returns a list of favorite artists.
"""
return cls.get_favorites(FavType.artist)
@classmethod
def delete_favorite(cls, fav_type: str, fav_hash: str):
"""
Deletes a favorite from the database.
"""
sql = """DELETE FROM favorites WHERE hash = ? AND type = ?"""
with SQLiteManager(userdata_db=True) as cur:
cur.execute(sql, (fav_hash, fav_type))
@classmethod
def check_is_favorite(cls, itemhash: str, fav_type: str):
"""
Checks if an item is favorited.
"""
sql = """SELECT * FROM favorites WHERE hash = ? AND type = ?"""
with SQLiteManager(userdata_db=True) as cur:
cur.execute(sql, (itemhash, fav_type))
items = cur.fetchall()
return len(items) > 0
+179
View File
@@ -0,0 +1,179 @@
import json
from collections import OrderedDict
from app.db.sqlite.tracks import SQLiteTrackMethods
from app.db.sqlite.utils import SQLiteManager, tuple_to_playlist, tuples_to_playlists
from app.models import Artist
from app.utils import background
class SQLitePlaylistMethods:
"""
This class contains methods for interacting with the playlists table.
"""
@staticmethod
def insert_one_playlist(playlist: dict):
sql = """INSERT INTO playlists(
artisthashes,
banner_pos,
has_gif,
image,
last_updated,
name,
trackhashes
) VALUES(?,?,?,?,?,?,?)
"""
playlist = OrderedDict(sorted(playlist.items()))
params = (*playlist.values(),)
with SQLiteManager(userdata_db=True) as cur:
cur.execute(sql, params)
pid = cur.lastrowid
params = (pid, *params)
return tuple_to_playlist(params)
@staticmethod
def get_playlist_by_name(name: str):
sql = "SELECT * FROM playlists WHERE name = ?"
with SQLiteManager(userdata_db=True) as cur:
cur.execute(sql, (name,))
data = cur.fetchone()
if data is not None:
return tuple_to_playlist(data)
return None
@staticmethod
def count_playlist_by_name(name: str):
sql = "SELECT COUNT(*) FROM playlists WHERE name = ?"
with SQLiteManager(userdata_db=True) as cur:
cur.execute(sql, (name,))
data = cur.fetchone()
return int(data[0])
@staticmethod
def get_all_playlists():
with SQLiteManager(userdata_db=True) as cur:
cur.execute("SELECT * FROM playlists")
playlists = cur.fetchall()
if playlists is not None:
return tuples_to_playlists(playlists)
return []
@staticmethod
def get_playlist_by_id(playlist_id: int):
sql = "SELECT * FROM playlists WHERE id = ?"
with SQLiteManager(userdata_db=True) as cur:
cur.execute(sql, (playlist_id,))
data = cur.fetchone()
if data is not None:
return tuple_to_playlist(data)
return None
# FIXME: Extract the "add_track_to_playlist" method to use it for both the artisthash and trackhash lists.
@staticmethod
def add_item_to_json_list(playlist_id: int, field: str, items: list[str]):
"""
Adds a string item to a json dumped list using a playlist id and field name. Takes the playlist ID, a field name, an item to add to the field, and an error to raise if the item is already in the field.
Parameters
----------
playlist_id : int
The ID of the playlist to add the item to.
field : str
The field in the database that you want to add the item to.
item : str
The item to add to the list.
error : Exception
The error to raise if the item is already in the list.
Returns
-------
A list of strings.
"""
sql = f"SELECT {field} FROM playlists WHERE id = ?"
with SQLiteManager(userdata_db=True) as cur:
cur.execute(sql, (playlist_id,))
data = cur.fetchone()
if data is not None:
db_items: list[str] = json.loads(data[0])
for item in items:
if item in db_items:
items.remove(item)
db_items.extend(items)
sql = f"UPDATE playlists SET {field} = ? WHERE id = ?"
cur.execute(sql, (json.dumps(db_items), playlist_id))
return len(items)
@classmethod
def add_tracks_to_playlist(cls, playlist_id: int, trackhashes: list[str]):
return cls.add_item_to_json_list(playlist_id, "trackhashes", trackhashes)
@classmethod
@background
def add_artist_to_playlist(cls, playlist_id: int, trackhash: str):
track = SQLiteTrackMethods.get_track_by_trackhash(trackhash)
if track is None:
return
artists: list[Artist] = track.artist # type: ignore
artisthashes = [a.artisthash for a in artists]
cls.add_item_to_json_list(playlist_id, "artisthashes", artisthashes)
@staticmethod
def update_playlist(playlist_id: int, playlist: dict):
sql = """UPDATE playlists SET
has_gif = ?,
image = ?,
last_updated = ?,
name = ?
WHERE id = ?
"""
del playlist["id"]
del playlist["trackhashes"]
del playlist["artisthashes"]
del playlist['banner_pos']
playlist = OrderedDict(sorted(playlist.items()))
params = (*playlist.values(), playlist_id)
with SQLiteManager(userdata_db=True) as cur:
cur.execute(sql, params)
@staticmethod
def delete_playlist(pid: str):
sql = "DELETE FROM playlists WHERE id = ?"
with SQLiteManager(userdata_db=True) as cur:
cur.execute(sql, (pid,))
@staticmethod
def update_banner_pos(playlistid: int, pos: int):
sql = """UPDATE playlists SET banner_pos = ? WHERE id = ?"""
with SQLiteManager(userdata_db=True) as cur:
cur.execute(sql, (pos, playlistid))
+65
View File
@@ -0,0 +1,65 @@
"""
This file contains the SQL queries to create the database tables.
"""
CREATE_USERDATA_TABLES = """
CREATE TABLE IF NOT EXISTS playlists (
id integer PRIMARY KEY,
artisthashes text,
banner_pos integer NOT NULL,
has_gif integer,
image text,
last_updated text not null,
name text not null,
trackhashes text
);
CREATE TABLE IF NOT EXISTS favorites (
id integer PRIMARY KEY,
hash text not null,
type text not null
);
"""
CREATE_APPDB_TABLES = """
CREATE TABLE IF NOT EXISTS tracks (
id integer PRIMARY KEY,
album text NOT NULL,
albumartist text NOT NULL,
albumhash text NOT NULL,
artist text NOT NULL,
bitrate integer NOT NULL,
copyright text,
date text NOT NULL,
disc integer NOT NULL,
duration integer NOT NULL,
filepath text NOT NULL,
folder text NOT NULL,
genre text,
title text NOT NULL,
track integer NOT NULL,
trackhash text NOT NULL
);
CREATE TABLE IF NOT EXISTS albums (
id integer PRIMARY KEY,
albumhash text NOT NULL,
colors text NOT NULL
);
CREATE TABLE IF NOT EXISTS artists (
id integer PRIMARY KEY,
artisthash text NOT NULL,
colors text,
bio text
);
CREATE TABLE IF NOT EXISTS folders (
id integer PRIMARY KEY,
path text NOT NULL,
trackcount integer NOT NULL
);
"""
+142
View File
@@ -0,0 +1,142 @@
"""
Contains the SQLiteTrackMethods class which contains methods for
interacting with the tracks table.
"""
from sqlite3 import Cursor
from app.db.sqlite.utils import tuple_to_track, tuples_to_tracks
from .utils import SQLiteManager
class SQLiteTrackMethods:
"""
This class contains all methods for interacting with the tracks table.
"""
@classmethod
def insert_one_track(cls, track: dict, cur: Cursor):
"""
Inserts a single track into the database.
"""
sql = """INSERT INTO tracks(
album,
albumartist,
albumhash,
artist,
bitrate,
copyright,
date,
disc,
duration,
filepath,
folder,
genre,
title,
track,
trackhash
) VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)
"""
cur.execute(
sql,
(
track["album"],
track["albumartist"],
track["albumhash"],
track["artist"],
track["bitrate"],
track["copyright"],
track["date"],
track["disc"],
track["duration"],
track["filepath"],
track["folder"],
track["genre"],
track["title"],
track["track"],
track["trackhash"],
),
)
@classmethod
def insert_many_tracks(cls, tracks: list[dict]):
"""
Inserts a list of tracks into the database.
"""
with SQLiteManager() as cur:
for track in tracks:
cls.insert_one_track(track, cur)
@staticmethod
def get_all_tracks():
"""
Get all tracks from the database and return a generator of Track objects
or an empty list.
"""
with SQLiteManager() as cur:
cur.execute("SELECT * FROM tracks")
rows = cur.fetchall()
if rows is not None:
return tuples_to_tracks(rows)
return []
@staticmethod
def get_track_by_trackhash(trackhash: str):
"""
Gets a track using its trackhash. Returns a Track object or None.
"""
with SQLiteManager() as cur:
cur.execute("SELECT * FROM tracks WHERE trackhash=?", (trackhash,))
row = cur.fetchone()
if row is not None:
return tuple_to_track(row)
return None
@staticmethod
def get_tracks_by_trackhashes(hashes: list[str]):
"""
Gets all tracks in a list of trackhashes.
Returns a generator of Track objects or an empty list.
"""
sql = "SELECT * FROM tracks WHERE trackhash IN ({})".format(
",".join("?" * len(hashes))
)
with SQLiteManager() as cur:
cur.execute(sql, hashes)
rows = cur.fetchall()
if rows is not None:
return tuples_to_tracks(rows)
return []
@staticmethod
def remove_track_by_filepath(filepath: str):
"""
Removes a track from the database using its filepath.
"""
with SQLiteManager() as cur:
cur.execute("DELETE FROM tracks WHERE filepath=?", (filepath,))
@staticmethod
def track_exists(filepath: str):
"""
Checks if a track exists in the database using its filepath.
"""
with SQLiteManager() as cur:
cur.execute("SELECT * FROM tracks WHERE filepath=?", (filepath,))
row = cur.fetchone()
if row is not None:
return True
return False
+93
View File
@@ -0,0 +1,93 @@
"""
Helper functions for use with the SQLite database.
"""
import sqlite3
from sqlite3 import Connection, Cursor
from app.models import Album, Playlist, Track
from app.settings import APP_DB_PATH, USERDATA_DB_PATH
def tuple_to_track(track: tuple):
"""
Takes a tuple and returns a Track object
"""
return Track(*track[1:]) # rowid is removed from the tuple
def tuples_to_tracks(tracks: list[tuple]):
"""
Takes a list of tuples and returns a generator that yields a Track object for each tuple
"""
for track in tracks:
yield tuple_to_track(track)
def tuple_to_album(album: tuple):
"""
Takes a tuple and returns an Album object
"""
return Album(*album[1:]) # rowid is removed from the tuple
def tuples_to_albums(albums: list[tuple]):
"""
Takes a list of tuples and returns a generator that yields an album object for each tuple
"""
for album in albums:
yield tuple_to_album(album)
def tuple_to_playlist(playlist: tuple):
"""
Takes a tuple and returns a Playlist object
"""
return Playlist(*playlist)
def tuples_to_playlists(playlists: list[tuple]):
"""
Takes a list of tuples and returns a list of Playlist objects
"""
for playlist in playlists:
yield tuple_to_playlist(playlist)
class SQLiteManager:
"""
This is a context manager that handles the connection and cursor
for you. It also commits and closes the connection when you're done.
"""
def __init__(self, conn: Connection | None = None, userdata_db=False) -> None:
"""
When a connection is passed in, don't close the connection, because it's
a connection to the search database [in memory db].
"""
self.conn: Connection | None = conn
self.CLOSE_CONN = True
self.userdata_db = userdata_db
if conn:
self.conn = conn
self.CLOSE_CONN = False
def __enter__(self) -> Cursor:
if self.conn is not None:
return self.conn.cursor()
db_path = APP_DB_PATH
if self.userdata_db:
db_path = USERDATA_DB_PATH
self.conn = sqlite3.connect(db_path)
return self.conn.cursor()
def __exit__(self, exc_type, exc_value, exc_traceback):
if self.conn:
self.conn.commit()
if self.CLOSE_CONN:
self.conn.close()
+459
View File
@@ -0,0 +1,459 @@
"""
In memory store.
"""
import json
import random
from pathlib import Path
from tqdm import tqdm
from app.db.sqlite.albums import SQLiteAlbumMethods as aldb
from app.db.sqlite.artists import SQLiteArtistMethods as ardb
from app.db.sqlite.favorite import SQLiteFavoriteMethods as favdb
from app.db.sqlite.tracks import SQLiteTrackMethods as tdb
from app.models import Album, Artist, Folder, Track
from app.utils import (
UseBisection,
create_folder_hash,
get_all_artists,
remove_duplicates,
)
class Store:
"""
This class holds all tracks in memory and provides methods for
interacting with them.
"""
tracks: list[Track] = []
folders: list[Folder] = []
albums: list[Album] = []
artists: list[Artist] = []
@classmethod
def load_all_tracks(cls):
"""
Loads all tracks from the database into the store.
"""
cls.tracks = list(tdb.get_all_tracks())
fav_hashes = favdb.get_fav_tracks()
fav_hashes = [t[1] for t in fav_hashes]
for track in tqdm(cls.tracks, desc="Loading tracks"):
if track.trackhash in fav_hashes:
track.is_favorite = True
@classmethod
def add_track(cls, track: Track):
"""
Adds a single track to the store.
"""
cls.tracks.append(track)
@classmethod
def add_tracks(cls, tracks: list[Track]):
"""
Adds multiple tracks to the store.
"""
cls.tracks.extend(tracks)
@classmethod
def get_tracks_by_trackhashes(cls, trackhashes: list[str]) -> list[Track]:
"""
Returns a list of tracks by their hashes.
"""
tracks = []
for trackhash in trackhashes:
for track in cls.tracks:
if track.trackhash == trackhash:
tracks.append(track)
return tracks
@classmethod
def remove_track_by_filepath(cls, filepath: str):
"""
Removes a track from the store by its filepath.
"""
for track in cls.tracks:
if track.filepath == filepath:
cls.tracks.remove(track)
break
@classmethod
def count_tracks_by_hash(cls, trackhash: str) -> int:
"""
Counts the number of tracks with a specific hash.
"""
count = 0
for track in cls.tracks:
if track.trackhash == trackhash:
count += 1
return count
# ====================================================
# =================== FAVORITES ======================
# ====================================================
@classmethod
def add_fav_track(cls, trackhash: str):
"""
Adds a track to the favorites.
"""
for track in cls.tracks:
if track.trackhash == trackhash:
track.is_favorite = True
@classmethod
def remove_fav_track(cls, trackhash: str):
"""
Removes a track from the favorites.
"""
for track in cls.tracks:
if track.trackhash == trackhash:
track.is_favorite = False
# ====================================================
# ==================== FOLDERS =======================
# ====================================================
@classmethod
def check_has_tracks(cls, path: str): # type: ignore
"""
Checks if a folder has tracks.
"""
path_hashes = "".join(f.path_hash for f in cls.folders)
path_hash = create_folder_hash(*Path(path).parts[1:])
return path_hash in path_hashes
@classmethod
def is_empty_folder(cls, path: str):
"""
Checks if a folder has tracks using tracks in the store.
"""
all_folders = set(track.folder for track in cls.tracks)
folder_hashes = "".join(
create_folder_hash(*Path(f).parts[1:]) for f in all_folders
)
path_hash = create_folder_hash(*Path(path).parts[1:])
return path_hash in folder_hashes
@staticmethod
def create_folder(path: str) -> Folder:
"""
Creates a folder object from a path.
"""
folder = Path(path)
return Folder(
name=folder.name,
path=str(folder),
is_sym=folder.is_symlink(),
has_tracks=True,
path_hash=create_folder_hash(*folder.parts[1:]),
)
@classmethod
def add_folder(cls, path: str):
"""
Adds a folder to the store.
"""
if cls.check_has_tracks(path):
return
folder = cls.create_folder(path)
cls.folders.append(folder)
@classmethod
def remove_folder(cls, path: str):
"""
Removes a folder from the store.
"""
for folder in cls.folders:
if folder.path == path:
cls.folders.remove(folder)
break
@classmethod
def process_folders(cls):
"""
Creates a list of folders from the tracks in the store.
"""
all_folders = [track.folder for track in cls.tracks]
all_folders = set(all_folders)
all_folders = [
folder for folder in all_folders if not cls.check_has_tracks(folder)
]
all_folders = [Path(f) for f in all_folders]
all_folders = [f for f in all_folders if f.exists()]
for path in tqdm(all_folders, desc="Processing folders"):
folder = cls.create_folder(str(path))
cls.folders.append(folder)
@classmethod
def get_folder(cls, path: str): # type: ignore
"""
Returns a folder object by its path.
"""
folders = sorted(cls.folders, key=lambda x: x.path)
folder = UseBisection(folders, "path", [path])()[0]
if folder is not None:
return folder
has_tracks = cls.check_has_tracks(path)
if not has_tracks:
return None
folder = cls.create_folder(path)
cls.folders.append(folder)
return folder
@classmethod
def get_tracks_by_filepaths(cls, paths: list[str]) -> list[Track]:
"""
Returns all tracks matching the given paths.
"""
tracks = sorted(cls.tracks, key=lambda x: x.filepath)
tracks = UseBisection(tracks, "filepath", paths)()
return [track for track in tracks if track is not None]
@classmethod
def get_tracks_by_albumhash(cls, album_hash: str) -> list[Track]:
"""
Returns all tracks matching the given album hash.
"""
return [t for t in cls.tracks if t.albumhash == album_hash]
@classmethod
def get_tracks_by_artist(cls, artisthash: str) -> list[Track]:
"""
Returns all tracks matching the given artist. Duplicate tracks are removed.
"""
tracks = [t for t in cls.tracks if artisthash in t.artist_hashes]
return remove_duplicates(tracks)
# ====================================================
# ==================== ALBUMS ========================
# ====================================================
@staticmethod
def create_album(track: Track):
"""
Creates album object from a track
"""
return Album(
albumhash=track.albumhash,
albumartists=track.albumartist, # type: ignore
title=track.album,
)
@classmethod
def load_albums(cls):
"""
Loads all albums from the database into the store.
"""
albumhashes = set(t.albumhash for t in cls.tracks)
for albumhash in tqdm(albumhashes, desc="Loading albums"):
for track in cls.tracks:
if track.albumhash == albumhash:
cls.albums.append(cls.create_album(track))
break
db_albums: list[tuple] = aldb.get_all_albums()
for album in tqdm(db_albums, desc="Mapping album colors"):
albumhash = album[1]
colors = json.loads(album[2])
for al in cls.albums:
if al.albumhash == albumhash:
al.set_colors(colors)
break
@classmethod
def add_album(cls, album: Album):
"""
Adds an album to the store.
"""
cls.albums.append(album)
@classmethod
def add_albums(cls, albums: list[Album]):
"""
Adds multiple albums to the store.
"""
cls.albums.extend(albums)
@classmethod
def get_albums_by_albumartist(
cls, artisthash: str, limit: int, exclude: str
) -> list[Album]:
"""
Returns N albums by the given albumartist, excluding the specified album.
"""
albums = [album for album in cls.albums if artisthash in album.albumartisthash]
albums = [album for album in albums if album.albumhash != exclude]
if len(albums) > limit:
random.shuffle(albums)
# TODO: Merge this with `cls.get_albums_by_artisthash()`
return albums[:limit]
@classmethod
def get_album_by_hash(cls, albumhash: str) -> Album | None:
"""
Returns an album by its hash.
"""
try:
return [a for a in cls.albums if a.albumhash == albumhash][0]
except IndexError:
return None
@classmethod
def get_albums_by_artisthash(cls, artisthash: str) -> list[Album]:
"""
Returns all albums by the given artist.
"""
return [album for album in cls.albums if artisthash in album.albumartisthash]
@classmethod
def count_albums_by_artisthash(cls, artisthash: str):
"""
Count albums for the given artisthash.
"""
albumartists = [a.albumartists for a in cls.albums]
artisthashes = []
for artist in albumartists:
artisthashes.extend([a.artisthash for a in artist]) # type: ignore
master_string = "-".join(artisthashes)
return master_string.count(artisthash)
@classmethod
def album_exists(cls, albumhash: str) -> bool:
"""
Checks if an album exists.
"""
return albumhash in "-".join([a.albumhash for a in cls.albums])
@classmethod
def remove_album_by_hash(cls, albumhash: str):
"""
Removes an album from the store.
"""
cls.albums = [a for a in cls.albums if a.albumhash != albumhash]
# ====================================================
# ==================== ARTISTS =======================
# ====================================================
@classmethod
def load_artists(cls):
"""
Loads all artists from the database into the store.
"""
cls.artists = get_all_artists(cls.tracks, cls.albums)
db_artists: list[tuple] = list(ardb.get_all_artists())
for art in tqdm(db_artists, desc="Loading artists"):
cls.map_artist_color(art)
@classmethod
def map_artist_color(cls, artist_tuple: tuple):
"""
Maps a color to the corresponding artist.
"""
artisthash = artist_tuple[1]
color = json.loads(artist_tuple[2])
for artist in cls.artists:
if artist.artisthash == artisthash:
artist.colors = color
break
@classmethod
def add_artist(cls, artist: Artist):
"""
Adds an artist to the store.
"""
cls.artists.append(artist)
@classmethod
def add_artists(cls, artists: list[Artist]):
"""
Adds multiple artists to the store.
"""
for artist in artists:
if artist not in cls.artists:
cls.artists.append(artist)
@classmethod
def get_artist_by_hash(cls, artisthash: str) -> Artist:
"""
Returns an artist by its hash.
"""
artists = sorted(cls.artists, key=lambda x: x.artisthash)
artist = UseBisection(artists, "artisthash", [artisthash])()[0]
return artist
@classmethod
def artist_exists(cls, artisthash: str) -> bool:
"""
Checks if an artist exists.
"""
return artisthash in "-".join([a.artisthash for a in cls.artists])
@classmethod
def artist_has_tracks(cls, artisthash: str) -> bool:
"""
Checks if an artist has tracks.
"""
artists: set[str] = set()
for track in cls.tracks:
artists.update(track.artist_hashes)
album_artists: list[str] = [a.artisthash for a in track.albumartist]
artists.update(album_artists)
master_hash = "-".join(artists)
return artisthash in master_hash
@classmethod
def remove_artist_by_hash(cls, artisthash: str):
"""
Removes an artist from the store.
"""
cls.artists = [a for a in cls.artists if a.artisthash != artisthash]
+40
View File
@@ -0,0 +1,40 @@
"""
This module contains functions for the server
"""
import time
from requests import ConnectionError as RequestConnectionError
from requests import ReadTimeout
from app import utils
from app.lib.artistlib import CheckArtistImages
from app.lib.colorlib import ProcessAlbumColors, ProcessArtistColors
from app.lib.populate import Populate, ProcessTrackThumbnails
from app.lib.trackslib import validate_tracks
from app.logger import log
@utils.background
def run_periodic_checks():
"""
Checks for new songs every N minutes.
"""
# ValidateAlbumThumbs()
# ValidatePlaylistThumbs()
validate_tracks()
while True:
Populate()
ProcessTrackThumbnails()
ProcessAlbumColors()
ProcessArtistColors()
if utils.Ping()():
try:
CheckArtistImages()
except (RequestConnectionError, ReadTimeout):
log.error(
"Internet connection lost. Downloading artist images stopped."
)
time.sleep(300)
+114
View File
@@ -0,0 +1,114 @@
import os
from pathlib import Path
from flask import Blueprint, request, send_from_directory
imgbp = Blueprint("imgserver", __name__, url_prefix="/img")
SUPPORTED_IMAGES = (".jpg", ".png", ".webp", ".jpeg")
HOME = os.path.expanduser("~")
APP_DIR = Path(HOME) / ".swing"
IMG_PATH = APP_DIR / "images"
ASSETS_PATH = APP_DIR / "assets"
THUMB_PATH = IMG_PATH / "thumbnails"
LG_THUMB_PATH = THUMB_PATH / "large"
SM_THUMB_PATH = THUMB_PATH / "small"
ARTIST_PATH = IMG_PATH / "artists"
ARTIST_LG_PATH = ARTIST_PATH / "large"
ARTIST_SM_PATH = ARTIST_PATH / "small"
PLAYLIST_PATH = IMG_PATH / "playlists"
@imgbp.route("/")
def hello():
return "<h1>Image Server</h1>"
def send_fallback_img(filename: str = "default.webp"):
img = ASSETS_PATH / filename
if not img.exists():
return "", 404
return send_from_directory(ASSETS_PATH, filename)
@imgbp.route("/t/<imgpath>")
def send_lg_thumbnail(imgpath: str):
fpath = LG_THUMB_PATH / imgpath
if fpath.exists():
return send_from_directory(LG_THUMB_PATH, imgpath)
return send_fallback_img()
@imgbp.route("/t/s/<imgpath>")
def send_sm_thumbnail(imgpath: str):
fpath = SM_THUMB_PATH / imgpath
if fpath.exists():
return send_from_directory(SM_THUMB_PATH, imgpath)
return send_fallback_img()
@imgbp.route("/a/<imgpath>")
def send_lg_artist_image(imgpath: str):
fpath = ARTIST_LG_PATH / imgpath
if fpath.exists():
return send_from_directory(ARTIST_LG_PATH, imgpath)
return send_fallback_img("artist.webp")
@imgbp.route("/a/s/<imgpath>")
def send_sm_artist_image(imgpath: str):
fpath = ARTIST_SM_PATH / imgpath
if fpath.exists():
return send_from_directory(ARTIST_SM_PATH, imgpath)
return send_fallback_img("artist.webp")
@imgbp.route("/p/<imgpath>")
def send_playlist_image(imgpath: str):
fpath = PLAYLIST_PATH / imgpath
if fpath.exists():
return send_from_directory(PLAYLIST_PATH, imgpath)
return send_fallback_img("playlist.svg")
# @app.route("/raw")
# @app.route("/raw/<path:imgpath>")
# def send_from_filepath(imgpath: str = ""):
# imgpath = "/" + imgpath
# filename = path.basename(imgpath)
# def verify_is_image():
# _, ext = path.splitext(filename)
# return ext in SUPPORTED_IMAGES
# verified = verify_is_image()
# if not verified:
# return imgpath, 404
# exists = path.exists(imgpath)
# if verified and exists:
# return send_from_directory(path.dirname(imgpath), filename)
# return imgpath, 404
# def serve_imgs():
# app.run(threaded=True, port=1971, host="0.0.0.0", debug=True)
+3
View File
@@ -0,0 +1,3 @@
"""
This module contains all the data processing and non-API libraries
"""
+3
View File
@@ -0,0 +1,3 @@
"""
Contains methods relating to albums.
"""
+130
View File
@@ -0,0 +1,130 @@
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from io import BytesIO
from PIL import Image
import requests
import urllib
from tqdm import tqdm
from requests.exceptions import ConnectionError as ReqConnError, ReadTimeout
from app import settings
from app.models import Artist
from app.db.store import Store
from app.utils import create_hash
def get_artist_image_link(artist: str):
"""
Returns an artist image url.
"""
try:
query = urllib.parse.quote(artist) # type: ignore
url = f"https://api.deezer.com/search/artist?q={query}"
response = requests.get(url, timeout=30)
data = response.json()
for res in data["data"]:
res_hash = create_hash(res["name"], decode=True)
artist_hash = create_hash(artist, decode=True)
if res_hash == artist_hash:
return res["picture_big"]
return None
except (ReqConnError, ReadTimeout, IndexError, KeyError):
return None
class DownloadImage:
def __init__(self, url: str, name: str) -> None:
sm_path = Path(settings.ARTIST_IMG_SM_PATH) / name
lg_path = Path(settings.ARTIST_IMG_LG_PATH) / name
img = self.download(url)
if img is not None:
self.save_img(img, sm_path, lg_path)
@staticmethod
def download(url: str) -> Image.Image | None:
"""
Downloads the image from the url.
"""
return Image.open(BytesIO(requests.get(url, timeout=10).content))
@staticmethod
def save_img(img: Image.Image, sm_path: Path, lg_path: Path):
"""
Saves the image to the destinations.
"""
img.save(lg_path, format="webp")
sm_size = settings.SM_ARTIST_IMG_SIZE
img.resize((sm_size, sm_size), Image.ANTIALIAS).save(sm_path, format="webp")
class CheckArtistImages:
def __init__(self):
with ThreadPoolExecutor() as pool:
list(
tqdm(
pool.map(self.download_image, Store.artists),
total=len(Store.artists),
desc="Downloading artist images",
)
)
@staticmethod
def download_image(artist: Artist):
"""
Checks if an artist image exists and downloads it if not.
:param artistname: The artist name
"""
img_path = Path(settings.ARTIST_IMG_SM_PATH) / f"{artist.artisthash}.webp"
if img_path.exists():
return
url = get_artist_image_link(artist.name)
if url is not None:
return DownloadImage(url, name=f"{artist.artisthash}.webp")
# def fetch_album_bio(title: str, albumartist: str) -> str | None:
# """
# Returns the album bio for a given album.
# """
# last_fm_url = "http://ws.audioscrobbler.com/2.0/?method=album.getinfo&api_key={}&artist={}&album={}&format=json".format(
# settings.LAST_FM_API_KEY, albumartist, title
# )
# try:
# response = requests.get(last_fm_url)
# data = response.json()
# except:
# return None
# try:
# bio = data["album"]["wiki"]["summary"].split('<a href="https://www.last.fm/')[0]
# except KeyError:
# bio = None
# return bio
# class FetchAlbumBio:
# """
# Returns the album bio for a given album.
# """
# def __init__(self, title: str, albumartist: str):
# self.title = title
# self.albumartist = albumartist
# def __call__(self):
# return fetch_album_bio(self.title, self.albumartist)
+94
View File
@@ -0,0 +1,94 @@
"""
Contains everything that deals with image color extraction.
"""
import json
from pathlib import Path
import colorgram
from tqdm import tqdm
from app import settings
from app.db.sqlite.albums import SQLiteAlbumMethods as db
from app.db.sqlite.artists import SQLiteArtistMethods as adb
from app.db.sqlite.utils import SQLiteManager
from app.db.store import Store
from app.models import Album, Artist
def get_image_colors(image: str) -> list[str]:
"""Extracts 2 of the most dominant colors from an image."""
try:
colors = sorted(colorgram.extract(image, 1), key=lambda c: c.hsl.h)
except OSError:
return []
formatted_colors = []
for color in colors:
color = f"rgb({color.rgb.r}, {color.rgb.g}, {color.rgb.b})"
formatted_colors.append(color)
return formatted_colors
class ProcessAlbumColors:
"""
Extracts the most dominant color from the album art and saves it to the database.
"""
def __init__(self) -> None:
with SQLiteManager() as cur:
for album in tqdm(Store.albums, desc="Processing album colors"):
if len(album.colors) == 0:
colors = self.process_color(album)
if colors is None:
continue
album.set_colors(colors)
color_str = json.dumps(colors)
db.insert_one_album(cur, album.albumhash, color_str)
@staticmethod
def process_color(album: Album):
path = Path(settings.SM_THUMB_PATH) / album.image
if not path.exists():
return
colors = get_image_colors(str(path))
return colors
class ProcessArtistColors:
"""
Extracts the most dominant color from the artist art and saves it to the database.
"""
def __init__(self) -> None:
all_artists = Store.artists
if all_artists is None:
return
for artist in tqdm(all_artists, desc="Processing artist colors"):
if len(artist.colors) == 0:
self.process_color(artist)
@staticmethod
def process_color(artist: Artist):
path = Path(settings.ARTIST_IMG_SM_PATH) / artist.image
if not path.exists():
return
colors = get_image_colors(str(path))
if len(colors) > 0:
adb.insert_one_artist(artisthash=artist.artisthash, colors=colors)
Store.map_artist_color((0, artist.artisthash, json.dumps(colors)))
# TODO: Load album and artist colors into the store.
+47
View File
@@ -0,0 +1,47 @@
import os
import pathlib
from concurrent.futures import ThreadPoolExecutor
from app.db.store import Store
from app.models import Folder, Track
from app.settings import SUPPORTED_FILES
class GetFilesAndDirs:
"""
Get files and folders from a directory.
"""
def __init__(self, path: str) -> None:
self.path = path
def __call__(self) -> tuple[list[Track], list[Folder]]:
try:
entries = os.scandir(self.path)
except FileNotFoundError:
return ([], [])
dirs, files = [], []
for entry in entries:
ext = os.path.splitext(entry.name)[1].lower()
if entry.is_dir() and not entry.name.startswith("."):
dirs.append(entry.path)
elif entry.is_file() and ext in SUPPORTED_FILES:
files.append(entry.path)
# sort files by modified time
files.sort(
key=lambda f: os.path.getmtime(f) # pylint: disable=unnecessary-lambda
)
tracks = Store.get_tracks_by_filepaths(files)
with ThreadPoolExecutor() as pool:
iterable = pool.map(Store.get_folder, dirs)
folders = [i for i in iterable if i is not None]
folders = filter(lambda f: f.has_tracks, folders)
return (tracks, folders) # type: ignore
+115
View File
@@ -0,0 +1,115 @@
"""
This library contains all the functions related to playlists.
"""
import os
import random
import string
from datetime import datetime
from typing import Any
from PIL import Image, ImageSequence
from app import settings
from app.logger import log
def create_thumbnail(image: Any, img_path: str) -> str:
"""
Creates a 250 x 250 thumbnail from a playlist image
"""
thumb_path = "thumb_" + img_path
full_thumb_path = os.path.join(settings.APP_DIR, "images", "playlists", thumb_path)
aspect_ratio = image.width / image.height
new_w = round(250 * aspect_ratio)
thumb = image.resize((new_w, 250), Image.ANTIALIAS)
thumb.save(full_thumb_path, "webp")
return thumb_path
def create_gif_thumbnail(image: Any, img_path: str):
"""
Creates a 250 x 250 thumbnail from a playlist image
"""
thumb_path = "thumb_" + img_path
full_thumb_path = os.path.join(settings.APP_DIR, "images", "playlists", thumb_path)
frames = []
for frame in ImageSequence.Iterator(image):
aspect_ratio = frame.width / frame.height
new_w = round(250 * aspect_ratio)
thumb = frame.resize((new_w, 250), Image.ANTIALIAS)
frames.append(thumb)
frames[0].save(full_thumb_path, save_all=True, append_images=frames[1:])
return thumb_path
def save_p_image(file, pid: str):
"""
Saves the image of a playlist to the database.
"""
img = Image.open(file)
random_str = "".join(random.choices(string.ascii_letters + string.digits, k=5))
img_path = pid + str(random_str) + ".webp"
full_img_path = os.path.join(settings.APP_DIR, "images", "playlists", img_path)
if file.content_type == "image/gif":
frames = []
for frame in ImageSequence.Iterator(img):
frames.append(frame.copy())
frames[0].save(full_img_path, save_all=True, append_images=frames[1:])
create_gif_thumbnail(img, img_path=img_path)
return img_path
img.save(full_img_path, "webp")
create_thumbnail(img, img_path=img_path)
return img_path
class ValidatePlaylistThumbs:
"""
Removes all unused images in the images/playlists folder.
"""
def __init__(self) -> None:
images = []
playlists = Get.get_all_playlists()
log.info("Validating playlist thumbnails")
for playlist in playlists:
if playlist.image:
img_path = playlist.image.split("/")[-1]
thumb_path = playlist.thumb.split("/")[-1]
images.append(img_path)
images.append(thumb_path)
p_path = os.path.join(settings.APP_DIR, "images", "playlists")
for image in os.listdir(p_path):
if image not in images:
os.remove(os.path.join(p_path, image))
log.info("Validating playlist thumbnails ... ✅")
def create_new_date():
return datetime.now()
# TODO: Fix ValidatePlaylistThumbs
+100
View File
@@ -0,0 +1,100 @@
from concurrent.futures import ThreadPoolExecutor
from tqdm import tqdm
from app import settings
from app.db.sqlite.tracks import SQLiteTrackMethods
from app.db.store import Store
from app.lib.taglib import extract_thumb, get_tags
from app.logger import log
from app.models import Album, Artist, Track
from app.utils import run_fast_scandir
get_all_tracks = SQLiteTrackMethods.get_all_tracks
insert_many_tracks = SQLiteTrackMethods.insert_many_tracks
class Populate:
"""
Populates the database with all songs in the music directory
checks if the song is in the database, if not, it adds it
also checks if the album art exists in the image path, if not tries to extract it.
"""
def __init__(self) -> None:
tracks = get_all_tracks()
tracks = list(tracks)
files = run_fast_scandir(settings.HOME_DIR, full=True)[1]
untagged = self.filter_untagged(tracks, files)
if len(untagged) == 0:
log.info("All clear, no unread files.")
return
self.tag_untagged(untagged)
@staticmethod
def filter_untagged(tracks: list[Track], files: list[str]):
tagged_files = [t.filepath for t in tracks]
return set(files) - set(tagged_files)
@staticmethod
def tag_untagged(untagged: set[str]):
log.info("Found %s new tracks", len(untagged))
tagged_tracks: list[dict] = []
tagged_count = 0
for file in tqdm(untagged, desc="Reading files"):
tags = get_tags(file)
if tags is not None:
tagged_tracks.append(tags)
track = Track(**tags)
Store.add_track(track)
Store.add_folder(track.folder)
if not Store.album_exists(track.albumhash):
Store.add_album(Store.create_album(track))
for artist in track.artist:
if not Store.artist_exists(artist.artisthash):
Store.add_artist(Artist(artist.name))
for artist in track.albumartist:
if not Store.artist_exists(artist.artisthash):
Store.add_artist(Artist(artist.name))
tagged_count += 1
else:
log.warning("Could not read file: %s", file)
if len(tagged_tracks) > 0:
insert_many_tracks(tagged_tracks)
log.info("Added %s/%s tracks", tagged_count, len(untagged))
def get_image(album: Album):
for track in Store.tracks:
if track.albumhash == album.albumhash:
extract_thumb(track.filepath, track.image)
break
class ProcessTrackThumbnails:
def __init__(self) -> None:
with ThreadPoolExecutor(max_workers=4) as pool:
results = list(
tqdm(
pool.map(get_image, Store.albums),
total=len(Store.albums),
desc="Extracting track images",
)
)
results = [r for r in results]
+125
View File
@@ -0,0 +1,125 @@
"""
This library contains all the functions related to the search functionality.
"""
from typing import List
from rapidfuzz import fuzz, process
from app import models
ratio = fuzz.ratio
wratio = fuzz.WRatio
class Cutoff:
"""
Holds all the default cutoff values.
"""
tracks: int = 60
albums: int = 60
artists: int = 60
playlists: int = 60
class Limit:
"""
Holds all the default limit values.
"""
tracks: int = 50
albums: int = 50
artists: int = 50
playlists: int = 50
class SearchTracks:
def __init__(self, tracks: List[models.Track], query: str) -> None:
self.query = query
self.tracks = tracks
def __call__(self) -> List[models.Track]:
"""
Gets all songs with a given title.
"""
tracks = [track.title for track in self.tracks]
results = process.extract(
self.query,
tracks,
scorer=fuzz.WRatio,
score_cutoff=Cutoff.tracks,
limit=Limit.tracks,
)
return [self.tracks[i[2]] for i in results]
class SearchArtists:
def __init__(self, artists: list[str], query: str) -> None:
self.query = query
self.artists = artists
def __call__(self) -> list:
"""
Gets all artists with a given name.
"""
results = process.extract(
self.query,
self.artists,
scorer=fuzz.WRatio,
score_cutoff=Cutoff.artists,
limit=Limit.artists,
)
artists = [a[0] for a in results]
return [models.Artist(a) for a in artists]
class SearchAlbums:
def __init__(self, albums: List[models.Album], query: str) -> None:
self.query = query
self.albums = albums
def __call__(self) -> List[models.Album]:
"""
Gets all albums with a given title.
"""
albums = [a.title.lower() for a in self.albums]
results = process.extract(
self.query,
albums,
scorer=fuzz.WRatio,
score_cutoff=Cutoff.albums,
limit=Limit.albums,
)
return [self.albums[i[2]] for i in results]
# get all artists that matched the query
# for get all albums from the artists
# get all albums that matched the query
# return [**artist_albums **albums]
# recheck next and previous artist on play next or add to playlist
class SearchPlaylists:
def __init__(self, playlists: List[models.Playlist], query: str) -> None:
self.playlists = playlists
self.query = query
def __call__(self) -> List[models.Playlist]:
playlists = [p.name for p in self.playlists]
results = process.extract(
self.query,
playlists,
scorer=fuzz.WRatio,
score_cutoff=Cutoff.playlists,
limit=Limit.playlists,
)
return [self.playlists[i[2]] for i in results]
+159
View File
@@ -0,0 +1,159 @@
import os
import datetime
from io import BytesIO
from tinytag import TinyTag
from PIL import Image, UnidentifiedImageError
from app import settings
from app.utils import create_hash
def parse_album_art(filepath: str):
"""
Returns the album art for a given audio file.
"""
try:
tags = TinyTag.get(filepath, image=True)
return tags.get_image()
except: # pylint: disable=bare-except
return None
def extract_thumb(filepath: str, webp_path: str) -> bool:
"""
Extracts the thumbnail from an audio file. Returns the path to the thumbnail.
"""
img_path = os.path.join(settings.LG_THUMBS_PATH, webp_path)
sm_img_path = os.path.join(settings.SM_THUMB_PATH, webp_path)
tsize = settings.THUMB_SIZE
sm_tsize = settings.SM_THUMB_SIZE
def save_image(img: Image.Image):
img.resize((sm_tsize, sm_tsize), Image.ANTIALIAS).save(sm_img_path, "webp")
img.resize((tsize, tsize), Image.ANTIALIAS).save(img_path, "webp")
if os.path.exists(img_path):
img_size = os.path.getsize(img_path)
if img_size > 0:
return True
album_art = parse_album_art(filepath)
if album_art is not None:
try:
img = Image.open(BytesIO(album_art))
except (UnidentifiedImageError, OSError):
return False
try:
save_image(img)
except OSError:
try:
png = img.convert("RGB")
save_image(png)
except: # pylint: disable=bare-except
return False
return True
return False
def extract_date(date_str: str | None) -> int:
current_year = datetime.date.today().today().year
if date_str is None:
return current_year
try:
return int(date_str.split("-")[0])
except: # pylint: disable=bare-except
return current_year
def get_tags(filepath: str):
filetype = filepath.split(".")[-1]
filename = (filepath.split("/")[-1]).replace(f".{filetype}", "")
try:
tags = TinyTag.get(filepath)
except: # pylint: disable=bare-except
return None
no_albumartist: bool = (tags.albumartist == "") or (tags.albumartist is None)
no_artist: bool = (tags.artist == "") or (tags.artist is None)
if no_albumartist and not no_artist:
tags.albumartist = tags.artist
if no_artist and not no_albumartist:
tags.artist = tags.albumartist
to_filename = ["title", "album"]
for tag in to_filename:
p = getattr(tags, tag)
if p == "" or p is None:
setattr(tags, tag, filename)
to_check = ["album", "artist", "year", "albumartist"]
for prop in to_check:
p = getattr(tags, prop)
if (p is None) or (p == ""):
setattr(tags, prop, "Unknown")
to_round = ["bitrate", "duration"]
for prop in to_round:
try:
setattr(tags, prop, round(getattr(tags, prop)))
except TypeError:
setattr(tags, prop, 0)
to_int = ["track", "disc"]
for prop in to_int:
try:
setattr(tags, prop, int(getattr(tags, prop)))
except (ValueError, TypeError):
setattr(tags, prop, 1)
try:
tags.copyright = tags.extra["copyright"]
except KeyError:
tags.copyright = None
tags.albumhash = create_hash(tags.album, tags.albumartist)
tags.trackhash = create_hash(tags.artist, tags.album, tags.title)
tags.image = f"{tags.albumhash}.webp"
tags.folder = os.path.dirname(filepath)
tags.date = extract_date(tags.year)
tags.filepath = filepath
tags.filetype = filetype
tags = tags.__dict__
# delete all tag properties that start with _ (tinytag internals)
for tag in list(tags):
if tag.startswith("_"):
del tags[tag]
to_delete = [
"filesize",
"audio_offset",
"channels",
"comment",
"composer",
"disc_total",
"extra",
"samplerate",
"track_total",
"year",
]
for tag in to_delete:
del tags[tag]
return tags
+19
View File
@@ -0,0 +1,19 @@
"""
This library contains all the functions related to tracks.
"""
import os
from tqdm import tqdm
from app.db.store import Store
from app.db.sqlite.tracks import SQLiteTrackMethods as tdb
def validate_tracks() -> None:
"""
Gets all songs under the ~/ directory.
"""
for track in tqdm(Store.tracks, desc="Removing deleted tracks"):
if not os.path.exists(track.filepath):
Store.tracks.remove(track)
tdb.remove_track_by_filepath(track.filepath)
+172
View File
@@ -0,0 +1,172 @@
"""
This library contains the classes and functions related to the watchdog file watcher.
"""
import os
import time
from watchdog.events import PatternMatchingEventHandler
from watchdog.observers import Observer
from app.db.sqlite.tracks import SQLiteManager
from app.db.sqlite.tracks import SQLiteTrackMethods as db
from app.db.store import Store
from app.lib.taglib import get_tags
from app.logger import log
from app.models import Artist, Track
class Watcher:
"""
Contains the methods for initializing and starting watchdog.
"""
home_dir = os.path.expanduser("~")
dirs = [home_dir]
observers: list[Observer] = []
def __init__(self):
self.observer = Observer()
def run(self):
event_handler = Handler()
for dir_ in self.dirs:
self.observer.schedule(
event_handler, os.path.realpath(dir_), recursive=True
)
self.observers.append(self.observer)
try:
self.observer.start()
except OSError:
log.error("Could not start watchdog.")
return
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
for obsv in self.observers:
obsv.unschedule_all()
obsv.stop()
for obsv in self.observers:
obsv.join()
def add_track(filepath: str) -> None:
"""
Processes the audio tags for a given file ands add them to the database and store.
Then creates the folder, album and artist objects for the added track and adds them to the store.
"""
tags = get_tags(filepath)
if tags is None:
return
with SQLiteManager() as cur:
db.remove_track_by_filepath(tags["filepath"])
db.insert_one_track(tags, cur)
track = Track(**tags)
Store().add_track(track)
Store.add_folder(track.folder)
if not Store.album_exists(track.albumhash):
album = Store.create_album(track)
Store.add_album(album)
artists: list[Artist] = track.artist + track.albumartist # type: ignore
for artist in artists:
if not Store.artist_exists(artist.artisthash):
Store.add_artist(Artist(artist.name))
def remove_track(filepath: str) -> None:
"""
Removes a track from the music dict.
"""
try:
track = Store.get_tracks_by_filepaths([filepath])[0]
except IndexError:
return
db.remove_track_by_filepath(filepath)
Store.remove_track_by_filepath(filepath)
empty_album = Store.count_tracks_by_hash(track.albumhash) > 0
if empty_album:
Store.remove_album_by_hash(track.albumhash)
artists: list[Artist] = track.artist + track.albumartist # type: ignore
for artist in artists:
empty_artist = not Store.artist_has_tracks(artist.artisthash)
if empty_artist:
Store.remove_artist_by_hash(artist.artisthash)
empty_folder = Store.is_empty_folder(track.folder)
if empty_folder:
Store.remove_folder(track.folder)
class Handler(PatternMatchingEventHandler):
files_to_process = []
def __init__(self):
log.info("✅ started watchdog")
PatternMatchingEventHandler.__init__(
self,
patterns=["*.flac", "*.mp3"],
ignore_directories=True,
case_sensitive=False,
)
def on_created(self, event):
"""
Fired when a supported file is created.
"""
self.files_to_process.append(event.src_path)
def on_deleted(self, event):
"""
Fired when a delete event occurs on a supported file.
"""
remove_track(event.src_path)
def on_moved(self, event):
"""
Fired when a move event occurs on a supported file.
"""
trash = "share/Trash"
if trash in event.dest_path:
remove_track(event.src_path)
elif trash in event.src_path:
add_track(event.dest_path)
elif trash not in event.dest_path and trash not in event.src_path:
add_track(event.dest_path)
remove_track(event.src_path)
def on_closed(self, event):
"""
Fired when a created file is closed.
"""
try:
self.files_to_process.remove(event.src_path)
if os.path.getsize(event.src_path) > 0:
add_track(event.src_path)
except ValueError:
pass
# watcher = Watcher()
+49
View File
@@ -0,0 +1,49 @@
"""
Logger module
"""
import logging
class CustomFormatter(logging.Formatter):
"""
Custom log formatter
"""
grey = "\x1b[38;20m"
yellow = "\x1b[33;20m"
red = "\x1b[31;20m"
bold_red = "\x1b[31;1m"
reset = "\x1b[0m"
# format = (
# "%(asctime)s - %(name)s - %(levelname)s - %(message)s (%(filename)s:%(lineno)d)"
# )
format_ = "[%(asctime)s]@%(name)s%(message)s"
FORMATS = {
logging.DEBUG: grey + format_ + reset,
logging.INFO: grey + format_ + reset,
logging.WARNING: yellow + format_ + reset,
logging.ERROR: red + format_ + reset,
logging.CRITICAL: bold_red + format_ + reset,
}
def format(self, record):
log_fmt = self.FORMATS.get(record.levelno)
formatter = logging.Formatter(log_fmt, "%H:%M:%S")
return formatter.format(record)
log = logging.getLogger("swing")
log.propagate = False
log.setLevel(logging.DEBUG)
# create console handler with a higher log level
handler = logging.StreamHandler()
handler.setLevel(logging.DEBUG)
handler.setFormatter(CustomFormatter())
log.addHandler(handler)
# copied from: https://stackoverflow.com/a/56944256:
+206
View File
@@ -0,0 +1,206 @@
"""
Contains all the models for objects generation and typing.
"""
import dataclasses
import json
from dataclasses import dataclass
from app import utils
@dataclass(slots=True)
class Artist:
"""
Artist class
"""
name: str
artisthash: str = ""
image: str = ""
trackcount: int = 0
albumcount: int = 0
duration: int = 0
colors: list[str] = dataclasses.field(default_factory=list)
is_favorite: bool = False
def __post_init__(self):
self.artisthash = utils.create_hash(self.name, decode=True)
self.image = self.artisthash + ".webp"
self.colors = json.loads(str(self.colors))
@dataclass(slots=True)
class Track:
"""
Track class
"""
album: str
albumartist: str | list[Artist]
albumhash: str
artist: str | list[Artist]
bitrate: int
copyright: str
date: str
disc: int
duration: int
filepath: str
folder: str
genre: str | list[str]
title: str
track: int
trackhash: str
filetype: str = ""
image: str = ""
artist_hashes: list[str] = dataclasses.field(default_factory=list)
is_favorite: bool = False
def __post_init__(self):
if self.artist is not None:
artist_str = str(self.artist).split(", ")
self.artist_hashes = [utils.create_hash(a, decode=True) for a in artist_str]
self.artist = [Artist(a) for a in artist_str]
albumartists = str(self.albumartist).split(", ")
self.albumartist = [Artist(a) for a in albumartists]
self.filetype = self.filepath.rsplit(".", maxsplit=1)[-1]
self.image = self.albumhash + ".webp"
if self.genre is not None:
self.genre = str(self.genre).replace("/", ", ")
self.genre = str(self.genre).lower().split(", ")
@dataclass
class Album:
"""
Creates an album object
"""
albumhash: str
title: str
albumartists: list[Artist]
albumartisthash: str = ""
image: str = ""
count: int = 0
duration: int = 0
colors: list[str] = dataclasses.field(default_factory=list)
date: str = ""
is_soundtrack: bool = False
is_compilation: bool = False
is_single: bool = False
is_EP: bool = False
is_favorite: bool = False
genres: list[str] = dataclasses.field(default_factory=list)
def __post_init__(self):
self.image = self.albumhash + ".webp"
self.albumartisthash = "-".join(a.artisthash for a in self.albumartists)
def set_colors(self, colors: list[str]):
self.colors = colors
def check_type(self):
"""
Runs all the checks to determine the type of album.
"""
self.is_soundtrack = self.check_is_soundtrack()
if self.is_soundtrack:
return
self.is_compilation = self.check_is_compilation()
if self.is_compilation:
return
self.is_EP = self.check_is_EP()
def check_is_soundtrack(self) -> bool:
"""
Checks if the album is a soundtrack.
"""
keywords = ["motion picture", "soundtrack"]
for keyword in keywords:
if keyword in self.title.lower():
return True
return False
def check_is_compilation(self) -> bool:
"""
Checks if the album is a compilation.
"""
artists = [a.name for a in self.albumartists] # type: ignore
artists = "".join(artists).lower()
return "various artists" in artists
def check_is_EP(self) -> bool:
"""
Checks if the album is an EP.
"""
return self.title.strip().endswith(" EP")
def check_is_single(self, tracks: list[Track]):
"""
Checks if the album is a single.
"""
if (
len(tracks) == 1
and tracks[0].title == self.title
and tracks[0].track == 1
and tracks[0].disc == 1
):
self.is_single = True
@dataclass
class Playlist:
"""Creates playlist objects"""
id: int
artisthashes: str | list[str]
banner_pos: int
has_gif: str | bool
image: str
last_updated: str
name: str
trackhashes: str | list[str]
thumb: str = ""
count: int = 0
duration: int = 0
def __post_init__(self):
self.trackhashes = json.loads(str(self.trackhashes))
self.artisthashes = json.loads(str(self.artisthashes))
self.count = len(self.trackhashes)
self.has_gif = bool(int(self.has_gif))
if self.image is not None:
self.thumb = "thumb_" + self.image
else:
self.image = "None"
self.thumb = "None"
@dataclass
class Folder:
name: str
path: str
has_tracks: bool
is_sym: bool = False
path_hash: str = ""
class FavType:
"""Favorite types enum"""
track = "track"
album = "album"
artist = "artist"
+55
View File
@@ -0,0 +1,55 @@
from datetime import datetime
def date_string_to_time_passed(prev_date: str) -> str:
"""
Converts a date string to time passed. eg. 2 minutes ago, 1 hour ago, yesterday, 2 days ago, 2 weeks ago, etc.
"""
now = datetime.now()
then = datetime.strptime(prev_date, "%Y-%m-%d %H:%M:%S")
diff = now - then
days = diff.days
if days < 0:
return "in the future"
if days == 0:
seconds = diff.seconds
if seconds < 15:
return "now"
if seconds < 60:
return str(seconds) + " seconds ago"
if seconds < 3600:
return str(seconds // 60) + " minutes ago"
return str(seconds // 3600) + " hours ago"
if days == 1:
return "yesterday"
if days < 7:
return str(days) + " days ago"
if days < 30:
if days < 14:
return "1 week ago"
return str(days // 7) + " weeks ago"
if days < 365:
if days < 60:
return "1 month ago"
return str(days // 30) + " months ago"
if days > 365:
if days < 730:
return "1 year ago"
return str(days // 365) + " years ago"
return "I honestly don't know"
+103
View File
@@ -0,0 +1,103 @@
"""
Contains default configs
"""
import multiprocessing
import os
APP_VERSION = "Swing v0.0.1.alpha"
# paths
CONFIG_FOLDER = ".swing"
HOME_DIR = os.path.expanduser("~")
APP_DIR = os.path.join(HOME_DIR, CONFIG_FOLDER)
IMG_PATH = os.path.join(APP_DIR, "images")
ARTIST_IMG_PATH = os.path.join(IMG_PATH, "artists")
ARTIST_IMG_SM_PATH = os.path.join(ARTIST_IMG_PATH, "small")
ARTIST_IMG_LG_PATH = os.path.join(ARTIST_IMG_PATH, "large")
THUMBS_PATH = os.path.join(IMG_PATH, "thumbnails")
SM_THUMB_PATH = os.path.join(THUMBS_PATH, "small")
LG_THUMBS_PATH = os.path.join(THUMBS_PATH, "large")
# TEST_DIR = "/home/cwilvx/Downloads/Telegram Desktop"
# TEST_DIR = "/mnt/dfc48e0f-103b-426e-9bf9-f25d3743bc96/Music/Chill/Wolftyla Radio"
# HOME_DIR = TEST_DIR
# URLS
IMG_BASE_URI = "http://127.0.0.1:8900/images/"
IMG_ARTIST_URI = IMG_BASE_URI + "artists/"
IMG_THUMB_URI = IMG_BASE_URI + "thumbnails/"
IMG_PLAYLIST_URI = IMG_BASE_URI + "playlists/"
# defaults
DEFAULT_ARTIST_IMG = IMG_ARTIST_URI + "0.webp"
LAST_FM_API_KEY = "762db7a44a9e6fb5585661f5f2bdf23a"
CPU_COUNT = multiprocessing.cpu_count()
THUMB_SIZE = 400
SM_THUMB_SIZE = 64
SM_ARTIST_IMG_SIZE = 64
"""
The size of extracted images in pixels
"""
LOGGER_ENABLE: bool = True
FILES = ["flac", "mp3", "wav", "m4a"]
SUPPORTED_FILES = tuple(f".{file}" for file in FILES)
SUPPORTED_IMAGES = (".jpg", ".png", ".webp", ".jpeg")
SUPPORTED_DIR_IMAGES = [
"folder",
"cover",
"album",
"front",
]
# ===== DB =========
USE_MONGO = False
# ===== SQLite =====
APP_DB_NAME = "swing.db"
USER_DATA_DB_NAME = "userdata.db"
APP_DB_PATH = os.path.join(APP_DIR, APP_DB_NAME)
USERDATA_DB_PATH = os.path.join(APP_DIR, USER_DATA_DB_NAME)
# ===== Store =====
USE_STORE = True
HELP_MESSAGE = """
Usage: swing [options]
Options:
--build: Build the application
--host: Set the host
--port: Set the port
--help, -h: Show this help message
--version, -v: Show the version
"""
class TCOLOR:
"""
Terminal colors
"""
HEADER = "\033[95m"
OKBLUE = "\033[94m"
OKCYAN = "\033[96m"
OKGREEN = "\033[92m"
WARNING = "\033[93m"
FAIL = "\033[91m"
ENDC = "\033[0m"
BOLD = "\033[1m"
UNDERLINE = "\033[4m"
# credits: https://stackoverflow.com/a/287944
+128
View File
@@ -0,0 +1,128 @@
"""
Contains the functions to prepare the server for use.
"""
import os
import shutil
from configparser import ConfigParser
from app import settings
from app.db.sqlite import create_connection, create_tables, queries
from app.db.store import Store
from app.settings import APP_DB_PATH, USERDATA_DB_PATH
from app.utils import get_home_res_path
config = ConfigParser()
config_path = get_home_res_path("pyinstaller.config.ini")
config.read(config_path)
try:
IS_BUILD = config["DEFAULT"]["BUILD"] == "True"
except KeyError:
# If the key doesn't exist, it means that the app is being executed in dev mode.
IS_BUILD = False
class CopyFiles:
"""Copies assets to the app directory."""
def __init__(self) -> None:
assets_dir = "assets"
if IS_BUILD:
assets_dir = get_home_res_path("assets")
files = [
{
"src": assets_dir,
"dest": os.path.join(settings.APP_DIR, "assets"),
"is_dir": True,
}
]
for entry in files:
src = os.path.join(os.getcwd(), entry["src"])
if entry["is_dir"]:
shutil.copytree(
src,
entry["dest"],
ignore=shutil.ignore_patterns(
"*.pyc",
),
copy_function=shutil.copy2,
dirs_exist_ok=True,
)
break
shutil.copy2(src, entry["dest"])
def create_config_dir() -> None:
"""
Creates the config directory if it doesn't exist.
"""
home_dir = os.path.expanduser("~")
config_folder = os.path.join(home_dir, settings.CONFIG_FOLDER)
thumb_path = os.path.join("images", "thumbnails")
small_thumb_path = os.path.join(thumb_path, "small")
large_thumb_path = os.path.join(thumb_path, "large")
artist_img_path = os.path.join("images", "artists")
small_artist_img_path = os.path.join(artist_img_path, "small")
large_artist_img_path = os.path.join(artist_img_path, "large")
playlist_img_path = os.path.join("images", "playlists")
dirs = [
"", # creates the config folder
"images",
thumb_path,
small_thumb_path,
large_thumb_path,
artist_img_path,
small_artist_img_path,
large_artist_img_path,
playlist_img_path,
]
for _dir in dirs:
path = os.path.join(config_folder, _dir)
exists = os.path.exists(path)
if not exists:
os.makedirs(path)
os.chmod(path, 0o755)
CopyFiles()
def setup_sqlite():
"""
Create Sqlite databases and tables.
"""
# if os.path.exists(DB_PATH):
# os.remove(DB_PATH)
app_db_conn = create_connection(APP_DB_PATH)
playlist_db_conn = create_connection(USERDATA_DB_PATH)
create_tables(app_db_conn, queries.CREATE_APPDB_TABLES)
create_tables(playlist_db_conn, queries.CREATE_USERDATA_TABLES)
app_db_conn.close()
playlist_db_conn.close()
Store.load_all_tracks()
Store.process_folders()
Store.load_albums()
Store.load_artists()
def run_setup():
create_config_dir()
setup_sqlite()
+226
View File
@@ -0,0 +1,226 @@
"""
This module contains mini functions for the server.
"""
import os
import hashlib
from pathlib import Path
import threading
from datetime import datetime
from unidecode import unidecode
import requests
from app import models
from app.settings import SUPPORTED_FILES
CWD = Path(__file__).parent.resolve()
def background(func):
"""
a threading decorator
use @background above the function you want to run in the background
"""
def background_func(*a, **kw):
threading.Thread(target=func, args=a, kwargs=kw).start()
return background_func
def run_fast_scandir(__dir: str, full=False) -> tuple[list[str], list[str]]:
"""
Scans a directory for files with a specific extension. Returns a list of files and folders in the directory.
"""
subfolders = []
files = []
for f in os.scandir(__dir):
if f.is_dir() and not f.name.startswith("."):
subfolders.append(f.path)
if f.is_file():
ext = os.path.splitext(f.name)[1].lower()
if ext in SUPPORTED_FILES:
files.append(f.path)
if full or len(files) == 0:
for _dir in list(subfolders):
sf, f = run_fast_scandir(_dir, full=True)
subfolders.extend(sf)
files.extend(f)
return subfolders, files
def remove_duplicates(tracks: list[models.Track]) -> list[models.Track]:
"""
Removes duplicate tracks from a list of tracks.
"""
hashes = []
for track in tracks:
if track.trackhash not in hashes:
hashes.append(track.trackhash)
tracks = sorted(tracks, key=lambda x: x.trackhash)
tracks = UseBisection(tracks, "trackhash", hashes)()
return [t for t in tracks if t is not None]
def create_hash(*args: str, decode=False, limit=7) -> str:
"""
Creates a simple hash for an album
"""
string = "".join(args)
if decode:
string = unidecode(string)
string = string.lower().strip().replace(" ", "")
string = "".join(t for t in string if t.isalnum())
string = string.encode("utf-8")
string = hashlib.sha256(string).hexdigest()
return string[-limit:]
def create_folder_hash(*args: str, limit=7) -> str:
"""
Creates a simple hash for an album
"""
strings = [s.lower().strip().replace(" ", "") for s in args]
strings = ["".join([t for t in s if t.isalnum()]) for s in strings]
strings = [s.encode("utf-8") for s in strings]
strings = [hashlib.sha256(s).hexdigest()[-limit:] for s in strings]
return "".join(strings)
def create_new_date():
"""
It creates a new date and time string in the format of "YYYY-MM-DD HH:MM:SS"
:return: A string of the current date and time.
"""
now = datetime.now()
return now.strftime("%Y-%m-%d %H:%M:%S")
class UseBisection:
"""
Uses bisection to find a list of items in another list.
returns a list of found items with `None` items being not found
items.
"""
def __init__(self, source: list, search_from: str, queries: list[str]) -> None:
self.source_list = source
self.queries_list = queries
self.attr = search_from
def find(self, query: str):
left = 0
right = len(self.source_list) - 1
while left <= right:
mid = (left + right) // 2
if self.source_list[mid].__getattribute__(self.attr) == query:
return self.source_list[mid]
elif self.source_list[mid].__getattribute__(self.attr) > query:
right = mid - 1
else:
left = mid + 1
return None
def __call__(self) -> list:
if len(self.source_list) == 0:
return [None]
return [self.find(query) for query in self.queries_list]
class Ping:
"""
Checks if there is a connection to the internet by pinging google.com
"""
@staticmethod
def __call__() -> bool:
try:
requests.get("https://google.com", timeout=10)
return True
except (requests.exceptions.ConnectionError, requests.Timeout):
return False
def get_artists_from_tracks(tracks: list[models.Track]) -> set[str]:
"""
Extracts all artists from a list of tracks. Returns a list of Artists.
"""
artists = set()
master_artist_list = [[x.name for x in t.artist] for t in tracks] # type: ignore
artists = artists.union(*master_artist_list)
return artists
def get_albumartists(albums: list[models.Album]) -> set[str]:
artists = set()
# master_artist_list = [a.albumartists for a in albums]
for album in albums:
albumartists = [a.name for a in album.albumartists] # type: ignore
artists.update(albumartists)
# return [models.Artist(a) for a in artists]
return artists
def get_all_artists(
tracks: list[models.Track], albums: list[models.Album]
) -> list[models.Artist]:
artists_from_tracks = get_artists_from_tracks(tracks)
artist_from_albums = get_albumartists(albums)
artists = list(artists_from_tracks.union(artist_from_albums))
artists = sorted(artists)
lower_artists = set(a.lower().strip() for a in artists)
indices = [[ar.lower().strip() for ar in artists].index(a) for a in lower_artists]
artists = [artists[i] for i in indices]
return [models.Artist(a) for a in artists]
def bisection_search_string(strings: list[str], target: str) -> str | None:
"""
Finds a string in a list of strings using bisection search.
"""
if not strings:
return None
strings = sorted(strings)
left = 0
right = len(strings) - 1
while left <= right:
middle = (left + right) // 2
if strings[middle] == target:
return strings[middle]
if strings[middle] < target:
left = middle + 1
else:
right = middle - 1
return None
def get_home_res_path(filename: str):
"""
Returns a path to resources in the home directory of this project. Used to resolve resources in builds.
"""
return (CWD / ".." / filename).resolve()