#!/usr/bin/python3
-import base64
-import kerberos
import logging
-import os
import tornado.web
-import tornado.websocket
from . import base
# Setup logging
-log = logging.getLogger("pakfire.buildservice.auth")
+log = logging.getLogger("pbs.web.auth")
-class KerberosAuthMixin(object):
- """
- A mixin that handles Kerberos authentication
- """
- @property
- def kerberos_realm(self):
- return "IPFIRE.ORG"
+class LoginHandler(base.KerberosAuthMixin, base.BaseHandler):
+ def get(self, username=None, failed=False):
+ if self.current_user:
+ raise tornado.web.HTTPError(403, "Already logged in")
- @property
- def kerberos_service(self):
- return self.settings.get("kerberos_service", "HTTP")
+ self.render("login.html", username=username, failed=failed)
- def authenticate_redirect(self):
- """
- Called when the application needs the user to authenticate.
+ @base.ratelimit(requests=10, minutes=5)
+ def post(self):
+ # Fetch credentials
+ username = self.get_argument("username")
+ password = self.get_argument("password")
- We will send a response with status code 401 and set the
- WWW-Authenticate header to ask the client to either initiate
- some Kerberos authentication, or to perform HTTP Basic authentication.
- """
- # Ask the client to authenticate using Kerberos
- self.add_header("WWW-Authenticate", "Negotiate")
-
- # Ask the client to authenticate using HTTP Basic Auth
- self.add_header("WWW-Authenticate", "Basic realm=\"%s\"" % self.kerberos_realm)
-
- # Set status to 401
- self.set_status(401)
-
- def get_authenticated_user(self):
- auth_header = self.request.headers.get("Authorization", None)
-
- # No authentication header
- if not auth_header:
- return
-
- # Perform GSS API Negotiation
- if auth_header.startswith("Negotiate"):
- return self._auth_negotiate(auth_header)
-
- # Perform Basic Authentication
- elif auth_header.startswith("Basic "):
- return self._auth_basic(auth_header)
-
- # Fail on anything else
- else:
- raise tornado.web.HTTPError(400, "Unexpected Authentication attempt: %s" % auth_header)
-
- def _auth_negotiate(self, auth_header):
- os.environ["KRB5_KTNAME"] = self.backend.settings.get("krb5-keytab")
-
- auth_value = auth_header.removeprefix("Negotiate ")
-
- try:
- # Initialise the server session
- result, context = kerberos.authGSSServerInit(self.kerberos_service)
-
- if not result == kerberos.AUTH_GSS_COMPLETE:
- raise tornado.web.HTTPError(500, "Kerberos Initialization failed: %s" % result)
-
- # Check the received authentication header
- result = kerberos.authGSSServerStep(context, auth_value)
-
- # If this was not successful, we will fall back to Basic authentication
- if not result == kerberos.AUTH_GSS_COMPLETE:
- return self._auth_basic(auth_header)
-
- if not isinstance(self, tornado.websocket.WebSocketHandler):
- # Fetch the server response
- response = kerberos.authGSSServerResponse(context)
-
- # Send the server response
- self.set_header("WWW-Authenticate", "Negotiate %s" % response)
-
- # Return the user who just authenticated
- user = kerberos.authGSSServerUserName(context)
-
- except kerberos.GSSError as e:
- log.error("Kerberos Authentication Error: %s" % e)
-
- raise tornado.web.HTTPError(500, "Could not initialize the Kerberos context")
-
- finally:
- # Cleanup
- kerberos.authGSSServerClean(context)
-
- log.debug("Successfully authenticated %s" % user)
-
- return user
-
- def _auth_basic(self, auth_header):
- os.environ["KRB5_KTNAME"] = self.backend.settings.get("krb5-keytab")
-
- # Remove "Basic "
- auth_header = auth_header.removeprefix("Basic ")
-
- try:
- # Decode base64
- auth_header = base64.b64decode(auth_header).decode()
-
- username, password = auth_header.split(":", 1)
- except:
- raise tornado.web.HTTPError(400, "Authorization data was malformed")
-
- # Check the credentials against the Kerberos database
- try:
- kerberos.checkPassword(username, password,
- "%s/pakfire.ipfire.org" % self.kerberos_service, self.kerberos_realm)
-
- # Catch any authentication errors
- except kerberos.BasicAuthError as e:
- log.error("Could not authenticate %s: %s" % (username, e))
- return
-
- # Create user principal name
- user = "%s@%s" % (username, self.kerberos_realm)
-
- log.debug("Successfully authenticated %s" % user)
-
- return user
-
-
-class LoginHandler(KerberosAuthMixin, base.BaseHandler):
- def get(self):
- username = self.get_authenticated_user()
- if not username:
- # Ask to authenticate
- self.authenticate_redirect()
- return
-
- # Strip the realm
- username, delim, realm = username.partition("@")
+ # Try to authenticate the user
+ if not self._auth_with_credentials(username, password):
+ return self.get(username=username, failed=True)
+ # If the authentication was successful, we create a new session
with self.db.transaction():
- # Otherwise fetch the authenticated user
+ # Fetch the authenticated user
user = self.backend.users.get_by_name(username)
if not user:
raise tornado.web.HTTPError(500, "Could not find user %s" % username)