mirror of
https://github.com/Dvorinka/swingmusic-extended.git
synced 2026-06-05 04:53:01 +00:00
feat: support watching symlinks in watchdogg.py
+ remove code for auto-adding ~/Home to root_dirs during populate
This commit is contained in:
+104
-25
@@ -2,17 +2,22 @@
|
||||
This library contains the classes and functions related to the watchdog file watcher.
|
||||
"""
|
||||
import os
|
||||
import sqlite3
|
||||
import time
|
||||
|
||||
from watchdog.events import PatternMatchingEventHandler
|
||||
from watchdog.observers import Observer
|
||||
|
||||
from app.db.sqlite.tracks import SQLiteManager
|
||||
from app.db.sqlite.tracks import SQLiteTrackMethods as db
|
||||
|
||||
from app.logger import log
|
||||
from app.db.store import Store
|
||||
from app.lib.taglib import get_tags
|
||||
from app.logger import log
|
||||
from app.models import Artist, Track
|
||||
from app import settings
|
||||
|
||||
from app.db.sqlite.tracks import SQLiteManager
|
||||
from app.db.sqlite.tracks import SQLiteTrackMethods as db
|
||||
from app.db.sqlite.settings import SettingsSQLMethods as sdb
|
||||
|
||||
|
||||
class Watcher:
|
||||
@@ -20,39 +25,96 @@ class Watcher:
|
||||
Contains the methods for initializing and starting watchdog.
|
||||
"""
|
||||
|
||||
home_dir = os.path.expanduser("~")
|
||||
dirs = [home_dir]
|
||||
observers: list[Observer] = []
|
||||
|
||||
def __init__(self):
|
||||
self.observer = Observer()
|
||||
|
||||
def run(self):
|
||||
event_handler = Handler()
|
||||
"""
|
||||
Starts watchers for each dir in root_dirs
|
||||
"""
|
||||
|
||||
for dir_ in self.dirs:
|
||||
trials = 0
|
||||
|
||||
while trials < 10:
|
||||
try:
|
||||
dirs = sdb.get_root_dirs()
|
||||
print(dirs)
|
||||
dir_map = [
|
||||
{"original": d, "realpath": os.path.realpath(d)} for d in dirs
|
||||
]
|
||||
break
|
||||
except sqlite3.OperationalError:
|
||||
trials += 1
|
||||
time.sleep(1)
|
||||
else:
|
||||
log.error(
|
||||
"WatchDogError: Failed to start Watchdog. Waiting for database timed out!"
|
||||
)
|
||||
return
|
||||
|
||||
if len(dirs) == 0:
|
||||
log.warning(
|
||||
"WatchDogInfo: No root directories configured. Watchdog not started."
|
||||
)
|
||||
return
|
||||
|
||||
dir_map = [d for d in dir_map if d['realpath'] != d['original']]
|
||||
|
||||
if len(dirs) > 0 and dirs[0] == "$home":
|
||||
dirs = [settings.USER_HOME_DIR]
|
||||
|
||||
event_handler = Handler(root_dirs=dirs, dir_map=dir_map)
|
||||
|
||||
for _dir in dirs:
|
||||
exists = os.path.exists(_dir)
|
||||
|
||||
if not exists:
|
||||
log.error("WatchdogError: Directory not found: %s", _dir)
|
||||
|
||||
for _dir in dirs:
|
||||
self.observer.schedule(
|
||||
event_handler, os.path.realpath(dir_), recursive=True
|
||||
event_handler, os.path.realpath(_dir), recursive=True
|
||||
)
|
||||
self.observers.append(self.observer)
|
||||
|
||||
try:
|
||||
self.observer.start()
|
||||
except OSError:
|
||||
log.error("Could not start watchdog.")
|
||||
log.info("Started watchdog")
|
||||
except FileNotFoundError:
|
||||
log.error(
|
||||
"WatchdogError: Failed to start watchdog, root directories could not be resolved."
|
||||
)
|
||||
return
|
||||
|
||||
try:
|
||||
while True:
|
||||
time.sleep(1)
|
||||
except KeyboardInterrupt:
|
||||
for obsv in self.observers:
|
||||
obsv.unschedule_all()
|
||||
obsv.stop()
|
||||
self.stop_all()
|
||||
|
||||
for obsv in self.observers:
|
||||
obsv.join()
|
||||
|
||||
def stop_all(self):
|
||||
"""
|
||||
Unschedules and stops all existing watchers.
|
||||
"""
|
||||
log.info("Stopping all watchdog observers")
|
||||
for obsv in self.observers:
|
||||
obsv.unschedule_all()
|
||||
obsv.stop()
|
||||
|
||||
def restart(self):
|
||||
"""
|
||||
Stops all existing watchers, refetches root_dirs from the db
|
||||
and restarts the watchers.
|
||||
"""
|
||||
log.info("🔃 Restarting watchdog")
|
||||
self.stop_all()
|
||||
self.run()
|
||||
|
||||
|
||||
def add_track(filepath: str) -> None:
|
||||
"""
|
||||
@@ -118,9 +180,13 @@ def remove_track(filepath: str) -> None:
|
||||
|
||||
class Handler(PatternMatchingEventHandler):
|
||||
files_to_process = []
|
||||
root_dirs = []
|
||||
dir_map = []
|
||||
|
||||
def __init__(self, root_dirs: list[str], dir_map: dict[str:str]):
|
||||
self.root_dirs = root_dirs
|
||||
self.dir_map = dir_map
|
||||
|
||||
def __init__(self):
|
||||
log.info("✅ started watchdog")
|
||||
PatternMatchingEventHandler.__init__(
|
||||
self,
|
||||
patterns=["*.flac", "*.mp3"],
|
||||
@@ -128,6 +194,16 @@ class Handler(PatternMatchingEventHandler):
|
||||
case_sensitive=False,
|
||||
)
|
||||
|
||||
def get_abs_path(self, path: str):
|
||||
"""
|
||||
Convert a realpath to a path relative to the matching root directory.
|
||||
"""
|
||||
for d in self.dir_map:
|
||||
if d["realpath"] in path:
|
||||
return path.replace(d["realpath"], d["original"])
|
||||
|
||||
return path
|
||||
|
||||
def on_created(self, event):
|
||||
"""
|
||||
Fired when a supported file is created.
|
||||
@@ -138,8 +214,8 @@ class Handler(PatternMatchingEventHandler):
|
||||
"""
|
||||
Fired when a delete event occurs on a supported file.
|
||||
"""
|
||||
|
||||
remove_track(event.src_path)
|
||||
path = self.get_abs_path(event.src_path)
|
||||
remove_track(path)
|
||||
|
||||
def on_moved(self, event):
|
||||
"""
|
||||
@@ -148,14 +224,19 @@ class Handler(PatternMatchingEventHandler):
|
||||
trash = "share/Trash"
|
||||
|
||||
if trash in event.dest_path:
|
||||
remove_track(event.src_path)
|
||||
path = self.get_abs_path(event.src_path)
|
||||
remove_track(path)
|
||||
|
||||
elif trash in event.src_path:
|
||||
add_track(event.dest_path)
|
||||
path = self.get_abs_path(event.dest_path)
|
||||
add_track(path)
|
||||
|
||||
elif trash not in event.dest_path and trash not in event.src_path:
|
||||
add_track(event.dest_path)
|
||||
remove_track(event.src_path)
|
||||
dest_path = self.get_abs_path(event.dest_path)
|
||||
src_path = self.get_abs_path(event.src_path)
|
||||
|
||||
add_track(dest_path)
|
||||
remove_track(src_path)
|
||||
|
||||
def on_closed(self, event):
|
||||
"""
|
||||
@@ -164,9 +245,7 @@ class Handler(PatternMatchingEventHandler):
|
||||
try:
|
||||
self.files_to_process.remove(event.src_path)
|
||||
if os.path.getsize(event.src_path) > 0:
|
||||
add_track(event.src_path)
|
||||
path = self.get_abs_path(event.src_path)
|
||||
add_track(path)
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
|
||||
# watcher = Watcher()
|
||||
|
||||
Reference in New Issue
Block a user