rewrite migrations

+ delete older migrations ... oops
+ change migratrions from "migrations" to "dbmigrations"
+ restructure migrations, order them based on release version
+ add a utils/decorators.py file with a coroutine decorator
This commit is contained in:
mungai-njoroge
2023-07-29 06:46:28 +03:00
parent a0c51d5f82
commit 93de3d2f0c
19 changed files with 327 additions and 288 deletions
+22 -25
View File
@@ -6,14 +6,25 @@ Reads and applies the latest database migrations.
PLEASE NOTE: OLDER MIGRATIONS CAN NEVER BE DELETED.
ONLY MODIFY OLD MIGRATIONS FOR BUG FIXES OR ENHANCEMENTS ONLY
[TRY NOT TO MODIFY BEHAVIOR, UNLESS YOU KNOW WHAT YOU'RE DOING].
PS: Fuck that! Do what you want.
"""
from app.db.sqlite.migrations import MigrationManager
from app.logger import log
from app.migrations import v1_3_0
from .main import main_db_migrations
from .userdata import userdata_db_migrations
migrations = [
[
# v1.3.0
v1_3_0.RemovePlaylistArtistHashes,
v1_3_0.AddSettingsToPlaylistTable,
v1_3_0.AddLastUpdatedToTrackTable,
v1_3_0.MovePlaylistsAndFavoritesTo10BitHashes,
v1_3_0.RemoveAllTracks,
]
]
def apply_migrations():
@@ -21,28 +32,14 @@ def apply_migrations():
Applies the latest database migrations.
"""
userdb_version = MigrationManager.get_userdatadb_postinit_version()
maindb_version = MigrationManager.get_maindb_postinit_version()
version = MigrationManager.get_version()
# No migrations to run
if userdb_version == 0 and maindb_version == 0:
return
if version != len(migrations):
# run migrations after the previous migration version
for migration in migrations[(version - 1) :]:
for m in migration:
log.info("Running new migration: %s", m.name)
m.migrate()
for migration in main_db_migrations:
if migration.version > maindb_version:
log.info("Running new MAIN-DB post-init migration: %s", migration.name)
migration.migrate()
for migration in userdata_db_migrations:
if migration.version > userdb_version:
log.info("Running new USERDATA-DB post-init migration: %s", migration.name)
migration.migrate()
def set_postinit_migration_versions():
"""
Sets the post-init migration versions.
"""
# TODO: Don't forget to remove the zeros below when you add a valid migration 👇.
MigrationManager.set_maindb_postinit_version(0)
MigrationManager.set_userdatadb_postinit_version(0)
# update migration version
MigrationManager.set_version(len(migrations))
-45
View File
@@ -1,45 +0,0 @@
"""
Pre-init migrations are executed before the database is created.
Useful when you need to move files or folders before the database is created.
`Example use cases: Moving files around, dropping tables, etc.`
PLEASE NOTE: OLDER MIGRATIONS CAN NEVER BE DELETED.
ONLY MODIFY OLD MIGRATIONS FOR BUG FIXES OR ENHANCEMENTS ONLY.
[TRY NOT TO MODIFY BEHAVIOR, UNLESS YOU KNOW WHAT YOU'RE DOING].
"""
from sqlite3 import OperationalError
from app.db.sqlite.migrations import MigrationManager
from app.logger import log
from .drop_artist_and_album_color_tables import DropArtistAndAlbumColorTables
from .move_to_xdg_folder import MoveToXdgFolder
all_preinits = [MoveToXdgFolder, DropArtistAndAlbumColorTables]
def run_preinit_migrations():
"""
Runs all pre-init migrations.
"""
try:
userdb_version = MigrationManager.get_preinit_version()
except OperationalError:
userdb_version = 0
# No migrations to run
if userdb_version == 0:
return
for migration in all_preinits:
if migration.version > userdb_version:
log.warn("Running new pre-init migration: %s", migration.name)
migration.migrate()
def set_preinit_migration_versions():
"""
Sets the migration versions.
"""
MigrationManager.set_preinit_version(all_preinits[-1].version)
@@ -1,24 +0,0 @@
"""
Another shot at attempting to fix duplicate album and artist color entries.
This release should finally fix the issue. The migration script will now remove
the album and artist color tables and recreate them.
"""
from app.db.sqlite.utils import SQLiteManager
from app.logger import log
class DropArtistAndAlbumColorTables:
version = 2
name = "DropArtistAndAlbumColorTables"
@staticmethod
def migrate():
with SQLiteManager() as cur:
tables = ["artists", "albums"]
for table in tables:
cur.execute(f"DROP TABLE IF EXISTS {table}")
cur.execute("VACUUM")
log.info("Deleted artist and album color data to fix a few bugs. ✅")
@@ -1,49 +0,0 @@
"""
This migration handles moving the config folder to the XDG standard location.
It also handles moving the userdata and the downloaded artist images to the new location.
"""
import os
import shutil
from app.settings import Paths
from app.logger import log
class MoveToXdgFolder:
version = 1
name = "MoveToXdgFolder"
@staticmethod
def migrate():
old_config_dir = os.path.join(Paths.USER_HOME_DIR, ".swing")
new_config_dir = Paths.get_app_dir()
if not os.path.exists(old_config_dir):
log.info("No old config folder found. Skipping migration.")
return
log.info("Found old config folder: %s", old_config_dir)
old_imgs_dir = os.path.join(old_config_dir, "images")
# move images to new location
if os.path.exists(old_imgs_dir):
shutil.copytree(
old_imgs_dir,
os.path.join(new_config_dir, "images"),
copy_function=shutil.copy2,
dirs_exist_ok=True,
)
log.warn("Moved artist images to: %s", new_config_dir)
# move userdata.db to new location
userdata_db = os.path.join(old_config_dir, "userdata.db")
if os.path.exists(userdata_db):
shutil.copy2(userdata_db, new_config_dir)
log.warn("Moved userdata.db to: %s", new_config_dir)
log.warn("Migration complete. ✅")
# swing.db is not moved because the new code fixes bugs which require
# the whole database to be recreated anyway. (ie. the bug which caused duplicate album and artist color entries)
-10
View File
@@ -1,10 +0,0 @@
"""
Migrations for the main database.
PLEASE NOTE: OLDER MIGRATIONS CAN NEVER BE DELETED.
ONLY MODIFY OLD MIGRATIONS FOR BUG FIXES OR ENHANCEMENTS ONLY
[TRY NOT TO MODIFY BEHAVIOR, UNLESS YOU KNOW WHAT YOU'RE DOING].
"""
main_db_migrations = []
-10
View File
@@ -1,10 +0,0 @@
"""
Migrations for the userdata database.
PLEASE NOTE: OLDER MIGRATIONS CAN NEVER BE DELETED.
ONLY MODIFY OLD MIGRATIONS FOR BUG FIXES OR ENHANCEMENTS ONLY
[TRY NOT TO MODIFY BEHAVIOR, UNLESS YOU KNOW WHAT YOU'RE DOING].
"""
userdata_db_migrations = []
+257
View File
@@ -0,0 +1,257 @@
import json
import time
from collections import OrderedDict
from typing import Generator
from app.db.sqlite.utils import SQLiteManager
from app.utils.decorators import coroutine
from app.utils.hashing import create_hash
# playlists table
# ---------------
# 0: id
# 1: banner_pos
# 2: has_gif
# 3: image
# 4: last_updated
# 5: name
# 6: trackhashes
class RemovePlaylistArtistHashes:
"""
This migration removes the artisthashes column from the playlists table.
"""
name = "RemovePlaylistArtistHashes"
@staticmethod
def migrate():
# remove artisthashes column
sql = "ALTER TABLE playlists DROP COLUMN artisthashes"
with SQLiteManager(userdata_db=True) as cur:
cur.execute(sql)
cur.close()
class AddSettingsToPlaylistTable:
"""
This migration adds the settings column and removes the banner_pos and has_gif columns
to the playlists table.
"""
name = "AddSettingsToPlaylistTable"
@staticmethod
def migrate():
# existing_playlists = []
select_playlists_sql = "SELECT * FROM playlists"
with SQLiteManager(userdata_db=True) as cur:
create_playlist_table_sql = """CREATE TABLE IF NOT EXISTS playlists (
id integer PRIMARY KEY,
image text,
last_updated text not null,
name text not null,
settings text,
trackhashes text
);"""
insert_playlist_sql = """INSERT INTO playlists(
image,
last_updated,
name,
settings,
trackhashes
) VALUES(:image, :last_updated, :name, :settings, :trackhashes)
"""
cur.execute(select_playlists_sql)
# load all playlists
playlists = cur.fetchall()
# drop old playlists table
cur.execute("DROP TABLE playlists")
# create new playlists table
cur.execute(create_playlist_table_sql)
def transform_playlists(pipeline: Generator, playlists: tuple):
for playlist in playlists:
# create dict that matches the new schema
p = {
"id": playlist[0],
"name": playlist[5],
"image": playlist[3],
"trackhashes": playlist[6],
"last_updated": playlist[4],
"settings": json.dumps(
{
"has_gif": False,
"banner_pos": playlist[1],
"square_img": False,
}
),
}
pipeline.send(p)
@coroutine
def insert_playlist():
while True:
playlist = yield
p = OrderedDict(sorted(playlist.items()))
cur.execute(insert_playlist_sql, p)
# insert playlists using a coroutine
# (my first coroutine)
pipeline = insert_playlist()
transform_playlists(pipeline, playlists)
cur.close()
class AddLastUpdatedToTrackTable:
"""
This migration adds the last modified column to the tracks table.
"""
name = "AddLastUpdatedToTrackTable"
@staticmethod
def migrate():
# add last_mod column and default to current timestamp
timestamp = time.time()
sql = f"ALTER TABLE tracks ADD COLUMN last_mod text not null DEFAULT '{timestamp}'"
with SQLiteManager() as cur:
cur.execute(sql)
cur.close()
class MovePlaylistsAndFavoritesTo10BitHashes:
"""
This migration moves the playlists and favorites to 10 bit hashes.
"""
name = "MovePlaylistsAndFavoritesTo10BitHashes"
@staticmethod
def migrate():
def get_track_data_by_hash(trackhash: str, tracks: list[tuple]) -> tuple:
for track in tracks:
# trackhash is the 15th bit hash
if track[15] == trackhash:
# return artist, album, title
return track[4], track[1], track[13]
def get_track_by_albumhash(albumhash: str, tracks: list[tuple]) -> tuple:
for track in tracks:
# albumhash is the 3rd bit hash
if track[3] == albumhash:
# return album, albumartist
return track[1], track[2]
_base = "SELECT * FROM"
fetch_playlists_sql = f"{_base} playlists"
fetch_tracks_sql = f"{_base} tracks"
update_playlist_hashes_sql = (
"UPDATE playlists SET trackhashes = :trackhashes WHERE id = :id"
)
fetch_favorites_sql = f"{_base} favorites"
update_fav_sql = "UPDATE favorites SET hash = :hash WHERE id = :id"
remove_fav_sql = "DELETE FROM favorites WHERE id = :id"
db_tracks = []
# read tracks from db
with SQLiteManager() as cur:
cur.execute(fetch_tracks_sql)
db_tracks.extend(cur.fetchall())
cur.close()
# update playlists
with SQLiteManager(userdata_db=True) as cur:
cur.execute(fetch_playlists_sql)
playlists = cur.fetchall()
# for each playlist
for p in playlists:
pid = p[0]
# load trackhashes
trackhashes: list[str] = json.loads(p[5])
for index, t in enumerate(trackhashes):
(artist, album, title) = get_track_data_by_hash(t, db_tracks)
# create new hash
new_hash = create_hash(artist, album, title, decode=True, limit=10)
trackhashes[index] = new_hash
# convert to string
trackhashes = json.dumps(trackhashes)
# save to db
cur.execute(
update_playlist_hashes_sql, {"trackhashes": trackhashes, "id": pid}
)
cur.close()
# update favorites
with SQLiteManager(userdata_db=True) as cur:
cur.execute(fetch_favorites_sql)
favorites = cur.fetchall()
# for each favorite
for f in favorites:
fid = f[0]
fhash: str = f[1]
ftype: str = f[2] # "track" || "album"
if ftype == "album":
(album, albumartist) = get_track_by_albumhash(fhash, db_tracks)
# create new hash
new_hash = create_hash(album, albumartist, decode=True, limit=10)
# save to db
cur.execute(update_fav_sql, {"hash": new_hash, "id": fid})
continue
if ftype == "track":
(artist, album, title) = get_track_data_by_hash(fhash, db_tracks)
# create new hash
new_hash = create_hash(artist, album, title, decode=True, limit=10)
# save to db
cur.execute(update_fav_sql, {"hash": new_hash, "id": fid})
continue
# remove favorites that are not track or album. ie. artists
cur.execute(remove_fav_sql, {"id": fid})
cur.close()
class RemoveAllTracks:
"""
This migration removes all tracks from the tracks table.
"""
name = "RemoveAllTracks"
@staticmethod
def migrate():
sql = "DELETE FROM tracks"
with SQLiteManager() as cur:
cur.execute(sql)
cur.close()