support jwt on headers

This commit is contained in:
cwilvx
2024-05-30 22:52:42 +03:00
parent c8325101d5
commit 8fa64b3a4d
6 changed files with 76 additions and 20 deletions
+4
View File
@@ -7,4 +7,8 @@
- The context menu now doesn't take forever to open up
- Merged "Save as Playlist" with "Add to Playlist" > "New Playlist"
## Bug fixes
- Add to queue adding to last index -1
-
## Development
+3
View File
@@ -6,6 +6,9 @@
2. Favorites
- Package jsoni and publish on PyPi
- Rewrite stores to use dictionaries instead of list pools
- last updated date on tracks added via watchdog is broken
- hide "remove from playlist" option on system playlists
- Support auth headers
# DONE
- Add recently played playlist
+1 -1
View File
@@ -65,7 +65,7 @@ def create_api():
app = OpenAPI(__name__, info=api_info, doc_prefix="/docs")
# JWT CONFIGS
app.config["JWT_SECRET_KEY"] = UserConfig().userId
app.config["JWT_TOKEN_LOCATION"] = ["cookies"]
app.config["JWT_TOKEN_LOCATION"] = ["cookies", "headers"]
app.config["JWT_COOKIE_CSRF_PROTECT"] = False
app.config["JWT_SESSION_COOKIE"] = False
+42 -6
View File
@@ -4,7 +4,9 @@ import sqlite3
from flask import current_app, jsonify
from flask_jwt_extended import (
create_access_token,
create_refresh_token,
current_user,
get_jwt_identity,
jwt_required,
set_access_cookies,
)
@@ -37,6 +39,21 @@ def admin_required():
return wrapper
def create_new_token(user: dict):
"""
Create a new token response
"""
access_token = create_access_token(identity=user)
max_age: int = current_app.config.get("JWT_ACCESS_TOKEN_EXPIRES")
return {
"msg": f"Logged in as {user['username']}",
"acccesstoken": access_token,
"refreshtoken": create_refresh_token(identity=user),
"maxage": max_age,
}
class LoginBody(BaseModel):
username: str = Field(description="The username", example="user0")
password: str = Field(description="The password", example="password0")
@@ -47,7 +64,6 @@ def login(body: LoginBody):
"""
Authenticate using username and password
"""
res = jsonify({"msg": f"Logged in as {body.username}"})
user = authdb.get_user_by_username(body.username)
@@ -59,14 +75,28 @@ def login(body: LoginBody):
if not password_ok:
return {"msg": "Hehe! invalid password"}, 401
access_token = create_access_token(identity=user.todict())
max_age: int = current_app.config.get("JWT_ACCESS_TOKEN_EXPIRES")
set_access_cookies(res, access_token, max_age=max_age)
res = create_new_token(user.todict())
token = res["acccesstoken"]
age = res["maxage"]
res = jsonify(res)
set_access_cookies(res, token, max_age=age)
return res
@api.post("/refresh")
@jwt_required(refresh=True)
def refresh():
"""
Refresh an access token by sending a refresh token in the Authorization header
>>> Headers:
>>> Authorization: Bearer <refresh_token>
"""
user = get_jwt_identity()
return create_new_token(user)
class UpdateProfileBody(BaseModel):
id: int = Field(0, description="The user id")
email: str = Field("", description="The email")
@@ -77,6 +107,9 @@ class UpdateProfileBody(BaseModel):
@api.put("/profile/update")
def update_profile(body: UpdateProfileBody):
"""
Update user profile
"""
user = {
"id": body.id,
"email": body.email,
@@ -129,6 +162,9 @@ def update_profile(body: UpdateProfileBody):
@api.post("/profile/create")
@admin_required()
def create_user(body: UpdateProfileBody):
"""
Create a new user
"""
if not body.username or not body.password:
return {"msg": "Username and password are required"}, 400
@@ -199,7 +235,7 @@ def delete_user(body: DeleteUseBody):
@api.get("/logout")
def logout():
"""
Log out
Log out and clear the access token cookie
"""
res = jsonify({"msg": "Logged out"})
res.delete_cookie("access_token_cookie")
-1
View File
@@ -156,7 +156,6 @@ class Populate:
tagged_count = 0
favs = favdb.get_fav_tracks()
records = dict()
for fav in favs:
+26 -12
View File
@@ -47,6 +47,7 @@ mimetypes.add_type("application/manifest+json", ".webmanifest")
werkzeug = logging.getLogger("werkzeug")
werkzeug.setLevel(logging.ERROR)
# Background tasks
@background
def bg_run_setup():
@@ -83,25 +84,32 @@ app = create_api()
app.static_folder = get_home_res_path("client")
# INFO: Routes that don't need authentication
whitelisted_routes = {"/auth/login", "/auth/users", "/auth/logout", "/docs"}
whitelisted_routes = {"/auth/login", "/auth/users", "/auth/logout", "/auth/refresh", "/docs"}
blacklist_extensions = {".webp"}.union(getClientFilesExtensions())
def skipAuthAction():
"""
Skips the JWT verification for the current request.
"""
if request.path == "/" or any(
request.path.endswith(ext) for ext in blacklist_extensions
):
return True
# if request path starts with any of the blacklisted routes, don't verify jwt
if any(request.path.startswith(route) for route in whitelisted_routes):
return True
return False
@app.before_request
def verify_auth():
"""
Verifies the JWT token before each request.
"""
if request.path == "/" or any(
request.path.endswith(ext) for ext in blacklist_extensions
):
return
# if request path starts with any of the blacklisted routes, don't verify jwt
if any(request.path.startswith(route) for route in whitelisted_routes):
# print(
# "Found whitelisted route: ", request.path, "... Skipping jwt verification"
# )
if skipAuthAction():
return
verify_jwt_in_request()
@@ -110,8 +118,14 @@ def verify_auth():
@app.after_request
def refresh_expiring_jwt(response: Response):
"""
Refreshes the JWT token after each request.
Refreshes the cookies JWT token after each request.
"""
# INFO: If the request has an Authorization header, don't refresh the jwt
# Request is probably from the mobile client or a third party
if skipAuthAction() or request.headers.get("Authorization"):
return response
try:
exp_timestamp = get_jwt()["exp"]
now = datetime.now(timezone.utc)