mirror of
https://github.com/Dvorinka/swingmusic-extended.git
synced 2026-06-03 20:13:02 +00:00
use scandir to read dir content on file explorer (#64)
This commit is contained in:
@@ -1,13 +1,10 @@
|
||||
"""
|
||||
Contains all the folder routes.
|
||||
"""
|
||||
import datetime
|
||||
import os
|
||||
|
||||
from app import api
|
||||
from app import helpers
|
||||
from app import settings
|
||||
from app.lib import folderslib
|
||||
from app.lib.folderslib import getFnF
|
||||
from flask import Blueprint
|
||||
from flask import request
|
||||
|
||||
@@ -20,21 +17,14 @@ def get_folder_tree():
|
||||
Returns a list of all the folders and tracks in the given folder.
|
||||
"""
|
||||
data = request.get_json()
|
||||
req_dir = data["folder"]
|
||||
req_dir: str = data["folder"]
|
||||
|
||||
if req_dir == "$home":
|
||||
req_dir = settings.HOME_DIR
|
||||
|
||||
folders = folderslib.get_subdirs(req_dir)
|
||||
songs = []
|
||||
|
||||
for track in api.TRACKS:
|
||||
if track.folder == req_dir:
|
||||
songs.append(track)
|
||||
|
||||
final_tracks = helpers.remove_duplicates(songs)
|
||||
tracks, folders = getFnF(req_dir)()
|
||||
|
||||
return {
|
||||
"tracks": final_tracks,
|
||||
"tracks": tracks,
|
||||
"folders": sorted(folders, key=lambda i: i.name),
|
||||
}
|
||||
|
||||
+33
-4
@@ -6,6 +6,7 @@ import random
|
||||
import threading
|
||||
import time
|
||||
from datetime import datetime
|
||||
from typing import Any
|
||||
from typing import Dict
|
||||
from typing import List
|
||||
|
||||
@@ -27,15 +28,14 @@ def background(func):
|
||||
return background_func
|
||||
|
||||
|
||||
def run_fast_scandir(__dir: str,
|
||||
ext: list,
|
||||
full=False) -> Dict[List[str], List[str]]:
|
||||
def run_fast_scandir(__dir: str, full=False) -> Dict[List[str], List[str]]:
|
||||
"""
|
||||
Scans a directory for files with a specific extension. Returns a list of files and folders in the directory.
|
||||
"""
|
||||
|
||||
subfolders = []
|
||||
files = []
|
||||
ext = [".flac", ".mp3"]
|
||||
|
||||
for f in os.scandir(__dir):
|
||||
if f.is_dir() and not f.name.startswith("."):
|
||||
@@ -46,7 +46,7 @@ def run_fast_scandir(__dir: str,
|
||||
|
||||
if full or len(files) == 0:
|
||||
for _dir in list(subfolders):
|
||||
sf, f = run_fast_scandir(_dir, ext, full=True)
|
||||
sf, f = run_fast_scandir(_dir, full=True)
|
||||
subfolders.extend(sf)
|
||||
files.extend(f)
|
||||
|
||||
@@ -133,3 +133,32 @@ def create_safe_name(name: str) -> str:
|
||||
Creates a url-safe name from a name.
|
||||
"""
|
||||
return "".join([i for i in name if i not in '/\\:*?"<>|'])
|
||||
|
||||
|
||||
class UseBisection:
|
||||
|
||||
def __init__(self, list: List, search_from: str,
|
||||
queries: List[str]) -> None:
|
||||
self.list = list
|
||||
self.queries = queries
|
||||
self.search_from = search_from
|
||||
self.list.sort(key=lambda x: getattr(x, search_from))
|
||||
|
||||
def find(self, query: str):
|
||||
left = 0
|
||||
right = len(self.list) - 1
|
||||
|
||||
while left <= right:
|
||||
mid = (left + right) // 2
|
||||
|
||||
if self.list[mid].__getattribute__(self.search_from) == query:
|
||||
return self.list[mid]
|
||||
elif self.list[mid].__getattribute__(self.search_from) > query:
|
||||
right = mid - 1
|
||||
else:
|
||||
left = mid + 1
|
||||
|
||||
return None
|
||||
|
||||
def __call__(self) -> Any:
|
||||
return [self.find(query) for query in self.queries]
|
||||
|
||||
@@ -60,10 +60,8 @@ def find_album(albums: List[models.Album], hash: str) -> int | None:
|
||||
|
||||
left = 0
|
||||
right = len(albums) - 1
|
||||
iter = 0
|
||||
|
||||
while left <= right:
|
||||
iter += 1
|
||||
mid = (left + right) // 2
|
||||
|
||||
try:
|
||||
|
||||
@@ -1,13 +1,25 @@
|
||||
import time
|
||||
from dataclasses import dataclass
|
||||
from os import scandir
|
||||
from pprint import pprint
|
||||
from time import time
|
||||
from typing import Dict
|
||||
from typing import List
|
||||
from typing import Set
|
||||
|
||||
from tqdm import tqdm
|
||||
from typing import Tuple
|
||||
|
||||
from app import api
|
||||
from app import helpers
|
||||
from app import models
|
||||
from progress.bar import Bar
|
||||
from app.instances import tracks_instance
|
||||
from app.lib import taglib
|
||||
from app.models import Folder
|
||||
from app.models import Track
|
||||
from tqdm import tqdm
|
||||
|
||||
|
||||
@dataclass
|
||||
class Dir:
|
||||
path: str
|
||||
is_sym: bool
|
||||
|
||||
|
||||
def get_valid_folders() -> None:
|
||||
@@ -19,23 +31,27 @@ def get_folder_track_count(foldername: str) -> int:
|
||||
"""
|
||||
Returns the number of files associated with a folder.
|
||||
"""
|
||||
track_list = [track for track in api.TRACKS if foldername in track.folder]
|
||||
return len(track_list)
|
||||
count = 0
|
||||
for track in api.TRACKS:
|
||||
if foldername in track.folder:
|
||||
count += 1
|
||||
return count
|
||||
|
||||
|
||||
def create_folder(foldername: str) -> models.Folder:
|
||||
def create_folder(dir: Dir) -> Folder:
|
||||
"""Create a single Folder object"""
|
||||
folder = {
|
||||
"name": foldername.split("/")[-1],
|
||||
"path": foldername,
|
||||
"trackcount": get_folder_track_count(foldername),
|
||||
"name": dir.path.split("/")[-1],
|
||||
"path": dir.path,
|
||||
"is_sym": dir.is_sym,
|
||||
"trackcount": get_folder_track_count(dir.path),
|
||||
}
|
||||
|
||||
return models.Folder(folder)
|
||||
return Folder(folder)
|
||||
|
||||
|
||||
def create_all_folders() -> Set[models.Folder]:
|
||||
folders: List[models.Folder] = []
|
||||
def create_all_folders() -> Set[Folder]:
|
||||
folders: List[Folder] = []
|
||||
|
||||
for foldername in tqdm(api.VALID_FOLDERS, desc="Creating folders"):
|
||||
folder = create_folder(foldername)
|
||||
@@ -44,9 +60,9 @@ def create_all_folders() -> Set[models.Folder]:
|
||||
return folders
|
||||
|
||||
|
||||
def get_subdirs(foldername: str) -> List[models.Folder]:
|
||||
def get_subdirs(foldername: str) -> List[Folder]:
|
||||
"""
|
||||
Finds and Creates models.Folder objects for each sub-directory string in the foldername passed.
|
||||
Finds and Creates Folder objects for each sub-directory string in the foldername passed.
|
||||
"""
|
||||
subdirs = set()
|
||||
|
||||
@@ -73,7 +89,49 @@ def run_scandir():
|
||||
It calls the
|
||||
"""
|
||||
get_valid_folders()
|
||||
folders_ = create_all_folders()
|
||||
# folders_ = create_all_folders()
|
||||
"""Create all the folder objects before clearing api.FOLDERS"""
|
||||
|
||||
api.FOLDERS = folders_
|
||||
# api.FOLDERS = folders_
|
||||
|
||||
|
||||
class getFnF:
|
||||
"""
|
||||
Get files and folders from a directory.
|
||||
"""
|
||||
|
||||
def __init__(self, path: str) -> None:
|
||||
self.path = path
|
||||
|
||||
@classmethod
|
||||
def get_tracks(cls, files: List[str]) -> List[Track]:
|
||||
"""
|
||||
Returns a list of Track objects for each file in the given list.
|
||||
"""
|
||||
return helpers.UseBisection(api.TRACKS, "filepath", files)()
|
||||
|
||||
def __call__(self) -> Tuple[Track, Folder]:
|
||||
try:
|
||||
all = scandir(self.path)
|
||||
except FileNotFoundError:
|
||||
return ([], [])
|
||||
|
||||
dirs, files = [], []
|
||||
|
||||
for entry in all:
|
||||
if entry.is_dir() and not entry.name.startswith("."):
|
||||
dir = {
|
||||
"path": entry.path,
|
||||
"is_sym": entry.is_symlink(),
|
||||
}
|
||||
dirs.append(Dir(**dir))
|
||||
elif entry.is_file() and entry.name.endswith((".mp3", ".flac")):
|
||||
files.append(entry.path)
|
||||
s = time()
|
||||
tracks = self.get_tracks(files)
|
||||
print(f"{time() - s} seconds to get tracks")
|
||||
|
||||
folders = [create_folder(dir) for dir in dirs]
|
||||
folders = filter(lambda f: f.trackcount > 0, folders)
|
||||
|
||||
return (tracks, folders)
|
||||
|
||||
+25
-21
@@ -40,7 +40,7 @@ class Populate:
|
||||
self.pre_albums = []
|
||||
self.albums: List[Album] = []
|
||||
|
||||
self.files = run_fast_scandir(settings.HOME_DIR, [".flac", ".mp3"])[1]
|
||||
self.files = run_fast_scandir(settings.HOME_DIR, full=True)[1]
|
||||
self.db_tracks = tracks_instance.get_all_tracks()
|
||||
self.tag_count = 0
|
||||
self.exist_count = 0
|
||||
@@ -55,15 +55,15 @@ class Populate:
|
||||
self.tagged_tracks.sort(key=lambda x: x["albumhash"])
|
||||
self.tracks = deepcopy(self.tagged_tracks)
|
||||
|
||||
self.create_pre_albums()
|
||||
self.create_albums()
|
||||
self.pre_albums = self.create_pre_albums(self.tagged_tracks)
|
||||
self.create_albums(self.pre_albums)
|
||||
|
||||
self.albums.sort(key=lambda x: x.hash)
|
||||
api.ALBUMS.sort(key=lambda x: x.hash)
|
||||
|
||||
self.save_albums()
|
||||
self.create_tracks()
|
||||
self.create_folders()
|
||||
# self.create_folders()
|
||||
|
||||
def check_untagged(self):
|
||||
"""
|
||||
@@ -120,17 +120,21 @@ class Populate:
|
||||
d = time.time() - s
|
||||
Log(f"Tagged {len(self.tagged_tracks)} files in {d} seconds")
|
||||
|
||||
def create_pre_albums(self):
|
||||
@staticmethod
|
||||
def create_pre_albums(tracks: List[dict]):
|
||||
"""
|
||||
Creates pre-albums for the all tagged tracks.
|
||||
"""
|
||||
for track in tqdm(self.tagged_tracks, desc="Creating pre-albums"):
|
||||
prealbums = []
|
||||
|
||||
for track in tqdm(tracks, desc="Creating pre-albums"):
|
||||
album = {"title": track["album"], "artist": track["albumartist"]}
|
||||
|
||||
if album not in self.pre_albums:
|
||||
self.pre_albums.append(album)
|
||||
if album not in prealbums:
|
||||
prealbums.append(album)
|
||||
|
||||
Log(f"Created {len(self.pre_albums)} pre-albums")
|
||||
Log(f"Created {len(prealbums)} pre-albums")
|
||||
return prealbums
|
||||
|
||||
def create_album(self, album: dict):
|
||||
albumhash = create_album_hash(album["title"], album["artist"])
|
||||
@@ -162,14 +166,14 @@ class Populate:
|
||||
api.ALBUMS.append(album)
|
||||
self.albums.append(album)
|
||||
|
||||
def create_albums(self):
|
||||
def create_albums(self, albums: List[dict]):
|
||||
"""
|
||||
Uses the pre-albums to create new albums and add them to the database.
|
||||
"""
|
||||
for album in tqdm(self.pre_albums, desc="Building albums"):
|
||||
for album in tqdm(albums, desc="Building albums"):
|
||||
self.create_album(album)
|
||||
|
||||
Log(f"{self.exist_count} of {len(self.pre_albums)} albums were already in the database"
|
||||
Log(f"{self.exist_count} of {len(albums)} albums were already in the database"
|
||||
)
|
||||
|
||||
def create_track(self, track: dict):
|
||||
@@ -216,14 +220,14 @@ class Populate:
|
||||
with ThreadPoolExecutor() as executor:
|
||||
executor.map(album_instance.insert_album, self.albums)
|
||||
|
||||
def create_folders(self):
|
||||
"""
|
||||
Creates the folder objects for all the tracks.
|
||||
"""
|
||||
for folder in tqdm(self.folders, desc="Creating folders"):
|
||||
api.VALID_FOLDERS.add(folder)
|
||||
# def create_folders(self):
|
||||
# """
|
||||
# Creates the folder objects for all the tracks.
|
||||
# """
|
||||
# for folder in tqdm(self.folders, desc="Creating folders"):
|
||||
# api.VALID_FOLDERS.add(folder)
|
||||
|
||||
fff = folderslib.create_folder(folder)
|
||||
api.FOLDERS.append(fff)
|
||||
# fff = folderslib.create_folder(folder)
|
||||
# api.FOLDERS.append(fff)
|
||||
|
||||
Log(f"Created {len(self.folders)} new folders")
|
||||
# Log(f"Created {len(self.folders)} new folders")
|
||||
|
||||
@@ -71,12 +71,12 @@ def add_track(filepath: str) -> None:
|
||||
|
||||
api.TRACKS.append(models.Track(tags))
|
||||
|
||||
folder = tags["folder"]
|
||||
# folder = tags["folder"]
|
||||
|
||||
if folder not in api.VALID_FOLDERS:
|
||||
api.VALID_FOLDERS.add(folder)
|
||||
f = folderslib.create_folder(folder)
|
||||
api.FOLDERS.append(f)
|
||||
# if folder not in api.VALID_FOLDERS:
|
||||
# api.VALID_FOLDERS.add(folder)
|
||||
# f = folderslib.create_folder(folder)
|
||||
# api.FOLDERS.append(f)
|
||||
|
||||
|
||||
def remove_track(filepath: str) -> None:
|
||||
|
||||
@@ -46,7 +46,7 @@ class Track:
|
||||
self.length = tags["length"]
|
||||
self.genre = tags["genre"]
|
||||
self.bitrate = tags["bitrate"]
|
||||
|
||||
|
||||
try:
|
||||
self.image = tags["image"]
|
||||
except KeyError:
|
||||
@@ -102,11 +102,9 @@ class Album:
|
||||
|
||||
def get_p_track(ptrack):
|
||||
for track in api.TRACKS:
|
||||
if (
|
||||
track.title == ptrack["title"]
|
||||
and track.artists == ptrack["artists"]
|
||||
and ptrack["album"] == track.album
|
||||
):
|
||||
if (track.title == ptrack["title"]
|
||||
and track.artists == ptrack["artists"]
|
||||
and ptrack["album"] == track.album):
|
||||
return track
|
||||
|
||||
|
||||
@@ -191,9 +189,11 @@ class Folder:
|
||||
name: str
|
||||
path: str
|
||||
trackcount: int
|
||||
is_sym: bool = False
|
||||
"""The number of tracks in the folder"""
|
||||
|
||||
def __init__(self, data) -> None:
|
||||
self.name = data["name"]
|
||||
self.path = data["path"]
|
||||
self.is_sym = data["is_sym"]
|
||||
self.trackcount = data["trackcount"]
|
||||
|
||||
+1
-1
@@ -20,7 +20,7 @@ interface Track {
|
||||
interface Folder {
|
||||
name: string;
|
||||
path: string;
|
||||
trackscount: number;
|
||||
trackcount: number;
|
||||
subdircount: number;
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user