from . import apiv1
from . import backend
-from .. import users
+from ..builders import Builder
+from ..users import User
# Fetch Kerberos configuration
KERBEROS_KEYTAB = backend.config.get("krb5", "keytab")
security = HTTPBearer()
async def get_current_principal(
- credentials: fastapi.security.HTTPAuthorizationCredentials = fastapi.Depends(security)
-) -> users.User:
+ credentials: fastapi.security.HTTPAuthorizationCredentials = fastapi.Security(security)
+) -> Builder | User:
"""
This is the main function to check whether a client is authenticated.
It will fetch the Bearer token from the request header and return the
if not realm == KERBEROS_REALM:
raise fastapi.HTTPException(401)
- # Fetch the builder or user object
+ # Check if we are authenticating a machine and not a user
if principal.startswith("host/"):
- result = await backend.builders.get_by_name(principal[6:])
- else:
- result = await backend.users.get_by_name(principal)
+ # Remove the host/ prefix
+ principal = principal.removeprefix("host/")
- # Fail if nothing could be found
- if not result:
- raise fastapi.HTTPException(401)
+ # Fetch the builder
+ return await backend.builders.get_by_name(principal)
+
+ # Fetch the user
+ return await backend.users.get_by_name(principal)
+
+async def get_current_user(
+ principal: Builder | User = fastapi.Depends(get_current_principal),
+) -> User | None:
+ """
+ Like get_current_principal() but is guaranteed to always return a User object or None
+ """
+ if isinstance(principal, User):
+ return principal
+
+async def get_current_admin(
+ user: User = fastapi.Depends(get_current_user),
+) -> User:
+ """
+ Returns the current user only if they are an admin
+ """
+ # Raise an exception for anonymous and non-admin users
+ if not user or not user.is_admin():
+ raise fastapi.HTTPException(403, "Forbidden")
- return result
+ return user
class AuthResponse(pydantic.BaseModel):
# Token Type
return AuthResponse(access_token=access_token, refresh_token=refresh_token)
@router.get("/whoami")
-async def whoami(current_principal = fastapi.Depends(get_current_principal)) -> users.User:
- return current_principal
+async def whoami(current_user = fastapi.Depends(get_current_user)) -> User:
+ return current_user
# Add everything to the app
apiv1.include_router(router)