add json config and its manager class

+ rewrite logic to prevent removing last admin role
+ handle showing users on login and enabling guest
This commit is contained in:
mungai-njoroge
2024-04-29 16:31:30 +03:00
parent 0ff5661765
commit cfeff7ff51
6 changed files with 234 additions and 23 deletions
+95 -19
View File
@@ -1,14 +1,21 @@
import json
from dataclasses import asdict
from functools import wraps
import sqlite3
from flask import jsonify
from flask_jwt_extended import create_access_token, current_user, set_access_cookies
from flask_jwt_extended import (
create_access_token,
current_user,
jwt_required,
set_access_cookies,
)
from pydantic import BaseModel, Field
from flask_openapi3 import Tag
from flask_openapi3 import APIBlueprint
from app.db.sqlite.auth import SQLiteAuthMethods as authdb
from app.utils.auth import check_password, encode_password
from app.config import UserConfig
bp_tag = Tag(name="Auth", description="Authentication stuff")
api = APIBlueprint("auth", __name__, url_prefix="/auth", abp_tags=[bp_tag])
@@ -51,7 +58,7 @@ def login(body: LoginBody):
password_ok = check_password(body.password, user.password)
if not password_ok:
return {"msg": "Invalid password"}, 401
return {"msg": "Hehe! invalid password"}, 401
access_token = create_access_token(identity=user.todict())
set_access_cookies(res, access_token)
@@ -59,38 +66,55 @@ def login(body: LoginBody):
class UpdateProfileBody(BaseModel):
id: int = Field(0, description="The user id")
email: str = Field("", description="The email")
username: str = Field("", description="The username", example="user0")
password: str = Field("", description="The password", example="password0")
roles: list[str] = Field([], description="The roles")
roles: list[str] = Field(None, description="The roles")
@api.put("/profile/update")
def update_profile(body: UpdateProfileBody):
user = {
"id": current_user["id"],
"id": body.id,
"email": body.email,
"username": body.username,
"password": body.password,
"roles": body.roles,
}
# if not id, update self
if not user["id"]:
user["id"] = current_user["id"]
print("current_user: ", current_user)
# only admins can update roles
if body.roles:
if "admin" in current_user["roles"]:
# prevent admin from locking themselves out
roles = set(body.roles)
roles.add("admin")
user["roles"] = json.dumps(list(roles))
else:
user.pop("roles")
if body.roles is not None:
if "admin" not in current_user["roles"]:
return {"msg": "Only admins can update roles"}, 403
if "admin" not in body.roles:
# check if we're removing the last admin
users = authdb.get_all_users()
admins = [user for user in users if "admin" in user.roles]
if len(admins) == 1 and admins[0].id == user["id"]:
return {"msg": "Cannot remove the only admin"}, 400
user["roles"] = json.dumps(body.roles)
if user["password"]:
user["password"] = encode_password(user["password"])
# remove empty values
clean_user = {k: v for k, v in user.items() if v}
return authdb.update_user(clean_user)
try:
return authdb.update_user(clean_user)
except sqlite3.IntegrityError:
return {"msg": "Username already exists"}, 400
@api.post("/profile/create")
@@ -150,7 +174,7 @@ def delete_user(body: DeleteUseBody):
Delete a user by username
"""
# prevent admin from deleting themselves
if body.username == current_user["usrname"]:
if body.username == current_user["username"]:
return {"msg": "Sorry! you cannot delete yourselfu"}, 400
# prevent deleting the only admin
@@ -160,6 +184,11 @@ def delete_user(body: DeleteUseBody):
return {"msg": "Cannot delete the only admin"}, 400
authdb.delete_user_by_username(body.username)
# if user is guest, update config
if body.username == "guest":
UserConfig().enableGuest = False
return {"msg": f"User {body.username} deleted"}
@@ -180,14 +209,51 @@ class GetAllUsersQuery(BaseModel):
@api.get("/users")
@jwt_required(optional=True)
def get_all_users(query: GetAllUsersQuery):
"""
Get all users
Get all users (if you're an admin, you will also receive accounts settings)
"""
config = UserConfig()
# config.enableGuest = True
# config.usersOnLogin = True
settings = {
"enableGuest": config.enableGuest,
"usersOnLogin": config.usersOnLogin,
}
res = {
"settings": {},
"users": [],
}
# if user is admin, also return settings
if current_user and "admin" in current_user["roles"]:
res = {
"settings": settings,
}
# if is normal user, return empty response
elif current_user:
return res
# if not logged in and showing users on login is disabled, return empty response
elif (
not current_user
and not settings["usersOnLogin"]
and not settings["enableGuest"]
):
return res
users = authdb.get_all_users()
# remove guest user
users = [user for user in users if user.username != "guest"]
print("settings: ", settings["enableGuest"])
if not settings["enableGuest"]:
users = [user for user in users if user.username != "guest"]
if not settings["usersOnLogin"]:
users = [user for user in users if user.username == "guest"]
# reverse list to show latest users first
users = list(reversed(users))
@@ -195,10 +261,20 @@ def get_all_users(query: GetAllUsersQuery):
# bring admins to the front
users = sorted(users, key=lambda x: "admin" in x.roles, reverse=True)
if query.simplified:
return [user.todict_simplified() for user in users]
# bring current user to index 0
if current_user:
users = sorted(
users,
key=lambda x: x.username == current_user["username"],
reverse=True,
)
return [user.todict() for user in users]
if query.simplified:
res["users"] = [user.todict_simplified() for user in users]
res["users"] = [user.todict() for user in users]
return res
@api.route("/user")