misc refactors and docstrings addition

This commit is contained in:
geoffrey45
2022-03-24 00:25:00 +03:00
parent bab2228a28
commit 67c3be7d40
25 changed files with 198 additions and 186 deletions
+3
View File
@@ -9,6 +9,9 @@ cache = Cache(config=config)
def create_app():
"""
Creates the Flask instance, registers modules and registers all the API blueprints.
"""
app = Flask(__name__)
CORS(app)
+14 -3
View File
@@ -1,15 +1,24 @@
from typing import List
"""
This module contains all the Flask Blueprints and API routes. It also contains all the globals list
that are used through-out the app. It handles the initialization of the watchdog,
checking and creating config dirs and starting the re-indexing process using a background thread.
"""
from typing import List, Set
from app import models, instances
from app import functions, helpers, prep
from app.lib import albumslib
from app.lib import folderslib
DB_TRACKS = instances.songs_instance.get_all_songs()
VALID_FOLDERS: Set[str] = set()
ALBUMS: List[models.Album] = []
TRACKS: List[models.Track] = []
PLAYLISTS: List[models.Playlist] = []
FOLDERS: List[models.Folder] = []
@helpers.background
def initialize() -> None:
@@ -17,9 +26,11 @@ def initialize() -> None:
Runs all the necessary setup functions.
"""
functions.start_watchdog()
albumslib.create_everything()
prep.create_config_dir()
albumslib.create_everything()
folderslib.run_scandir()
functions.reindex_tracks()
initialize()
+4
View File
@@ -1,3 +1,7 @@
"""
Contains all the album routes.
"""
from flask import Blueprint, request
from app import api
from app import helpers, cache
+5
View File
@@ -1,3 +1,8 @@
"""
Contains all the artist(s) routes.
"""
from flask import Blueprint
import urllib
+21 -35
View File
@@ -1,55 +1,41 @@
"""
Contains all the folder routes.
"""
import datetime
import os
from flask import Blueprint
from flask import Blueprint, request
from app import api
from app import settings
from app.lib import folderslib
folder_bp = Blueprint("folder", __name__, url_prefix="/")
from app import helpers
import time
@folder_bp.route("/f/<folder>")
def get_folder_tree(folder: str):
@folder_bp.route("/folder", methods=["POST"])
def get_folder_tree():
"""
Returns a list of all the folders and tracks in the given folder.
"""
req_dir = folder.replace("|", "/")
data = request.get_json()
req_dir = data["folder"]
if folder == "home":
if req_dir == "$home":
req_dir = settings.HOME_DIR
dir_content = os.scandir(os.path.join(settings.HOME_DIR, req_dir))
folders = []
files = []
for entry in dir_content:
if entry.is_dir() and not entry.name.startswith("."):
files_in_dir = helpers.run_fast_scandir(entry.path, [".flac", ".mp3"])[1]
if len(files_in_dir) != 0:
_dir = {
"name": entry.name,
"count": len(files_in_dir),
"path": entry.path.replace(settings.HOME_DIR, ""),
}
folders.append(_dir)
if entry.is_file():
if entry.name.endswith(".flac") or entry.name.endswith(".mp3"):
files.append(entry)
files.sort(key=lambda x: os.path.getmtime(x.path))
folders = folderslib.get_subdirs(req_dir)
songs = []
for entry in files:
for track in api.TRACKS:
if track.filepath == entry.path:
songs.append(track)
for track in api.TRACKS:
if track.folder + "/" == req_dir:
songs.append(track)
final_tracks = helpers.remove_duplicates(songs)
return {
"files": helpers.remove_duplicates(songs),
"folders": sorted(folders, key=lambda i: i["name"]),
"tracks": final_tracks,
"folders": sorted(folders, key=lambda i: i.name),
}
+4
View File
@@ -1,3 +1,7 @@
"""
Contains all the playlist routes.
"""
from flask import Blueprint, request
from app import instances, api
from app.lib import playlistlib
+4
View File
@@ -1,3 +1,7 @@
"""
Contains all the search routes.
"""
from flask import Blueprint, request
from app.lib import searchlib
+4
View File
@@ -1,3 +1,7 @@
"""
Contains all the track routes.
"""
from flask import Blueprint, send_file
from app import instances
+5
View File
@@ -1,3 +1,8 @@
"""
This module creates and initiliazes a MongoDB instance. It also contains the
`convert_one()` and `conver_many()` methods for converting MongoDB cursors to Python dicts.
"""
import pymongo
import json
from bson import json_util
+5
View File
@@ -1,3 +1,8 @@
"""
This file contains the Album class for interacting with
album documents in MongoDB.
"""
from app import db
from bson import ObjectId
+4
View File
@@ -1,3 +1,7 @@
"""
This file contains the Artists class for interacting with artist documents in MongoDB.
"""
from app import db
from bson import ObjectId
+4
View File
@@ -1,3 +1,7 @@
"""
This file contains the Playlists class for interacting with the playlist documents in MongoDB.
"""
from app import db, models
from bson import ObjectId
+4
View File
@@ -1,3 +1,7 @@
"""
This file contains the TrackColors class for interacting with Track colors documents in MongoDB.
"""
from app import db
+4
View File
@@ -1,3 +1,7 @@
"""
This file contains the AllSongs class for interacting with track documents in MongoDB.
"""
from app import db
from bson import ObjectId
+22 -10
View File
@@ -8,6 +8,7 @@ from io import BytesIO
import random
import datetime
from typing import List
from flask import request
import mutagen
import urllib
@@ -17,14 +18,17 @@ from mutagen.id3 import ID3
from mutagen.flac import FLAC
from progress.bar import Bar
from PIL import Image
# from pprint import pprint
from app import helpers
from app import instances
from app import settings, watchdoge, models
from app import settings, models
from app.lib import albumslib
from app import api
from app.lib import watchdoge
@helpers.background
def reindex_tracks():
"""
Checks for new songs every 5 minutes.
@@ -33,9 +37,7 @@ def reindex_tracks():
while flag is False:
populate()
get_all_albums()
populate_images()
# functions.save_t_colors()
time.sleep(300)
@@ -57,19 +59,22 @@ def populate():
extract it.
"""
start = time.time()
print("\nchecking for new tracks")
files = helpers.run_fast_scandir(settings.HOME_DIR, [".flac", ".mp3"])[1]
s, files = helpers.run_fast_scandir(settings.HOME_DIR, [".flac", ".mp3"], full=True)
# pprint(s)
_bar = Bar("Processing files", max=len(files))
for file in files:
tags = get_tags(file)
if tags is not None:
instances.songs_instance.insert_song(tags)
_bar.next()
_bar.finish()
albumslib.create_everything()
print("\n check done")
end = time.time()
print(
@@ -120,8 +125,11 @@ def populate_images():
img_path = fetch_image_path(artist)
if img_path is not None:
img = Image.open(BytesIO(requests.get(img_path).content))
img.save(file_path, format="webp")
try:
img = Image.open(BytesIO(requests.get(img_path).content))
img.save(file_path, format="webp")
except requests.exceptions.ConnectionError:
time.sleep(5)
_bar.next()
@@ -332,7 +340,7 @@ def get_tags(fullpath: str) -> dict:
"length": round(audio.info.length),
"bitrate": round(int(audio.info.bitrate) / 1000),
"filepath": fullpath,
"folder": os.path.dirname(fullpath).replace(settings.HOME_DIR, ""),
"folder": os.path.dirname(fullpath),
}
return tags
@@ -371,10 +379,14 @@ def get_all_albums() -> List[models.Album]:
albums: List[models.Album] = []
_bar = Bar("Creating albums", max=len(api.DB_TRACKS))
for track in api.DB_TRACKS:
xx = albumslib.create_album(track)
if xx not in albums:
albums.append(xx)
return albums
_bar.next()
_bar.finish()
return albums
+31 -8
View File
@@ -2,10 +2,12 @@
This module contains mimi functions for the server.
"""
import datetime
import os
import random
import threading
from typing import List
import colorgram
import colorgram, time
from app import models, settings
@@ -24,7 +26,7 @@ def background(func):
return background_func
def run_fast_scandir(_dir: str, ext: list):
def run_fast_scandir(__dir: str, ext: list, full=False):
"""
Scans a directory for files with a specific extension. Returns a list of files and folders in the directory.
"""
@@ -32,17 +34,18 @@ def run_fast_scandir(_dir: str, ext: list):
subfolders = []
files = []
for f in os.scandir(_dir):
for f in os.scandir(__dir):
if f.is_dir() and not f.name.startswith("."):
subfolders.append(f.path)
if f.is_file():
if os.path.splitext(f.name)[1].lower() in ext:
files.append(f.path)
for _dir in list(subfolders):
sf, f = run_fast_scandir(_dir, ext)
subfolders.extend(sf)
files.extend(f)
if full or len(files) == 0:
for _dir in list(subfolders):
sf, f = run_fast_scandir(_dir, ext, full=True)
subfolders.extend(sf)
files.extend(f)
return subfolders, files
@@ -106,6 +109,14 @@ def extract_image_colors(image) -> list:
return formatted_colors
def use_memoji():
"""
Returns a path to a random memoji image.
"""
path = str(random.randint(0, 20)) + ".svg"
return settings.IMG_ARTIST_URI + "defaults/" + path
def check_artist_image(image: str) -> str:
"""
Checks if the artist image is valid.
@@ -113,6 +124,18 @@ def check_artist_image(image: str) -> str:
img_name = image.replace("/", "::") + ".webp"
if not os.path.exists(os.path.join(app_dir, "images", "artists", img_name)):
return settings.DEFAULT_ARTIST_IMG
return use_memoji()
else:
return (settings.IMG_ARTIST_URI + img_name,)
class Timer:
begin: int = 0
end: int = 0
def start(self):
self.begin = time.time()
def stop(self):
self.end = time.time()
print(str(datetime.timedelta(seconds=round(self.end - self.begin))))
+4
View File
@@ -1,3 +1,7 @@
"""
All the MongoDB instances are created here.
"""
from app.db import artists, albums, trackcolors, tracks, playlists
songs_instance = tracks.AllSongs()
+3
View File
@@ -0,0 +1,3 @@
"""
This module contains all the data processing and non-API libraries
"""
+4 -2
View File
@@ -1,3 +1,7 @@
"""
This library contains all the functions related to albums.
"""
import urllib
from typing import List
from app import models, functions, helpers
@@ -5,7 +9,6 @@ from app.lib import trackslib
from app import api
@helpers.background
def create_everything() -> List[models.Track]:
"""
Creates album objects for all albums and returns
@@ -18,7 +21,6 @@ def create_everything() -> List[models.Track]:
trackslib.create_all_tracks()
def get_album_duration(album: list) -> int:
"""
Gets the duration of an album.
+4 -2
View File
@@ -1,3 +1,7 @@
"""
This library contains all the functions related to playlists.
"""
from app import api, instances, models
from app.lib import trackslib
@@ -14,8 +18,6 @@ def add_track(playlistid: str, trackid: str):
instances.playlist_instance.add_track_to_playlist(playlistid, track)
def create_all_playlists():
"""
Gets all playlists from the database.
+4
View File
@@ -1,3 +1,7 @@
"""
This library contains all the functions related to the search functionality.
"""
from typing import List
from app import models, helpers
from app.lib import albumslib
+14 -3
View File
@@ -1,17 +1,25 @@
"""
This library contains all the functions related to tracks.
"""
import os
from typing import List
from app import models, instances
from app.lib import albumslib
from app.helpers import remove_duplicates
from app import api
from app import api, helpers
from progress.bar import Bar
def create_all_tracks() -> List[models.Track]:
"""
Gets all songs under the ~/ directory.
"""
print("Getting all songs...")
tracks: list[models.Track] = []
timer = helpers.Timer()
_bar = Bar("Creating tracks", max=len(api.DB_TRACKS))
for track in api.DB_TRACKS:
try:
os.chmod(track["filepath"], 0o755)
@@ -23,9 +31,12 @@ def create_all_tracks() -> List[models.Track]:
track["image"] = album.image
tracks.append(models.Track(track))
_bar.next()
api.TRACKS.clear()
api.TRACKS.extend(tracks)
_bar.finish()
print(f"Created all songs in {timer.stop()}")
def get_album_tracks(albumname, artist):
@@ -43,4 +54,4 @@ def get_track_by_id(trackid: str) -> models.Track:
"""Returns api track matching an id"""
for track in api.TRACKS:
if track.id == trackid:
return track
return track
+21 -3
View File
@@ -1,3 +1,7 @@
"""
Contains all the models for objects generation and typing.
"""
from dataclasses import dataclass
from typing import List
from app import api
@@ -60,9 +64,7 @@ class Album:
self.count = tags["count"]
self.duration = tags["duration"]
self.date = tags["date"]
self.artistimage = (
settings.IMG_ARTIST_URI + tags["artistimage"]
)
self.artistimage = settings.IMG_ARTIST_URI + tags["artistimage"]
self.image = settings.IMG_THUMB_URI + tags["image"]
@@ -86,11 +88,14 @@ def create_playlist_tracks(playlist_tracks: List) -> List[Track]:
@dataclass
class Playlist:
"""Creates playlist objects"""
playlistid: str
name: str
description: str
image: str
tracks: List[Track]
"""A list of track objects in the playlist"""
def __init__(self, data):
self.playlistid = data["_id"]["$oid"]
@@ -98,3 +103,16 @@ class Playlist:
self.description = data["description"]
self.image = ""
self.tracks = create_playlist_tracks(data["tracks"])
@dataclass
class Folder:
name: str
path: str
trackcount: int
"""The number of tracks in the folder"""
def __init__(self, data) -> None:
self.name = data["name"]
self.path = data["path"]
self.trackcount = data["trackcount"]
+6 -2
View File
@@ -1,7 +1,11 @@
"""
Contains default configs
"""
import os
# paths
CONFIG_FOLDER = "alice"
CONFIG_FOLDER = ".alice"
HOME_DIR = os.path.expanduser("~") + "/"
APP_DIR = os.path.join(HOME_DIR, CONFIG_FOLDER)
THUMBS_PATH = os.path.join(APP_DIR, "images", "thumbnails")
@@ -14,4 +18,4 @@ IMG_THUMB_URI = IMG_BASE_URI + "thumbnails/"
# defaults
DEFAULT_ARTIST_IMG = IMG_ARTIST_URI + "0.webp"
LAST_FM_API_KEY = "762db7a44a9e6fb5585661f5f2bdf23a"
LAST_FM_API_KEY = "762db7a44a9e6fb5585661f5f2bdf23a"
-118
View File
@@ -1,118 +0,0 @@
from pprint import pprint
import time
import os
from watchdog.observers import Observer
from watchdog.events import PatternMatchingEventHandler
from app import instances, functions
from app import db
from app.lib import albumslib
from app import api
class OnMyWatch:
directory = os.path.expanduser("~")
def __init__(self):
self.observer = Observer()
def run(self):
event_handler = Handler()
self.observer.schedule(event_handler, self.directory, recursive=True)
self.observer.start()
try:
while True:
time.sleep(5)
except:
self.observer.stop()
print("Observer Stopped")
self.observer.join()
def add_track(filepath: str) -> None:
"""
Processes the audio tags for a given file ands add them to the music dict.
"""
tags = functions.get_tags(filepath)
if tags is not None:
instances.songs_instance.insert_song(tags)
track = instances.songs_instance.get_song_by_path(tags["filepath"])
api.DB_TRACKS.append(track)
album = albumslib.create_album(track)
api.ALBUMS.append(album)
track["image"] = album.image
api.TRACKS.append(db.Track(track))
def remove_track(filepath: str) -> None:
"""
Removes a track from the music dict.
"""
trackid = instances.songs_instance.get_song_by_path(filepath)["_id"]["$oid"]
instances.songs_instance.remove_song_by_id(trackid)
for track in api.TRACKS:
if track.trackid == trackid:
api.TRACKS.remove(track)
class Handler(PatternMatchingEventHandler):
files_to_process = []
def __init__(self):
print("💠 started watchdog 💠")
PatternMatchingEventHandler.__init__(
self,
patterns=["*.flac", "*.mp3"],
ignore_directories=True,
case_sensitive=False,
)
def on_created(self, event):
"""
Fired when a supported file is created.
"""
print("🔵 created +++")
self.files_to_process.append(event.src_path)
def on_deleted(self, event):
"""
Fired when a delete event occurs on a supported file.
"""
print("🔴 deleted ---")
remove_track(event.src_path)
def on_moved(self, event):
"""
Fired when a move event occurs on a supported file.
"""
print("🔘 moved -->")
tr = "share/Trash"
if tr in event.dest_path:
print("trash ++")
remove_track(event.src_path)
elif tr in event.src_path:
add_track(event.dest_path)
elif tr not in event.dest_path and tr not in event.src_path:
add_track(event.dest_path)
remove_track(event.src_path)
def on_closed(self, event):
"""
Fired when a created file is closed.
"""
print("⚫ closed ~~~")
self.files_to_process.remove(event.src_path)
add_track(event.src_path)
watch = OnMyWatch()