mirror of
https://github.com/Dvorinka/swingmusic-extended.git
synced 2026-06-04 20:43:04 +00:00
refactor most things to use the database directly
This commit is contained in:
@@ -4,12 +4,8 @@ This library contains all the functions related to albums.
|
||||
import random
|
||||
from typing import List
|
||||
|
||||
from app import api
|
||||
from app import helpers
|
||||
from app import instances
|
||||
from app import models
|
||||
from app.lib import taglib
|
||||
from app.lib import trackslib
|
||||
from app import api, helpers, instances, models
|
||||
from app.lib import taglib, trackslib
|
||||
from tqdm import tqdm
|
||||
|
||||
|
||||
@@ -30,21 +26,12 @@ def get_all_albums() -> List[models.Album]:
|
||||
return albums
|
||||
|
||||
|
||||
def create_everything() -> List[models.Track]:
|
||||
def validate() -> None:
|
||||
"""
|
||||
Creates album objects for all albums and returns
|
||||
a list of track objects
|
||||
"""
|
||||
albums: list[models.Album] = get_all_albums()
|
||||
|
||||
api.ALBUMS = albums
|
||||
api.ALBUMS.sort(key=lambda x: x.hash)
|
||||
|
||||
tracks = trackslib.create_all_tracks()
|
||||
|
||||
api.TRACKS.clear()
|
||||
api.TRACKS.extend(tracks)
|
||||
api.TRACKS.sort(key=lambda x: x.title)
|
||||
|
||||
|
||||
def find_album(albums: List[models.Album], hash: str) -> int | None:
|
||||
@@ -142,8 +129,6 @@ class GetAlbumTracks:
|
||||
self.tracks.remove(track)
|
||||
index = trackslib.find_track(self.tracks, self.hash)
|
||||
|
||||
# self.tracks.extend(tracks)
|
||||
# self.tracks.sort(key=lambda x: x["albumhash"])
|
||||
return tracks
|
||||
|
||||
|
||||
|
||||
@@ -1,15 +1,11 @@
|
||||
from dataclasses import dataclass
|
||||
from os import scandir
|
||||
from time import time
|
||||
from typing import List
|
||||
from typing import Set
|
||||
from typing import Tuple
|
||||
|
||||
from app import api
|
||||
from app import helpers
|
||||
from app.models import Folder
|
||||
from app.models import Track
|
||||
from tqdm import tqdm
|
||||
|
||||
from app import instances
|
||||
|
||||
|
||||
@dataclass
|
||||
@@ -18,20 +14,12 @@ class Dir:
|
||||
is_sym: bool
|
||||
|
||||
|
||||
def get_valid_folders() -> None:
|
||||
for track in api.TRACKS:
|
||||
api.VALID_FOLDERS.add(track.folder)
|
||||
|
||||
|
||||
def get_folder_track_count(foldername: str) -> int:
|
||||
"""
|
||||
Returns the number of files associated with a folder.
|
||||
"""
|
||||
count = 0
|
||||
for track in api.TRACKS:
|
||||
if foldername in track.folder:
|
||||
count += 1
|
||||
return count
|
||||
tracks = instances.tracks_instance.find_tracks_inside_path_regex(foldername)
|
||||
return len(tracks)
|
||||
|
||||
|
||||
def create_folder(dir: Dir) -> Folder:
|
||||
@@ -46,51 +34,6 @@ def create_folder(dir: Dir) -> Folder:
|
||||
return Folder(folder)
|
||||
|
||||
|
||||
def create_all_folders() -> Set[Folder]:
|
||||
folders: List[Folder] = []
|
||||
|
||||
for foldername in tqdm(api.VALID_FOLDERS, desc="Creating folders"):
|
||||
folder = create_folder(foldername)
|
||||
folders.append(folder)
|
||||
|
||||
return folders
|
||||
|
||||
|
||||
def get_subdirs(foldername: str) -> List[Folder]:
|
||||
"""
|
||||
Finds and Creates Folder objects for each sub-directory string in the foldername passed.
|
||||
"""
|
||||
subdirs = set()
|
||||
|
||||
for folder in api.VALID_FOLDERS:
|
||||
if foldername in folder:
|
||||
str0 = folder.replace(foldername, "")
|
||||
|
||||
try:
|
||||
str1 = str0.split("/")[1]
|
||||
except IndexError:
|
||||
str1 = None
|
||||
|
||||
if str1 is not None:
|
||||
subdirs.add(foldername + "/" + str1)
|
||||
return [create_folder(dir) for dir in subdirs]
|
||||
|
||||
|
||||
@helpers.background
|
||||
def run_scandir():
|
||||
"""
|
||||
Initiates the creation of all folder objects for each folder with a track in it.
|
||||
|
||||
Runs in a background thread after every 5 minutes.
|
||||
It calls the
|
||||
"""
|
||||
get_valid_folders()
|
||||
# folders_ = create_all_folders()
|
||||
"""Create all the folder objects before clearing api.FOLDERS"""
|
||||
|
||||
# api.FOLDERS = folders_
|
||||
|
||||
|
||||
class getFnF:
|
||||
"""
|
||||
Get files and folders from a directory.
|
||||
@@ -99,15 +42,6 @@ class getFnF:
|
||||
def __init__(self, path: str) -> None:
|
||||
self.path = path
|
||||
|
||||
@classmethod
|
||||
def get_tracks(cls, files: List[str]) -> List[Track]:
|
||||
"""
|
||||
Returns a list of Track objects for each file in the given list.
|
||||
"""
|
||||
tracks = helpers.UseBisection(api.TRACKS, "filepath", files)()
|
||||
tracks = filter(lambda t: t is not None, tracks)
|
||||
return list(tracks)
|
||||
|
||||
def __call__(self) -> Tuple[Track, Folder]:
|
||||
try:
|
||||
all = scandir(self.path)
|
||||
@@ -125,9 +59,11 @@ class getFnF:
|
||||
dirs.append(Dir(**dir))
|
||||
elif entry.is_file() and entry.name.endswith((".mp3", ".flac")):
|
||||
files.append(entry.path)
|
||||
tracks = self.get_tracks(files)
|
||||
tracks = instances.tracks_instance.find_songs_by_folder(self.path)
|
||||
tracks = [Track(track) for track in tracks]
|
||||
|
||||
folders = [create_folder(dir) for dir in dirs]
|
||||
|
||||
folders = filter(lambda f: f.trackcount > 0, folders)
|
||||
|
||||
return tracks, folders
|
||||
|
||||
@@ -5,6 +5,7 @@ import os
|
||||
import random
|
||||
import string
|
||||
from datetime import datetime
|
||||
from typing import List
|
||||
|
||||
from tqdm import tqdm
|
||||
|
||||
@@ -13,55 +14,37 @@ from app import exceptions
|
||||
from app import instances
|
||||
from app import models
|
||||
from app import settings
|
||||
from app.lib import trackslib
|
||||
from PIL import Image
|
||||
from PIL import ImageSequence
|
||||
from progress.bar import Bar
|
||||
from werkzeug import datastructures
|
||||
|
||||
from app.lib import trackslib
|
||||
|
||||
TrackExistsInPlaylist = exceptions.TrackExistsInPlaylist
|
||||
|
||||
|
||||
def add_track(playlistid: str, trackid: str):
|
||||
"""
|
||||
Adds a track to a playlist in the api.PLAYLISTS dict and to the database.
|
||||
Adds a track to a playlist to the database.
|
||||
"""
|
||||
for playlist in api.PLAYLISTS:
|
||||
if playlist.playlistid == playlistid:
|
||||
tt = trackslib.get_track_by_id(trackid)
|
||||
tt = instances.tracks_instance.get_track_by_id(trackid)
|
||||
|
||||
track = {
|
||||
"title": tt.title,
|
||||
"artists": tt.artists,
|
||||
"album": tt.album,
|
||||
}
|
||||
if tt is None:
|
||||
return
|
||||
|
||||
try:
|
||||
playlist.add_track(track)
|
||||
instances.playlist_instance.add_track_to_playlist(
|
||||
playlistid, track)
|
||||
return
|
||||
except TrackExistsInPlaylist as error:
|
||||
raise error
|
||||
track = models.Track(tt)
|
||||
|
||||
playlist = instances.playlist_instance.get_playlist_by_id(playlistid)
|
||||
|
||||
def get_playlist_tracks(pid: str):
|
||||
for p in api.PLAYLISTS:
|
||||
if p.playlistid == pid:
|
||||
return p.tracks
|
||||
track = {
|
||||
"title": track.title,
|
||||
"artists": tt["artists"],
|
||||
"album": track.album,
|
||||
}
|
||||
if track in playlist["pre_tracks"]:
|
||||
raise TrackExistsInPlaylist
|
||||
|
||||
|
||||
def create_all_playlists():
|
||||
"""
|
||||
Gets all playlists from the database.
|
||||
"""
|
||||
playlists = instances.playlist_instance.get_all_playlists()
|
||||
|
||||
|
||||
for playlist in tqdm(playlists, desc="Creating playlists"):
|
||||
api.PLAYLISTS.append(models.Playlist(playlist))
|
||||
|
||||
validate_images()
|
||||
instances.playlist_instance.add_track_to_playlist(playlistid, track)
|
||||
|
||||
|
||||
def create_thumbnail(image: any, img_path: str) -> str:
|
||||
@@ -69,8 +52,7 @@ def create_thumbnail(image: any, img_path: str) -> str:
|
||||
Creates a 250 x 250 thumbnail from a playlist image
|
||||
"""
|
||||
thumb_path = "thumb_" + img_path
|
||||
full_thumb_path = os.path.join(settings.APP_DIR, "images", "playlists",
|
||||
thumb_path)
|
||||
full_thumb_path = os.path.join(settings.APP_DIR, "images", "playlists", thumb_path)
|
||||
|
||||
aspect_ratio = image.width / image.height
|
||||
|
||||
@@ -88,13 +70,11 @@ def save_p_image(file: datastructures.FileStorage, pid: str):
|
||||
"""
|
||||
img = Image.open(file)
|
||||
|
||||
random_str = "".join(
|
||||
random.choices(string.ascii_letters + string.digits, k=5))
|
||||
random_str = "".join(random.choices(string.ascii_letters + string.digits, k=5))
|
||||
|
||||
img_path = pid + str(random_str) + ".webp"
|
||||
|
||||
full_img_path = os.path.join(settings.APP_DIR, "images", "playlists",
|
||||
img_path)
|
||||
full_img_path = os.path.join(settings.APP_DIR, "images", "playlists", img_path)
|
||||
|
||||
if file.content_type == "image/gif":
|
||||
frames = []
|
||||
@@ -118,8 +98,10 @@ def validate_images():
|
||||
Removes all unused images in the images/playlists folder.
|
||||
"""
|
||||
images = []
|
||||
p = instances.playlist_instance.get_all_playlists()
|
||||
playlists = [models.Playlist(p) for p in p]
|
||||
|
||||
for playlist in api.PLAYLISTS:
|
||||
for playlist in playlists:
|
||||
if playlist.image:
|
||||
img_path = playlist.image.split("/")[-1]
|
||||
thumb_path = playlist.thumb.split("/")[-1]
|
||||
@@ -136,3 +118,18 @@ def validate_images():
|
||||
|
||||
def create_new_date():
|
||||
return datetime.now()
|
||||
|
||||
|
||||
def create_playlist_tracks(playlist_tracks: List) -> List[models.Track]:
|
||||
"""
|
||||
Creates a list of model.Track objects from a list of playlist track dicts.
|
||||
"""
|
||||
tracks: List[models.Track] = []
|
||||
|
||||
for t in playlist_tracks:
|
||||
track = trackslib.get_p_track(t)
|
||||
|
||||
if track is not None:
|
||||
tracks.append(models.Track(track))
|
||||
|
||||
return tracks
|
||||
|
||||
+16
-43
@@ -1,4 +1,5 @@
|
||||
import os
|
||||
from pprint import pprint
|
||||
import time
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
from copy import deepcopy
|
||||
@@ -12,7 +13,6 @@ from app.helpers import create_album_hash
|
||||
from app.helpers import run_fast_scandir
|
||||
from app.instances import album_instance
|
||||
from app.instances import tracks_instance
|
||||
from app.lib import folderslib
|
||||
from app.lib.albumslib import create_album
|
||||
from app.lib.albumslib import find_album
|
||||
from app.lib.taglib import get_tags
|
||||
@@ -44,6 +44,7 @@ class Populate:
|
||||
self.db_tracks = tracks_instance.get_all_tracks()
|
||||
self.tag_count = 0
|
||||
self.exist_count = 0
|
||||
self.tracks = []
|
||||
|
||||
def run(self):
|
||||
self.check_untagged()
|
||||
@@ -53,17 +54,14 @@ class Populate:
|
||||
return
|
||||
|
||||
self.tagged_tracks.sort(key=lambda x: x["albumhash"])
|
||||
self.tracks = deepcopy(self.tagged_tracks)
|
||||
|
||||
self.pre_albums = self.create_pre_albums(self.tagged_tracks)
|
||||
self.create_albums(self.pre_albums)
|
||||
|
||||
self.albums.sort(key=lambda x: x.hash)
|
||||
api.ALBUMS.sort(key=lambda x: x.hash)
|
||||
|
||||
self.save_albums()
|
||||
self.create_tracks()
|
||||
# self.create_folders()
|
||||
|
||||
self.save_all()
|
||||
|
||||
def check_untagged(self):
|
||||
"""
|
||||
@@ -84,7 +82,6 @@ class Populate:
|
||||
|
||||
t["albumhash"] = create_album_hash(t["album"], t["albumartist"])
|
||||
self.tagged_tracks.append(t)
|
||||
api.DB_TRACKS.append(t)
|
||||
|
||||
self.folders.add(t["folder"])
|
||||
|
||||
@@ -95,8 +92,7 @@ class Populate:
|
||||
folder = tags["folder"]
|
||||
self.folders.add(folder)
|
||||
|
||||
tags["albumhash"] = create_album_hash(tags["album"],
|
||||
tags["albumartist"])
|
||||
tags["albumhash"] = create_album_hash(tags["album"], tags["albumartist"])
|
||||
self.tagged_tracks.append(tags)
|
||||
api.DB_TRACKS.append(tags)
|
||||
|
||||
@@ -110,13 +106,6 @@ class Populate:
|
||||
with ThreadPoolExecutor() as executor:
|
||||
executor.map(self.get_tags, self.files)
|
||||
|
||||
# with Pool(maxtasksperchild=10, processes=10) as p:
|
||||
# tags = p.map(get_tags, tqdm(self.files))
|
||||
# self.process_tags(tags)
|
||||
|
||||
# for t in tqdm(self.files):
|
||||
# self.get_tags(t)
|
||||
|
||||
d = time.time() - s
|
||||
Log(f"Tagged {len(self.tagged_tracks)} files in {d} seconds")
|
||||
|
||||
@@ -147,7 +136,6 @@ class Populate:
|
||||
self.exist_count += 1
|
||||
return
|
||||
|
||||
self.albums.sort(key=lambda x: x.hash)
|
||||
index = find_track(self.tagged_tracks, albumhash)
|
||||
|
||||
if index is None:
|
||||
@@ -163,7 +151,6 @@ class Populate:
|
||||
|
||||
album = Album(album)
|
||||
|
||||
api.ALBUMS.append(album)
|
||||
self.albums.append(album)
|
||||
|
||||
def create_albums(self, albums: List[dict]):
|
||||
@@ -173,8 +160,7 @@ class Populate:
|
||||
for album in tqdm(albums, desc="Building albums"):
|
||||
self.create_album(album)
|
||||
|
||||
Log(f"{self.exist_count} of {len(albums)} albums were already in the database"
|
||||
)
|
||||
Log(f"{self.exist_count} of {len(albums)} albums were already in the database")
|
||||
|
||||
def create_track(self, track: dict):
|
||||
"""
|
||||
@@ -196,38 +182,25 @@ class Populate:
|
||||
pass
|
||||
|
||||
track["image"] = album.image
|
||||
|
||||
upsert_id = tracks_instance.insert_song(track)
|
||||
track["_id"] = {"$oid": str(upsert_id)}
|
||||
|
||||
api.TRACKS.append(Track(track))
|
||||
return track
|
||||
|
||||
def create_tracks(self):
|
||||
"""
|
||||
Loops through all the tagged tracks creating complete track objects using the `models.Track` model.
|
||||
"""
|
||||
with ThreadPoolExecutor() as executor:
|
||||
executor.map(self.create_track, self.tagged_tracks)
|
||||
iterable = executor.map(self.create_track, self.tagged_tracks)
|
||||
|
||||
Log(f"Added {len(self.tagged_tracks)} new tracks and {len(self.albums)} new albums"
|
||||
)
|
||||
self.tracks = [t for t in iterable if t is not None]
|
||||
|
||||
def save_albums(self):
|
||||
Log(
|
||||
f"Added {len(self.tagged_tracks)} new tracks and {len(self.albums)} new albums"
|
||||
)
|
||||
|
||||
def save_all(self):
|
||||
"""
|
||||
Saves the albums to the database.
|
||||
"""
|
||||
|
||||
with ThreadPoolExecutor() as executor:
|
||||
executor.map(album_instance.insert_album, self.albums)
|
||||
|
||||
# def create_folders(self):
|
||||
# """
|
||||
# Creates the folder objects for all the tracks.
|
||||
# """
|
||||
# for folder in tqdm(self.folders, desc="Creating folders"):
|
||||
# api.VALID_FOLDERS.add(folder)
|
||||
|
||||
# fff = folderslib.create_folder(folder)
|
||||
# api.FOLDERS.append(fff)
|
||||
|
||||
# Log(f"Created {len(self.folders)} new folders")
|
||||
album_instance.insert_many([a.__dict__ for a in self.albums])
|
||||
tracks_instance.insert_many(self.tracks)
|
||||
|
||||
+11
-12
@@ -2,31 +2,25 @@
|
||||
This library contains all the functions related to tracks.
|
||||
"""
|
||||
import os
|
||||
from pprint import pprint
|
||||
from typing import List
|
||||
|
||||
from app import api
|
||||
from app import instances
|
||||
from app import models
|
||||
from app import api, instances, models
|
||||
from app.helpers import remove_duplicates
|
||||
from tqdm import tqdm
|
||||
|
||||
|
||||
def create_all_tracks() -> List[models.Track]:
|
||||
def validate_tracks() -> None:
|
||||
"""
|
||||
Gets all songs under the ~/ directory.
|
||||
"""
|
||||
tracks: list[models.Track] = []
|
||||
entries = instances.tracks_instance.get_all_tracks()
|
||||
|
||||
for track in tqdm(api.DB_TRACKS, desc="Creating tracks"):
|
||||
for track in tqdm(entries, desc="Validating tracks"):
|
||||
try:
|
||||
os.chmod(track["filepath"], 0o755)
|
||||
except FileNotFoundError:
|
||||
instances.tracks_instance.remove_song_by_id(track["_id"]["$oid"])
|
||||
api.DB_TRACKS.remove(track)
|
||||
|
||||
tracks.append(models.Track(track))
|
||||
|
||||
return tracks
|
||||
|
||||
|
||||
def get_album_tracks(albumname, artist):
|
||||
@@ -48,7 +42,6 @@ def get_track_by_id(trackid: str) -> models.Track:
|
||||
return track
|
||||
except AttributeError:
|
||||
print("AttributeError")
|
||||
print(track)
|
||||
|
||||
|
||||
def find_track(tracks: list, hash: str) -> int or None:
|
||||
@@ -73,3 +66,9 @@ def find_track(tracks: list, hash: str) -> int or None:
|
||||
right = mid - 1
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def get_p_track(ptrack):
|
||||
return instances.tracks_instance.find_track_by_title_artists_album(
|
||||
ptrack["title"], ptrack["artists"], ptrack["album"]
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user