add function to validate albums

+ extract colors in watchdogg
+ rename color db files
This commit is contained in:
mungai-njoroge
2023-07-12 08:56:30 +03:00
parent 4a7416853a
commit 861a854f91
18 changed files with 170 additions and 38 deletions
+20 -1
View File
@@ -1,3 +1,22 @@
"""
Contains methods relating to albums.
"""
"""
from tqdm import tqdm
from app.store.albums import AlbumStore
from app.store.tracks import TrackStore
def validate_albums():
"""
Removes albums that have no tracks.
Probably albums that were added from incompletely written files.
"""
album_hashes = {t.albumhash for t in TrackStore.tracks}
albums = AlbumStore.albums
for album in tqdm(albums, desc="Validating albums"):
if album.albumhash not in album_hashes:
AlbumStore.remove_album(album)
+7 -13
View File
@@ -9,8 +9,8 @@ import colorgram
from tqdm import tqdm
from app import settings
from app.db.sqlite.albums import SQLiteAlbumMethods as db
from app.db.sqlite.artists import SQLiteArtistMethods as adb
from app.db.sqlite.albumcolors import SQLiteAlbumMethods as aldb
from app.db.sqlite.artistcolors import SQLiteArtistMethods as adb
from app.db.sqlite.utils import SQLiteManager
from app.store.artists import ArtistStore
@@ -58,11 +58,8 @@ class ProcessAlbumColors:
with SQLiteManager() as cur:
try:
for album in tqdm(albums, desc="Processing missing album colors"):
sql = "SELECT COUNT(1) FROM albums WHERE albumhash = ?"
cur.execute(sql, (album.albumhash,))
count = cur.fetchone()[0]
if count != 0:
exists = aldb.exists(album.albumhash, cur=cur)
if exists:
continue
colors = process_color(album.albumhash)
@@ -72,7 +69,7 @@ class ProcessAlbumColors:
album.set_colors(colors)
color_str = json.dumps(colors)
db.insert_one_album(cur, album.albumhash, color_str)
aldb.insert_one_album(cur, album.albumhash, color_str)
finally:
cur.close()
@@ -90,12 +87,9 @@ class ProcessArtistColors:
for artist in tqdm(
all_artists, desc="Processing missing artist colors"
):
sql = "SELECT COUNT(1) FROM artists WHERE artisthash = ?"
exists = adb.exists(artist.artisthash, cur=cur)
cur.execute(sql, (artist.artisthash,))
count = cur.fetchone()[0]
if count != 0:
if exists:
continue
colors = process_color(artist.artisthash, is_album=False)
+5 -4
View File
@@ -9,10 +9,10 @@ from tqdm import tqdm
from app import settings
from app.db.sqlite.favorite import SQLiteFavoriteMethods as favdb
from app.db.sqlite.lastfm.similar_artists import \
SQLiteLastFMSimilarArtists as lastfmdb
from app.db.sqlite.lastfm.similar_artists import SQLiteLastFMSimilarArtists as lastfmdb
from app.db.sqlite.settings import SettingsSQLMethods as sdb
from app.db.sqlite.tracks import SQLiteTrackMethods
from app.lib.albumslib import validate_albums
from app.lib.artistlib import CheckArtistImages
from app.lib.colorlib import ProcessAlbumColors, ProcessArtistColors
from app.lib.taglib import extract_thumb, get_tags
@@ -51,6 +51,8 @@ class Populate:
POPULATE_KEY = key
validate_tracks()
validate_albums()
tracks = get_all_tracks()
dirs_to_scan = sdb.get_root_dirs()
@@ -122,8 +124,7 @@ class Populate:
if track.last_mod == os.path.getmtime(track.filepath):
unmodified.add(track.filepath)
continue
except FileNotFoundError:
print(f"File not found: {track.filepath}")
except (FileNotFoundError, OSError) as e:
TrackStore.remove_track_obj(track)
remove_tracks_by_filepaths(track.filepath)
+5 -1
View File
@@ -78,7 +78,11 @@ def extract_date(date_str: str | None) -> int | None:
def get_tags(filepath: str):
filetype = filepath.split(".")[-1]
filename = (filepath.split("/")[-1]).replace(f".{filetype}", "")
last_mod = os.path.getmtime(filepath)
try:
last_mod = os.path.getmtime(filepath)
except FileNotFoundError:
return None
try:
tags = TinyTag.get(filepath)
+2 -2
View File
@@ -11,9 +11,9 @@ from app.store.tracks import TrackStore
def validate_tracks() -> None:
"""
Gets all songs under the ~/ directory.
Removes track records whose files no longer exist.
"""
for track in tqdm(TrackStore.tracks, desc="Checking for deleted tracks"):
for track in tqdm(TrackStore.tracks, desc="Validating tracks"):
if not os.path.exists(track.filepath):
TrackStore.remove_track_obj(track)
tdb.remove_tracks_by_filepaths(track.filepath)
+33 -2
View File
@@ -1,6 +1,7 @@
"""
This library contains the classes and functions related to the watchdog file watcher.
"""
import json
import os
import sqlite3
import time
@@ -9,10 +10,14 @@ from watchdog.events import PatternMatchingEventHandler
from watchdog.observers import Observer
from app import settings
from app.db.sqlite.settings import SettingsSQLMethods as sdb
from app.db.sqlite.tracks import SQLiteManager
from app.db.sqlite.tracks import SQLiteTrackMethods as db
from app.lib.taglib import get_tags
from app.db.sqlite.albumcolors import SQLiteAlbumMethods as aldb
from app.lib.colorlib import process_color
from app.lib.taglib import extract_thumb, get_tags
from app.logger import log
from app.models import Artist, Track
from app.store.albums import AlbumStore
@@ -123,6 +128,22 @@ class Watcher:
self.run()
def handle_colors(cur: sqlite3.Cursor, albumhash: str):
exists = aldb.exists(albumhash, cur)
if exists:
return
colors = process_color(albumhash, is_album=True)
if colors is None:
return
aldb.insert_one_album(cur=cur, albumhash=albumhash, colors=json.dumps(colors))
return colors
def add_track(filepath: str) -> None:
"""
Processes the audio tags for a given file ands add them to the database and store.
@@ -138,14 +159,23 @@ def add_track(filepath: str) -> None:
if tags is None or tags["bitrate"] == 0 or tags["duration"] == 0:
return
colors = None
with SQLiteManager() as cur:
db.insert_one_track(tags, cur)
extracted = extract_thumb(filepath, tags["albumhash"] + ".webp")
if not extracted:
return
colors = handle_colors(cur, tags["albumhash"])
track = Track(**tags)
TrackStore.add_track(track)
if not AlbumStore.album_exists(track.albumhash):
album = AlbumStore.create_album(track)
album.set_colors(colors)
AlbumStore.add_album(album)
artists: list[Artist] = track.artist + track.albumartist # type: ignore
@@ -154,6 +184,7 @@ def add_track(filepath: str) -> None:
if not ArtistStore.artist_exists(artist.artisthash):
ArtistStore.add_artist(Artist(artist.name))
extract_thumb(filepath, track.image)
def remove_track(filepath: str) -> None:
"""
@@ -277,7 +308,7 @@ class Handler(PatternMatchingEventHandler):
if current_size == previous_size:
# Wait for a short duration to ensure the file write operation is complete
time.sleep(0.5)
time.sleep(5)
# Check the file size again
current_size = os.path.getsize(event.src_path)