From: Michael Tremer Date: Wed, 2 Jul 2025 17:23:41 +0000 (+0000) Subject: api: Create a couple more helper functions to fetch the right objects X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=95713ab7a28ca838c2ae75ebe1f81ed69d9961d2;p=pbs.git api: Create a couple more helper functions to fetch the right objects Signed-off-by: Michael Tremer --- diff --git a/src/api/auth.py b/src/api/auth.py index bb7c4651..5809ce0c 100644 --- a/src/api/auth.py +++ b/src/api/auth.py @@ -31,7 +31,8 @@ import typing from . import apiv1 from . import backend -from .. import users +from ..builders import Builder +from ..users import User # Fetch Kerberos configuration KERBEROS_KEYTAB = backend.config.get("krb5", "keytab") @@ -66,8 +67,8 @@ class HTTPBearer(fastapi.security.HTTPBearer): security = HTTPBearer() async def get_current_principal( - credentials: fastapi.security.HTTPAuthorizationCredentials = fastapi.Depends(security) -) -> users.User: + credentials: fastapi.security.HTTPAuthorizationCredentials = fastapi.Security(security) +) -> Builder | User: """ This is the main function to check whether a client is authenticated. It will fetch the Bearer token from the request header and return the @@ -87,17 +88,37 @@ async def get_current_principal( if not realm == KERBEROS_REALM: raise fastapi.HTTPException(401) - # Fetch the builder or user object + # Check if we are authenticating a machine and not a user if principal.startswith("host/"): - result = await backend.builders.get_by_name(principal[6:]) - else: - result = await backend.users.get_by_name(principal) + # Remove the host/ prefix + principal = principal.removeprefix("host/") - # Fail if nothing could be found - if not result: - raise fastapi.HTTPException(401) + # Fetch the builder + return await backend.builders.get_by_name(principal) + + # Fetch the user + return await backend.users.get_by_name(principal) + +async def get_current_user( + principal: Builder | User = fastapi.Depends(get_current_principal), +) -> User | None: + """ + Like get_current_principal() but is guaranteed to always return a User object or None + """ + if isinstance(principal, User): + return principal + +async def get_current_admin( + user: User = fastapi.Depends(get_current_user), +) -> User: + """ + Returns the current user only if they are an admin + """ + # Raise an exception for anonymous and non-admin users + if not user or not user.is_admin(): + raise fastapi.HTTPException(403, "Forbidden") - return result + return user class AuthResponse(pydantic.BaseModel): # Token Type @@ -227,8 +248,8 @@ async def auth_refresh(request: fastapi.Request, data: typing.Optional[RefreshRe return AuthResponse(access_token=access_token, refresh_token=refresh_token) @router.get("/whoami") -async def whoami(current_principal = fastapi.Depends(get_current_principal)) -> users.User: - return current_principal +async def whoami(current_user = fastapi.Depends(get_current_user)) -> User: + return current_user # Add everything to the app apiv1.include_router(router)