mirror of
https://github.com/Dvorinka/swingmusic-extended.git
synced 2026-06-05 13:03:02 +00:00
Save complete tracks and albums to the db for faster startup
- refactor function locations - add logger - check for new tracks instead of re-processing all files
This commit is contained in:
+65
-15
@@ -1,14 +1,37 @@
|
||||
"""
|
||||
This library contains all the functions related to albums.
|
||||
"""
|
||||
import random
|
||||
import urllib
|
||||
from pprint import pprint
|
||||
from typing import List
|
||||
from progress.bar import Bar
|
||||
|
||||
from app import api
|
||||
from app import functions
|
||||
from app import models
|
||||
from app.lib import trackslib
|
||||
from app import instances
|
||||
|
||||
|
||||
def get_all_albums() -> List[models.Album]:
|
||||
"""
|
||||
Returns a list of album objects for all albums in the database.
|
||||
"""
|
||||
print("Getting all albums...")
|
||||
|
||||
albums: List[models.Album] = []
|
||||
db_albums = instances.album_instance.get_all_albums()
|
||||
|
||||
_bar = Bar("Creating albums", max=len(db_albums))
|
||||
for album in db_albums:
|
||||
aa = models.Album(album)
|
||||
albums.append(aa)
|
||||
_bar.next()
|
||||
|
||||
_bar.finish()
|
||||
|
||||
return albums
|
||||
|
||||
|
||||
def create_everything() -> List[models.Track]:
|
||||
@@ -16,15 +39,39 @@ def create_everything() -> List[models.Track]:
|
||||
Creates album objects for all albums and returns
|
||||
a list of track objects
|
||||
"""
|
||||
albums: list[models.Album] = functions.get_all_albums()
|
||||
albums: list[models.Album] = get_all_albums()
|
||||
|
||||
api.ALBUMS.clear()
|
||||
api.ALBUMS.extend(albums)
|
||||
api.ALBUMS = albums
|
||||
api.ALBUMS.sort(key=lambda x: x.title)
|
||||
|
||||
tracks = trackslib.create_all_tracks()
|
||||
|
||||
api.TRACKS.clear()
|
||||
api.TRACKS.extend(tracks)
|
||||
api.TRACKS.sort(key=lambda x: x.title)
|
||||
|
||||
|
||||
def find_album(albumtitle: str, artist: str) -> models.Album:
|
||||
"""
|
||||
Finds an album by album title and artist.
|
||||
"""
|
||||
left = 0
|
||||
right = len(api.ALBUMS) - 1
|
||||
iter = 0
|
||||
|
||||
while left <= right:
|
||||
iter += 1
|
||||
mid = (left + right) // 2
|
||||
|
||||
if api.ALBUMS[mid].title == albumtitle and api.ALBUMS[mid].artist == artist:
|
||||
return mid
|
||||
|
||||
if api.ALBUMS[mid].title < albumtitle:
|
||||
left = mid + 1
|
||||
else:
|
||||
right = mid - 1
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def get_album_duration(album: list) -> int:
|
||||
@@ -40,32 +87,40 @@ def get_album_duration(album: list) -> int:
|
||||
return album_duration
|
||||
|
||||
|
||||
def use_defaults() -> str:
|
||||
"""
|
||||
Returns a path to a random image in the defaults directory.
|
||||
"""
|
||||
path = "defaults/" + str(random.randint(0, 20)) + ".webp"
|
||||
return path
|
||||
|
||||
|
||||
def get_album_image(album: list) -> str:
|
||||
"""
|
||||
Gets the image of an album.
|
||||
"""
|
||||
|
||||
for track in album:
|
||||
img_p = (track["album"] + track["albumartist"] + ".webp").replace(
|
||||
"/", "::")
|
||||
img_p = (track["album"] + track["albumartist"] + ".webp").replace("/", "::")
|
||||
img = functions.extract_thumb(track["filepath"], webp_path=img_p)
|
||||
|
||||
if img is not None:
|
||||
return img
|
||||
|
||||
return functions.use_defaults()
|
||||
return use_defaults()
|
||||
|
||||
|
||||
def get_album_tracks(album: str, artist: str) -> List:
|
||||
tracks = []
|
||||
|
||||
for track in api.PRE_TRACKS:
|
||||
for track in api.DB_TRACKS:
|
||||
try:
|
||||
if track["album"] == album and track["albumartist"] == artist:
|
||||
tracks.append(track)
|
||||
except TypeError:
|
||||
pprint(track, indent=4)
|
||||
print(album, artist)
|
||||
|
||||
return tracks
|
||||
|
||||
|
||||
@@ -85,19 +140,14 @@ def create_album(track) -> models.Album:
|
||||
album["date"] = album_tracks[0]["date"]
|
||||
|
||||
album["artistimage"] = urllib.parse.quote_plus(
|
||||
album_tracks[0]["albumartist"] + ".webp")
|
||||
album_tracks[0]["albumartist"] + ".webp"
|
||||
)
|
||||
|
||||
album["image"] = get_album_image(album_tracks)
|
||||
|
||||
return models.Album(album)
|
||||
|
||||
|
||||
def find_album(albumtitle, artist):
|
||||
for album in api.ALBUMS:
|
||||
if album.album == albumtitle and album.artist == artist:
|
||||
return album
|
||||
|
||||
|
||||
def search_albums_by_name(query: str) -> List[models.Album]:
|
||||
"""
|
||||
Searches albums by album name.
|
||||
@@ -106,7 +156,7 @@ def search_albums_by_name(query: str) -> List[models.Album]:
|
||||
artist_albums: List[models.Album] = []
|
||||
|
||||
for album in api.ALBUMS:
|
||||
if query.lower() in album.album.lower():
|
||||
if query.lower() in album.title.lower():
|
||||
title_albums.append(album)
|
||||
|
||||
for album in api.ALBUMS:
|
||||
|
||||
@@ -0,0 +1,52 @@
|
||||
import colorgram
|
||||
from progress.bar import Bar
|
||||
from PIL import Image
|
||||
from io import BytesIO
|
||||
|
||||
from app import api, instances
|
||||
from app.lib.taglib import return_album_art
|
||||
|
||||
|
||||
def get_image_colors(image) -> list:
|
||||
"""Extracts 2 of the most dominant colors from an image."""
|
||||
try:
|
||||
colors = sorted(colorgram.extract(image, 2), key=lambda c: c.hsl.h)
|
||||
except OSError:
|
||||
return []
|
||||
|
||||
formatted_colors = []
|
||||
|
||||
for color in colors:
|
||||
color = f"rgb({color.rgb.r}, {color.rgb.g}, {color.rgb.b})"
|
||||
formatted_colors.append(color)
|
||||
|
||||
return formatted_colors
|
||||
|
||||
|
||||
def save_track_colors(img, filepath) -> None:
|
||||
"""Saves the track colors to the database"""
|
||||
|
||||
track_colors = get_image_colors(img)
|
||||
|
||||
tc_dict = {
|
||||
"filepath": filepath,
|
||||
"colors": track_colors,
|
||||
}
|
||||
|
||||
instances.track_color_instance.insert_track_color(tc_dict)
|
||||
|
||||
|
||||
def save_t_colors():
|
||||
_bar = Bar("Processing image colors", max=len(api.DB_TRACKS))
|
||||
|
||||
for track in api.DB_TRACKS:
|
||||
filepath = track["filepath"]
|
||||
album_art = return_album_art(filepath)
|
||||
|
||||
if album_art is not None:
|
||||
img = Image.open(BytesIO(album_art))
|
||||
save_track_colors(img, filepath)
|
||||
|
||||
_bar.next()
|
||||
|
||||
_bar.finish()
|
||||
@@ -1,5 +1,5 @@
|
||||
import time
|
||||
from typing import List
|
||||
from typing import List, Set
|
||||
|
||||
from app import api
|
||||
from app import helpers
|
||||
@@ -31,17 +31,18 @@ def create_folder(foldername: str) -> models.Folder:
|
||||
return models.Folder(folder)
|
||||
|
||||
|
||||
def create_all_folders() -> List[models.Folder]:
|
||||
folders_: List[models.Folder] = []
|
||||
def create_all_folders() -> Set[models.Folder]:
|
||||
folders: List[models.Folder] = []
|
||||
_bar = Bar("Creating folders", max=len(api.VALID_FOLDERS))
|
||||
|
||||
for foldername in api.VALID_FOLDERS:
|
||||
folder = create_folder(foldername)
|
||||
folders_.append(folder)
|
||||
folders.append(folder)
|
||||
|
||||
_bar.next()
|
||||
_bar.finish()
|
||||
|
||||
return folders_
|
||||
return folders
|
||||
|
||||
|
||||
def get_subdirs(foldername: str) -> List[models.Folder]:
|
||||
@@ -76,5 +77,4 @@ def run_scandir():
|
||||
folders_ = create_all_folders()
|
||||
"""Create all the folder objects before clearing api.FOLDERS"""
|
||||
|
||||
api.FOLDERS.clear()
|
||||
api.FOLDERS.extend(folders_)
|
||||
api.FOLDERS = folders_
|
||||
|
||||
@@ -0,0 +1,182 @@
|
||||
import os
|
||||
from mutagen.flac import FLAC
|
||||
from mutagen.id3 import ID3
|
||||
from mutagen.flac import MutagenError
|
||||
import mutagen
|
||||
import urllib
|
||||
from PIL import Image
|
||||
from io import BytesIO
|
||||
|
||||
from app import settings
|
||||
|
||||
|
||||
def return_album_art(filepath: str):
|
||||
"""
|
||||
Returns the album art for a given audio file.
|
||||
"""
|
||||
|
||||
if filepath.endswith(".flac"):
|
||||
try:
|
||||
audio = FLAC(filepath)
|
||||
return audio.pictures[0].data
|
||||
except:
|
||||
return None
|
||||
elif filepath.endswith(".mp3"):
|
||||
try:
|
||||
audio = ID3(filepath)
|
||||
return audio.getall("APIC")[0].data
|
||||
except:
|
||||
return None
|
||||
|
||||
|
||||
def extract_thumb(audio_file_path: str, webp_path: str) -> str:
|
||||
"""
|
||||
Extracts the thumbnail from an audio file. Returns the path to the thumbnail.
|
||||
"""
|
||||
img_path = os.path.join(settings.THUMBS_PATH, webp_path)
|
||||
|
||||
if os.path.exists(img_path):
|
||||
return urllib.parse.quote(webp_path)
|
||||
|
||||
album_art = return_album_art(audio_file_path)
|
||||
|
||||
if album_art is not None:
|
||||
img = Image.open(BytesIO(album_art))
|
||||
|
||||
try:
|
||||
small_img = img.resize((250, 250), Image.ANTIALIAS)
|
||||
small_img.save(img_path, format="webp")
|
||||
except OSError:
|
||||
try:
|
||||
png = img.convert("RGB")
|
||||
small_img = png.resize((250, 250), Image.ANTIALIAS)
|
||||
small_img.save(webp_path, format="webp")
|
||||
except:
|
||||
return None
|
||||
|
||||
return urllib.parse.quote(webp_path)
|
||||
else:
|
||||
return None
|
||||
|
||||
|
||||
def parse_artist_tag(audio):
|
||||
"""
|
||||
Parses the artist tag from an audio file.
|
||||
"""
|
||||
try:
|
||||
artists = audio["artist"][0]
|
||||
except (KeyError, IndexError):
|
||||
artists = "Unknown"
|
||||
|
||||
return artists
|
||||
|
||||
|
||||
def parse_title_tag(audio, full_path: str):
|
||||
"""
|
||||
Parses the title tag from an audio file.
|
||||
"""
|
||||
try:
|
||||
title = audio["title"][0]
|
||||
except (KeyError, IndexError):
|
||||
title = full_path.split("/")[-1]
|
||||
|
||||
return title
|
||||
|
||||
|
||||
def parse_album_artist_tag(audio):
|
||||
"""
|
||||
Parses the album artist tag from an audio file.
|
||||
"""
|
||||
try:
|
||||
albumartist = audio["albumartist"][0]
|
||||
except (KeyError, IndexError):
|
||||
albumartist = "Unknown"
|
||||
|
||||
return albumartist
|
||||
|
||||
|
||||
def parse_album_tag(audio, full_path: str):
|
||||
"""
|
||||
Parses the album tag from an audio file.
|
||||
"""
|
||||
try:
|
||||
album = audio["album"][0]
|
||||
except (KeyError, IndexError):
|
||||
album = full_path.split("/")[-1]
|
||||
|
||||
return album
|
||||
|
||||
|
||||
def parse_genre_tag(audio):
|
||||
"""
|
||||
Parses the genre tag from an audio file.
|
||||
"""
|
||||
try:
|
||||
genre = audio["genre"][0]
|
||||
except (KeyError, IndexError):
|
||||
genre = "Unknown"
|
||||
|
||||
return genre
|
||||
|
||||
|
||||
def parse_date_tag(audio):
|
||||
"""
|
||||
Parses the date tag from an audio file.
|
||||
"""
|
||||
try:
|
||||
date = audio["date"][0]
|
||||
except (KeyError, IndexError):
|
||||
date = "Unknown"
|
||||
|
||||
return date
|
||||
|
||||
|
||||
def parse_track_number(audio):
|
||||
"""
|
||||
Parses the track number from an audio file.
|
||||
"""
|
||||
try:
|
||||
track_number = audio["tracknumber"][0]
|
||||
except (KeyError, IndexError):
|
||||
track_number = "Unknown"
|
||||
|
||||
return track_number
|
||||
|
||||
|
||||
def parse_disk_number(audio):
|
||||
"""
|
||||
Parses the disk number from an audio file.
|
||||
"""
|
||||
try:
|
||||
disk_number = audio["discnumber"][0]
|
||||
except (KeyError, IndexError):
|
||||
disk_number = "Unknown"
|
||||
|
||||
return disk_number
|
||||
|
||||
|
||||
def get_tags(fullpath: str) -> dict:
|
||||
"""
|
||||
Returns a dictionary of tags for a given file.
|
||||
"""
|
||||
try:
|
||||
audio = mutagen.File(fullpath, easy=True)
|
||||
except MutagenError:
|
||||
return None
|
||||
|
||||
tags = {
|
||||
"artists": parse_artist_tag(audio),
|
||||
"title": parse_title_tag(audio, fullpath),
|
||||
"albumartist": parse_album_artist_tag(audio),
|
||||
"album": parse_album_tag(audio, fullpath),
|
||||
"genre": parse_genre_tag(audio),
|
||||
"date": parse_date_tag(audio)[:4],
|
||||
"tracknumber": parse_track_number(audio),
|
||||
"discnumber": parse_disk_number(audio),
|
||||
"length": round(audio.info.length),
|
||||
"bitrate": round(int(audio.info.bitrate) / 1000),
|
||||
"filepath": fullpath,
|
||||
"folder": os.path.dirname(fullpath),
|
||||
}
|
||||
|
||||
return tags
|
||||
@@ -18,18 +18,14 @@ def create_all_tracks() -> List[models.Track]:
|
||||
"""
|
||||
tracks: list[models.Track] = []
|
||||
|
||||
_bar = Bar("Creating tracks", max=len(api.PRE_TRACKS))
|
||||
_bar = Bar("Creating tracks", max=len(api.DB_TRACKS))
|
||||
|
||||
for track in api.PRE_TRACKS:
|
||||
for track in api.DB_TRACKS:
|
||||
try:
|
||||
os.chmod(track["filepath"], 0o755)
|
||||
except FileNotFoundError:
|
||||
instances.songs_instance.remove_song_by_filepath(track["filepath"])
|
||||
api.PRE_TRACKS.remove(track)
|
||||
|
||||
album = albumslib.find_album(track["album"], track["albumartist"])
|
||||
|
||||
track["image"] = album.image
|
||||
instances.tracks_instance.remove_song_by_id(track["_id"]["$oid"])
|
||||
api.DB_TRACKS.remove(track)
|
||||
|
||||
tracks.append(models.Track(track))
|
||||
_bar.next()
|
||||
|
||||
+11
-10
@@ -8,11 +8,12 @@ import os
|
||||
from watchdog.observers import Observer
|
||||
from watchdog.events import PatternMatchingEventHandler
|
||||
|
||||
from app import instances, functions
|
||||
from app import instances
|
||||
from app import models
|
||||
from app.lib import albumslib
|
||||
from app import api
|
||||
from app.lib import folderslib
|
||||
from app.lib.taglib import get_tags
|
||||
from app.lib.albumslib import create_album
|
||||
|
||||
|
||||
class OnMyWatch:
|
||||
@@ -46,14 +47,14 @@ def add_track(filepath: str) -> None:
|
||||
|
||||
Then creates a folder object for the added track and adds it to api.FOLDERS
|
||||
"""
|
||||
tags = functions.get_tags(filepath)
|
||||
tags = get_tags(filepath)
|
||||
|
||||
if tags is not None:
|
||||
instances.songs_instance.insert_song(tags)
|
||||
tags = instances.songs_instance.get_song_by_path(tags["filepath"])
|
||||
instances.tracks_instance.insert_song(tags)
|
||||
tags = instances.tracks_instance.get_song_by_path(tags["filepath"])
|
||||
|
||||
api.PRE_TRACKS.append(tags)
|
||||
album = albumslib.create_album(tags)
|
||||
api.DB_TRACKS.append(tags)
|
||||
album = create_album(tags)
|
||||
api.ALBUMS.append(album)
|
||||
|
||||
tags["image"] = album.image
|
||||
@@ -64,7 +65,7 @@ def add_track(filepath: str) -> None:
|
||||
if folder not in api.VALID_FOLDERS:
|
||||
api.VALID_FOLDERS.add(folder)
|
||||
f = folderslib.create_folder(folder)
|
||||
api.FOLDERS.append(f)
|
||||
api.FOLDERS.add(f)
|
||||
|
||||
|
||||
def remove_track(filepath: str) -> None:
|
||||
@@ -75,12 +76,12 @@ def remove_track(filepath: str) -> None:
|
||||
fpath = filepath.replace(fname, "")
|
||||
|
||||
try:
|
||||
trackid = instances.songs_instance.get_song_by_path(filepath)["_id"]["$oid"]
|
||||
trackid = instances.tracks_instance.get_song_by_path(filepath)["_id"]["$oid"]
|
||||
except TypeError:
|
||||
print(f"💙 Watchdog Error: Error removing track {filepath} TypeError")
|
||||
return
|
||||
|
||||
instances.songs_instance.remove_song_by_id(trackid)
|
||||
instances.tracks_instance.remove_song_by_id(trackid)
|
||||
|
||||
for track in api.TRACKS:
|
||||
if track.trackid == trackid:
|
||||
|
||||
Reference in New Issue
Block a user