add tests for sqlitemanager util class

+ implement pendulum in date_string_to_time_passed()
+ remove unused bisection_search_string
This commit is contained in:
mungai-njoroge
2023-06-21 12:18:19 +03:00
parent 9d4f7af581
commit 4d310c39c3
10 changed files with 186 additions and 161 deletions
+14 -7
View File
@@ -10,7 +10,7 @@ from .utils import SQLiteManager
class SQLiteArtistMethods:
@staticmethod
def insert_one_artist(cur: Cursor, artisthash: str, colors: str | list[str]):
def insert_one_artist(cur: Cursor, artisthash: str, colors: list[str]):
"""
Inserts a single artist into the database.
"""
@@ -23,16 +23,23 @@ class SQLiteArtistMethods:
cur.execute(sql, (artisthash, colors))
@staticmethod
def get_all_artists():
def get_all_artists(cur_: Cursor = None):
"""
Get all artists from the database and return a generator of Artist objects
"""
sql = """SELECT * FROM artists"""
with SQLiteManager() as cur:
cur.execute(sql)
if not cur_:
with SQLiteManager() as cur:
cur.execute(sql)
for artist in cur.fetchall():
for artist in cur.fetchall():
yield artist
cur.close()
else:
cur_.execute(sql)
for artist in cur_.fetchall():
yield artist
cur.close()
+11 -2
View File
@@ -62,7 +62,12 @@ class SQLiteManager:
for you. It also commits and closes the connection when you're done.
"""
def __init__(self, conn: Optional[Connection] = None, userdata_db=False) -> None:
def __init__(
self,
conn: Optional[Connection] = None,
userdata_db=False,
test_db_path: str = None,
) -> None:
"""
When a connection is passed in, don't close the connection, because it's
a connection to the search database [in memory db].
@@ -70,6 +75,7 @@ class SQLiteManager:
self.conn = conn
self.CLOSE_CONN = True
self.userdata_db = userdata_db
self.test_db_path = test_db_path
if conn:
self.conn = conn
@@ -79,7 +85,10 @@ class SQLiteManager:
if self.conn is not None:
return self.conn.cursor()
db_path = Db.get_app_db_path()
if self.test_db_path:
db_path = self.test_db_path
else:
db_path = Db.get_app_db_path()
if self.userdata_db:
db_path = Db.get_userdata_db_path()
+31 -27
View File
@@ -56,24 +56,25 @@ class ProcessAlbumColors:
albums = [a for a in AlbumStore.albums if len(a.colors) == 0]
with SQLiteManager() as cur:
for album in tqdm(albums, desc="Processing missing album colors"):
sql = "SELECT COUNT(1) FROM albums WHERE albumhash = ?"
cur.execute(sql, (album.albumhash,))
count = cur.fetchone()[0]
try:
for album in tqdm(albums, desc="Processing missing album colors"):
sql = "SELECT COUNT(1) FROM albums WHERE albumhash = ?"
cur.execute(sql, (album.albumhash,))
count = cur.fetchone()[0]
if count != 0:
continue
if count != 0:
continue
colors = process_color(album.albumhash)
colors = process_color(album.albumhash)
if colors is None:
continue
if colors is None:
continue
album.set_colors(colors)
color_str = json.dumps(colors)
db.insert_one_album(cur, album.albumhash, color_str)
cur.close()
album.set_colors(colors)
color_str = json.dumps(colors)
db.insert_one_album(cur, album.albumhash, color_str)
finally:
cur.close()
class ProcessArtistColors:
@@ -85,21 +86,24 @@ class ProcessArtistColors:
all_artists = [a for a in ArtistStore.artists if len(a.colors) == 0]
with SQLiteManager() as cur:
for artist in tqdm(all_artists, desc="Processing missing artist colors"):
sql = "SELECT COUNT(1) FROM artists WHERE artisthash = ?"
try:
for artist in tqdm(
all_artists, desc="Processing missing artist colors"
):
sql = "SELECT COUNT(1) FROM artists WHERE artisthash = ?"
cur.execute(sql, (artist.artisthash,))
count = cur.fetchone()[0]
cur.execute(sql, (artist.artisthash,))
count = cur.fetchone()[0]
if count != 0:
continue
if count != 0:
continue
colors = process_color(artist.artisthash, is_album=False)
colors = process_color(artist.artisthash, is_album=False)
if colors is None:
continue
if colors is None:
continue
artist.set_colors(colors)
adb.insert_one_artist(cur, artist.artisthash, colors)
cur.close()
artist.set_colors(colors)
adb.insert_one_artist(cur, artist.artisthash, colors)
finally:
cur.close()
+2 -2
View File
@@ -20,9 +20,9 @@ class ArtistStore:
"""
cls.artists = get_all_artists(TrackStore.tracks, AlbumStore.albums)
db_artists: list[tuple] = list(ardb.get_all_artists())
# db_artists: list[tuple] = list(ardb.get_all_artists())
for art in tqdm(db_artists, desc="Loading artists"):
for art in tqdm(ardb.get_all_artists(), desc="Loading artists"):
cls.map_artist_color(art)
@classmethod
+3 -25
View File
@@ -6,7 +6,9 @@ class UseBisection:
items.
"""
def __init__(self, source: list, search_from: str, queries: list[str], limit=-1) -> None:
def __init__(
self, source: list, search_from: str, queries: list[str], limit=-1
) -> None:
self.source_list = source
self.queries_list = queries
self.attr = search_from
@@ -45,27 +47,3 @@ class UseBisection:
break
return results
def bisection_search_string(strings: list[str], target: str) -> str | None:
"""
Finds a string in a list of strings using bisection search.
"""
if not strings:
return None
strings = sorted(strings)
left = 0
right = len(strings) - 1
while left <= right:
middle = (left + right) // 2
if strings[middle] == target:
return strings[middle]
if strings[middle] < target:
left = middle + 1
else:
right = middle - 1
return None
+3 -40
View File
@@ -1,3 +1,4 @@
import pendulum
from datetime import datetime
_format = "%Y-%m-%d %H:%M:%S"
@@ -21,44 +22,6 @@ def date_string_to_time_passed(prev_date: str) -> str:
diff = now - then
seconds = diff.seconds
print(seconds)
if seconds < 0:
return "from the future 🛸"
if seconds < 15:
return "now"
if seconds < 60:
return f"{int(seconds)} seconds ago"
if seconds < 3600:
return f"{int(seconds // 60)} minutes ago"
if seconds < 86400:
return f"{int(seconds // 3600)} hours ago"
days = diff.days
if days == 1:
return "yesterday"
if days < 7:
return f"{days} days ago"
if days < 14:
return "1 week ago"
if days < 30:
return f"{int(days // 7)} weeks ago"
if days < 60:
return "1 month ago"
if days < 365:
return f"{int(days // 30)} months ago"
if days < 730:
return "1 year ago"
return f"{int(days // 365)} years ago"
now = pendulum.now()
return now.subtract(seconds=seconds).diff_for_humans()