return jwt.encode(payload, TOKEN_SECRET, algorithm=TOKEN_ALGO)
+def get_principal(token):
+ """
+ Returns the authenticated principal from the given token
+ """
+ try:
+ payload = jwt.decode(token, TOKEN_SECRET, algorithms=[TOKEN_ALGO])
+
+ # Fail if we could not decode the token
+ except jwt.InvalidTokenError as e:
+ print(e)
+ raise fastapi.HTTPException(status_code=401, detail="Invalid refresh token")
+
+ # Extract the principal
+ principal = payload.get("sub")
+ if not principal:
+ raise fastapi.HTTPException(status_code=401, detail="Invalid refresh token")
+
+ # XXX Should this hit the database to check the principal exists?
+
+ return principal
@router.post("/user")
async def auth_user(credentials: fastapi.security.OAuth2PasswordRequestForm =
@router.post("/refresh")
async def auth_refresh(data: RefreshRequest):
- try:
- payload = jwt.decode(data.refresh_token, TOKEN_SECRET, algorithms=[TOKEN_ALGO])
-
- # Fail if we could not decode the token
- except jwt.InvalidTokenError as e:
- print(e)
- raise fastapi.HTTPException(status_code=401, detail="Invalid refresh token")
-
- # Extract the principal
- principal = payload.get("sub")
- if not principal:
- raise fastapi.HTTPException(status_code=401, detail="Invalid refresh token")
+ # Fetch the principal from the given token
+ principal = get_principal(data.refresh_token)
# XXX Check if the principal actually still exists