mirror of
https://github.com/Dvorinka/swingmusic-extended.git
synced 2026-06-05 04:53:01 +00:00
move the populate function to separate file
This commit is contained in:
+11
-129
@@ -1,25 +1,18 @@
|
|||||||
"""
|
"""
|
||||||
This module contains functions for the server
|
This module contains functions for the server
|
||||||
"""
|
"""
|
||||||
import datetime
|
|
||||||
import os
|
import os
|
||||||
import time
|
import time
|
||||||
from dataclasses import asdict
|
|
||||||
from io import BytesIO
|
from io import BytesIO
|
||||||
|
|
||||||
import requests
|
import requests
|
||||||
from app import api
|
from app import api
|
||||||
from app import helpers
|
from app import helpers
|
||||||
from app import instances
|
|
||||||
from app import models
|
|
||||||
from app import settings
|
from app import settings
|
||||||
from app.lib import albumslib
|
|
||||||
from app.lib import folderslib
|
|
||||||
from app.lib import watchdoge
|
from app.lib import watchdoge
|
||||||
from app.lib.taglib import get_tags
|
|
||||||
from app.logger import Log
|
|
||||||
from PIL import Image
|
from PIL import Image
|
||||||
from progress.bar import Bar
|
from progress.bar import Bar
|
||||||
|
from app.lib.populate import Populate
|
||||||
|
|
||||||
|
|
||||||
@helpers.background
|
@helpers.background
|
||||||
@@ -27,6 +20,7 @@ def reindex_tracks():
|
|||||||
"""
|
"""
|
||||||
Checks for new songs every 5 minutes.
|
Checks for new songs every 5 minutes.
|
||||||
"""
|
"""
|
||||||
|
is_underway = False
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
populate()
|
populate()
|
||||||
@@ -44,129 +38,11 @@ def start_watchdog():
|
|||||||
|
|
||||||
|
|
||||||
def populate():
|
def populate():
|
||||||
"""
|
pop = Populate()
|
||||||
Populate the database with all songs in the music directory
|
pop.run()
|
||||||
|
|
||||||
checks if the song is in the database, if not, it adds it
|
|
||||||
also checks if the album art exists in the image path, if not tries to
|
|
||||||
extract it.
|
|
||||||
"""
|
|
||||||
start = time.time()
|
|
||||||
db_tracks = instances.tracks_instance.get_all_tracks()
|
|
||||||
tagged_tracks = []
|
|
||||||
albums = []
|
|
||||||
folders = set()
|
|
||||||
|
|
||||||
files = helpers.run_fast_scandir(settings.HOME_DIR, [".flac", ".mp3"], full=True)[1]
|
|
||||||
|
|
||||||
_bar = Bar("Checking files", max=len(files))
|
|
||||||
for track in db_tracks:
|
|
||||||
if track["filepath"] in files:
|
|
||||||
files.remove(track["filepath"])
|
|
||||||
_bar.next()
|
|
||||||
|
|
||||||
_bar.finish()
|
|
||||||
|
|
||||||
Log(f"Found {len(files)} untagged files")
|
|
||||||
|
|
||||||
_bar = Bar("Tagging files", max=len(files))
|
|
||||||
for file in files:
|
|
||||||
tags = get_tags(file)
|
|
||||||
foldername = os.path.dirname(file)
|
|
||||||
folders.add(foldername)
|
|
||||||
|
|
||||||
if tags is not None:
|
|
||||||
tagged_tracks.append(tags)
|
|
||||||
api.DB_TRACKS.append(tags)
|
|
||||||
|
|
||||||
_bar.next()
|
|
||||||
_bar.finish()
|
|
||||||
|
|
||||||
Log(f"Tagged {len(tagged_tracks)} tracks")
|
|
||||||
|
|
||||||
pre_albums = []
|
|
||||||
|
|
||||||
for t in tagged_tracks:
|
|
||||||
a = {
|
|
||||||
"title": t["album"],
|
|
||||||
"artist": t["albumartist"],
|
|
||||||
}
|
|
||||||
|
|
||||||
if a not in pre_albums:
|
|
||||||
pre_albums.append(a)
|
|
||||||
|
|
||||||
exist_count = 0
|
|
||||||
_bar = Bar("Creating albums", max=len(pre_albums))
|
|
||||||
for aa in pre_albums:
|
|
||||||
albumindex = albumslib.find_album(aa["title"], aa["artist"])
|
|
||||||
|
|
||||||
if albumindex is None:
|
|
||||||
track = [
|
|
||||||
track
|
|
||||||
for track in tagged_tracks
|
|
||||||
if track["album"] == aa["title"]
|
|
||||||
and track["albumartist"] == aa["artist"]
|
|
||||||
][0]
|
|
||||||
|
|
||||||
album = albumslib.create_album(track)
|
|
||||||
api.ALBUMS.append(album)
|
|
||||||
albums.append(album)
|
|
||||||
|
|
||||||
instances.album_instance.insert_album(asdict(album))
|
|
||||||
|
|
||||||
else:
|
|
||||||
exist_count += 1
|
|
||||||
|
|
||||||
_bar.next()
|
|
||||||
|
|
||||||
_bar.finish()
|
|
||||||
|
|
||||||
Log(f"{exist_count} of {len(albums)} were already in the database")
|
|
||||||
|
|
||||||
_bar = Bar("Creating tracks", max=len(tagged_tracks))
|
|
||||||
for track in tagged_tracks:
|
|
||||||
try:
|
|
||||||
album_index = albumslib.find_album(track["album"], track["albumartist"])
|
|
||||||
album = api.ALBUMS[album_index]
|
|
||||||
|
|
||||||
track["image"] = album.image
|
|
||||||
upsert_id = instances.tracks_instance.insert_song(track)
|
|
||||||
|
|
||||||
track["_id"] = {"$oid": str(upsert_id)}
|
|
||||||
api.TRACKS.append(models.Track(track))
|
|
||||||
except TypeError:
|
|
||||||
# Bug: some albums are not found although they exist in `api.ALBUMS`. It has something to do with the bisection method used or sorting. Not sure yet.
|
|
||||||
pass
|
|
||||||
|
|
||||||
_bar.next()
|
|
||||||
|
|
||||||
_bar.finish()
|
|
||||||
|
|
||||||
Log(f"Added {len(tagged_tracks)} new tracks and {len(albums)} new albums")
|
|
||||||
|
|
||||||
_bar = Bar("Creating folders", max=len(folders))
|
|
||||||
for folder in folders:
|
|
||||||
if folder not in api.VALID_FOLDERS:
|
|
||||||
api.VALID_FOLDERS.add(folder)
|
|
||||||
fff = folderslib.create_folder(folder)
|
|
||||||
api.FOLDERS.append(fff)
|
|
||||||
|
|
||||||
_bar.next()
|
|
||||||
|
|
||||||
_bar.finish()
|
|
||||||
|
|
||||||
Log(f"Created {len(api.FOLDERS)} folders")
|
|
||||||
|
|
||||||
end = time.time()
|
|
||||||
|
|
||||||
print(
|
|
||||||
str(datetime.timedelta(seconds=round(end - start)))
|
|
||||||
+ " elapsed for "
|
|
||||||
+ str(len(files))
|
|
||||||
+ " files"
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
|
@helpers.background
|
||||||
def fetch_image_path(artist: str) -> str or None:
|
def fetch_image_path(artist: str) -> str or None:
|
||||||
"""
|
"""
|
||||||
Returns a direct link to an artist image.
|
Returns a direct link to an artist image.
|
||||||
@@ -185,6 +61,7 @@ def fetch_image_path(artist: str) -> str or None:
|
|||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
@helpers.background
|
||||||
def fetch_artist_images():
|
def fetch_artist_images():
|
||||||
"""Downloads the artists images"""
|
"""Downloads the artists images"""
|
||||||
|
|
||||||
@@ -238,3 +115,8 @@ def fetch_album_bio(title: str, albumartist: str):
|
|||||||
bio = None
|
bio = None
|
||||||
|
|
||||||
return bio
|
return bio
|
||||||
|
|
||||||
|
|
||||||
|
# TODO
|
||||||
|
# - Move the populate function to a new file and probably into a new class
|
||||||
|
# - Start movement from functional programming to OOP to OOP
|
||||||
|
|||||||
@@ -23,12 +23,11 @@ def hello():
|
|||||||
|
|
||||||
@app.route("/thumb/<path>")
|
@app.route("/thumb/<path>")
|
||||||
def send_thumbnail(path: str):
|
def send_thumbnail(path: str):
|
||||||
name = path + ".webp"
|
fpath = join(THUMB_PATH, path)
|
||||||
path = join(THUMB_PATH, name)
|
exists = os.path.exists(fpath)
|
||||||
exists = os.path.exists(path)
|
|
||||||
|
|
||||||
if exists:
|
if exists:
|
||||||
return send_from_directory(THUMB_PATH, name)
|
return send_from_directory(THUMB_PATH, path)
|
||||||
|
|
||||||
return {"msg": "Not found"}, 404
|
return {"msg": "Not found"}, 404
|
||||||
|
|
||||||
@@ -36,12 +35,11 @@ def send_thumbnail(path: str):
|
|||||||
@app.route("/artist/<path>")
|
@app.route("/artist/<path>")
|
||||||
def send_artist_image(path: str):
|
def send_artist_image(path: str):
|
||||||
print(ARTIST_PATH)
|
print(ARTIST_PATH)
|
||||||
name = path + ".webp"
|
fpath = join(ARTIST_PATH, path)
|
||||||
path = join(ARTIST_PATH, name)
|
exists = os.path.exists(fpath)
|
||||||
exists = os.path.exists(path)
|
|
||||||
|
|
||||||
if exists:
|
if exists:
|
||||||
return send_from_directory(ARTIST_PATH, name)
|
return send_from_directory(ARTIST_PATH, path)
|
||||||
|
|
||||||
return {"msg": "Not found"}, 404
|
return {"msg": "Not found"}, 404
|
||||||
|
|
||||||
|
|||||||
@@ -114,17 +114,15 @@ def get_album_image(album: list) -> str:
|
|||||||
Gets the image of an album.
|
Gets the image of an album.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
uri = settings.IMG_THUMB_URI
|
|
||||||
|
|
||||||
for track in album:
|
for track in album:
|
||||||
img_p = gen_random_path()
|
img_p = gen_random_path()
|
||||||
|
|
||||||
exists = taglib.extract_thumb(track["filepath"], webp_path=img_p)
|
exists = taglib.extract_thumb(track["filepath"], webp_path=img_p)
|
||||||
|
|
||||||
if exists:
|
if exists:
|
||||||
return uri + img_p
|
return img_p
|
||||||
|
|
||||||
return uri + use_defaults()
|
return use_defaults()
|
||||||
|
|
||||||
|
|
||||||
def get_album_tracks(album: str, artist: str) -> List:
|
def get_album_tracks(album: str, artist: str) -> List:
|
||||||
|
|||||||
@@ -0,0 +1,171 @@
|
|||||||
|
from dataclasses import asdict
|
||||||
|
from app.helpers import run_fast_scandir
|
||||||
|
from app import settings
|
||||||
|
from app.instances import tracks_instance, album_instance
|
||||||
|
from progress.bar import Bar
|
||||||
|
from app.logger import Log
|
||||||
|
from app.lib.taglib import get_tags
|
||||||
|
from os import path
|
||||||
|
from app.lib.albumslib import find_album, create_album
|
||||||
|
from app import api
|
||||||
|
from app.models import Track
|
||||||
|
from app.lib import folderslib
|
||||||
|
|
||||||
|
|
||||||
|
class Populate:
|
||||||
|
"""
|
||||||
|
Populate the database with all songs in the music directory
|
||||||
|
|
||||||
|
checks if the song is in the database, if not, it adds it
|
||||||
|
also checks if the album art exists in the image path, if not tries to
|
||||||
|
extract it.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self) -> None:
|
||||||
|
self.files = []
|
||||||
|
self.db_tracks = []
|
||||||
|
self.tagged_tracks = []
|
||||||
|
self.folders = set()
|
||||||
|
self.pre_albums = []
|
||||||
|
self.albums = []
|
||||||
|
|
||||||
|
self.files = run_fast_scandir(settings.HOME_DIR, [".flac", ".mp3"])[1]
|
||||||
|
self.db_tracks = tracks_instance.get_all_tracks()
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
self.check_untagged()
|
||||||
|
|
||||||
|
if len(self.files) == 0:
|
||||||
|
return
|
||||||
|
|
||||||
|
self.tag_files()
|
||||||
|
self.create_pre_albums()
|
||||||
|
self.create_albums()
|
||||||
|
self.create_tracks()
|
||||||
|
self.create_folders()
|
||||||
|
|
||||||
|
def check_untagged(self):
|
||||||
|
"""
|
||||||
|
Loops through all the tracks in db tracks removing each
|
||||||
|
from the list of tagged tracks if it exists.
|
||||||
|
We will now only have untagged tracks left in `files`.
|
||||||
|
"""
|
||||||
|
bar = Bar("Checking untagged", max=len(self.db_tracks))
|
||||||
|
for track in self.db_tracks:
|
||||||
|
if track["filepath"] in self.files:
|
||||||
|
self.files.remove(track["filepath"])
|
||||||
|
bar.next()
|
||||||
|
|
||||||
|
bar.finish()
|
||||||
|
Log(f"Found {len(self.files)} untagged tracks")
|
||||||
|
|
||||||
|
def tag_files(self):
|
||||||
|
"""
|
||||||
|
Loops through all the untagged files and tags them.
|
||||||
|
"""
|
||||||
|
bar = Bar("Tagging files", max=len(self.files))
|
||||||
|
for file in self.files:
|
||||||
|
tags = get_tags(file)
|
||||||
|
folder = path.dirname(file)
|
||||||
|
self.folders.add(folder)
|
||||||
|
|
||||||
|
if tags is not None:
|
||||||
|
self.tagged_tracks.append(tags)
|
||||||
|
api.DB_TRACKS.append(tags)
|
||||||
|
|
||||||
|
bar.next()
|
||||||
|
bar.finish()
|
||||||
|
Log(f"Tagged {len(self.tagged_tracks)} files")
|
||||||
|
|
||||||
|
def create_pre_albums(self):
|
||||||
|
"""
|
||||||
|
Creates pre-albums for the all tagged tracks.
|
||||||
|
"""
|
||||||
|
bar = Bar("Creating pre-albums", max=len(self.tagged_tracks))
|
||||||
|
for track in self.tagged_tracks:
|
||||||
|
album = {"title": track["album"], "artist": track["albumartist"]}
|
||||||
|
|
||||||
|
if album not in self.pre_albums:
|
||||||
|
self.pre_albums.append(album)
|
||||||
|
bar.next()
|
||||||
|
|
||||||
|
bar.finish()
|
||||||
|
Log(f"Created {len(self.pre_albums)} pre-albums")
|
||||||
|
|
||||||
|
def create_albums(self):
|
||||||
|
"""
|
||||||
|
Uses the pre-albums to create new albums and add them to the database.
|
||||||
|
"""
|
||||||
|
exist_count = 0
|
||||||
|
|
||||||
|
bar = Bar("Creating albums", max=len(self.pre_albums))
|
||||||
|
for album in self.pre_albums:
|
||||||
|
index = find_album(album["title"], album["artist"])
|
||||||
|
|
||||||
|
if index is None:
|
||||||
|
try:
|
||||||
|
track = [
|
||||||
|
track
|
||||||
|
for track in self.tagged_tracks
|
||||||
|
if track["album"] == album["title"]
|
||||||
|
and track["albumartist"] == album["artist"]
|
||||||
|
][0]
|
||||||
|
|
||||||
|
album = create_album(track)
|
||||||
|
api.ALBUMS.append(album)
|
||||||
|
self.albums.append(album)
|
||||||
|
|
||||||
|
album_instance.insert_album(asdict(album))
|
||||||
|
|
||||||
|
except IndexError:
|
||||||
|
print("😠\n")
|
||||||
|
print(album)
|
||||||
|
|
||||||
|
else:
|
||||||
|
exist_count += 1
|
||||||
|
|
||||||
|
bar.next()
|
||||||
|
bar.finish()
|
||||||
|
Log(
|
||||||
|
f"{exist_count} of {len(self.pre_albums)} albums were already in the database"
|
||||||
|
)
|
||||||
|
|
||||||
|
def create_tracks(self):
|
||||||
|
"""
|
||||||
|
Loops through all the tagged tracks creating complete track objects using the `models.Track` model.
|
||||||
|
"""
|
||||||
|
bar = Bar("Creating tracks", max=len(self.tagged_tracks))
|
||||||
|
failed_count = 0
|
||||||
|
for track in self.tagged_tracks:
|
||||||
|
try:
|
||||||
|
album_index = find_album(track["album"], track["albumartist"])
|
||||||
|
album = api.ALBUMS[album_index]
|
||||||
|
track["image"] = album.image
|
||||||
|
upsert_id = tracks_instance.insert_song(track)
|
||||||
|
track["_id"] = {"$oid": str(upsert_id)}
|
||||||
|
api.TRACKS.append(Track(track))
|
||||||
|
except:
|
||||||
|
# Bug: some albums are not found although they exist in `api.ALBUMS`. It has something to do with the bisection method used or sorting. Not sure yet.
|
||||||
|
failed_count += 1
|
||||||
|
bar.next()
|
||||||
|
bar.finish()
|
||||||
|
|
||||||
|
Log(
|
||||||
|
f"Added {len(self.tagged_tracks) - failed_count} of {len(self.tagged_tracks)} new tracks and {len(self.albums)} new albums"
|
||||||
|
)
|
||||||
|
|
||||||
|
def create_folders(self):
|
||||||
|
"""
|
||||||
|
Creates the folder objects for all the tracks.
|
||||||
|
"""
|
||||||
|
bar = Bar("Creating folders", max=len(self.folders))
|
||||||
|
old_f_count = len(api.FOLDERS)
|
||||||
|
for folder in self.folders:
|
||||||
|
api.VALID_FOLDERS.add(folder)
|
||||||
|
fff = folderslib.create_folder(folder)
|
||||||
|
api.FOLDERS.append(fff)
|
||||||
|
bar.next()
|
||||||
|
|
||||||
|
bar.finish()
|
||||||
|
|
||||||
|
Log(f"Created {len(self.folders)} new folders")
|
||||||
@@ -26,8 +26,11 @@ def create_all_tracks() -> List[models.Track]:
|
|||||||
except FileNotFoundError:
|
except FileNotFoundError:
|
||||||
instances.tracks_instance.remove_song_by_id(track["_id"]["$oid"])
|
instances.tracks_instance.remove_song_by_id(track["_id"]["$oid"])
|
||||||
api.DB_TRACKS.remove(track)
|
api.DB_TRACKS.remove(track)
|
||||||
|
try:
|
||||||
tracks.append(models.Track(track))
|
tracks.append(models.Track(track))
|
||||||
|
except KeyError:
|
||||||
|
print(track)
|
||||||
|
|
||||||
_bar.next()
|
_bar.next()
|
||||||
|
|
||||||
_bar.finish()
|
_bar.finish()
|
||||||
|
|||||||
@@ -32,6 +32,7 @@ class Track:
|
|||||||
discnumber: int
|
discnumber: int
|
||||||
|
|
||||||
def __init__(self, tags):
|
def __init__(self, tags):
|
||||||
|
|
||||||
self.trackid = tags["_id"]["$oid"]
|
self.trackid = tags["_id"]["$oid"]
|
||||||
self.title = tags["title"]
|
self.title = tags["title"]
|
||||||
self.artists = tags["artists"].split(", ")
|
self.artists = tags["artists"].split(", ")
|
||||||
|
|||||||
Reference in New Issue
Block a user