]> git.ipfire.org Git - pbs.git/commitdiff
api: Create a couple more helper functions to fetch the right objects
authorMichael Tremer <michael.tremer@ipfire.org>
Wed, 2 Jul 2025 17:23:41 +0000 (17:23 +0000)
committerMichael Tremer <michael.tremer@ipfire.org>
Wed, 2 Jul 2025 17:23:41 +0000 (17:23 +0000)
Signed-off-by: Michael Tremer <michael.tremer@ipfire.org>
src/api/auth.py

index bb7c46517c27e57e156406dd34f67fe2a22c8fb6..5809ce0c2615017bf6b80895e7da6eba6b7f8f03 100644 (file)
@@ -31,7 +31,8 @@ import typing
 from . import apiv1
 from . import backend
 
-from .. import users
+from ..builders import Builder
+from ..users import User
 
 # Fetch Kerberos configuration
 KERBEROS_KEYTAB    = backend.config.get("krb5", "keytab")
@@ -66,8 +67,8 @@ class HTTPBearer(fastapi.security.HTTPBearer):
 security = HTTPBearer()
 
 async def get_current_principal(
-       credentials: fastapi.security.HTTPAuthorizationCredentials = fastapi.Depends(security)
-) -> users.User:
+       credentials: fastapi.security.HTTPAuthorizationCredentials = fastapi.Security(security)
+) -> Builder | User:
        """
                This is the main function to check whether a client is authenticated.
                It will fetch the Bearer token from the request header and return the
@@ -87,17 +88,37 @@ async def get_current_principal(
        if not realm == KERBEROS_REALM:
                raise fastapi.HTTPException(401)
 
-       # Fetch the builder or user object
+       # Check if we are authenticating a machine and not a user
        if principal.startswith("host/"):
-               result = await backend.builders.get_by_name(principal[6:])
-       else:
-               result = await backend.users.get_by_name(principal)
+               # Remove the host/ prefix
+               principal = principal.removeprefix("host/")
 
-       # Fail if nothing could be found
-       if not result:
-               raise fastapi.HTTPException(401)
+               # Fetch the builder
+               return await backend.builders.get_by_name(principal)
+
+       # Fetch the user
+       return await backend.users.get_by_name(principal)
+
+async def get_current_user(
+       principal: Builder | User = fastapi.Depends(get_current_principal),
+) -> User | None:
+       """
+               Like get_current_principal() but is guaranteed to always return a User object or None
+       """
+       if isinstance(principal, User):
+               return principal
+
+async def get_current_admin(
+       user: User = fastapi.Depends(get_current_user),
+) -> User:
+       """
+               Returns the current user only if they are an admin
+       """
+       # Raise an exception for anonymous and non-admin users
+       if not user or not user.is_admin():
+               raise fastapi.HTTPException(403, "Forbidden")
 
-       return result
+       return user
 
 class AuthResponse(pydantic.BaseModel):
        # Token Type
@@ -227,8 +248,8 @@ async def auth_refresh(request: fastapi.Request, data: typing.Optional[RefreshRe
        return AuthResponse(access_token=access_token, refresh_token=refresh_token)
 
 @router.get("/whoami")
-async def whoami(current_principal = fastapi.Depends(get_current_principal)) -> users.User:
-       return current_principal
+async def whoami(current_user = fastapi.Depends(get_current_user)) -> User:
+       return current_user
 
 # Add everything to the app
 apiv1.include_router(router)