import fastapi
import fastapi.security
+import logging
# Import the backend
from .. import Backend
# Import middlewares
from . import middlewares
+# Setup logging
+log = logging.getLogger(__name__)
+
# Initialize the app
app = fastapi.FastAPI(
title = "IPFire DBL API",
api_key_header = fastapi.security.APIKeyHeader(name="X-API-Key")
-def require_api_key(api_key: str = fastapi.Depends(api_key_header)):
+async def require_api_key(request: fastapi.Request, api_key: str = fastapi.Depends(api_key_header)):
"""
Requires that a client provides a valid API key
"""
- if not backend.auth(api_key):
+ # Try to authenticate the user
+ user = await backend.auth(api_key)
+
+ # Fail if we could not authenticate the user
+ if user is None:
raise fastapi.HTTPException(401, "Invalid API key")
+ # Do we impersonate another user?
+ impersonated_uid = request.headers.get("X-Impersonated-Uid")
+ if impersonated_uid:
+ # Log action
+ log.debug("%s wants to impersonate '%s'" % (user.uid, impersonated_uid))
+
+ # Check if we have permissions to perform this action
+ if not user.can_impersonate:
+ raise fastapi.HTTPException(403, "Impersonation denied")
+
+ # Fetch the impersonated user
+ impersonated_user = backend.users.get_by_uid(impersonated_uid)
+
+ # Fail if the impersonated user does not exist
+ if impersonated_user is None:
+ raise fastapi.HTTPException(
+ 401, "Failed to find impersonated user '%s'" % impersonated_uid,
+ )
+
+ # Return the impersonated user
+ return impersonated_user
+
+ return user
+
# Import any endpoints
from . import domains
from . import lists
# If we have found a key, we have successfully authenticated
if key:
log.debug("Successfully authenticated %s" % key)
- return True
- return False
+ return key
async def get_key(self, key):
"""
# Check if we have both parts
if not prefix or not secret:
- log.error("The provided API key does not contain a secret")
+ log.warning("The provided API key does not contain a secret")
return
# Fetch all keys by prefix
if key.check(secret):
return key
+ # Log if no API keys could be found
+ else:
+ log.debug("Could not find any API keys that match the prefix %s" % prefix)
+
def get_keys(self, prefix):
"""
Fetches all keys with the matching prefix