]> git.ipfire.org Git - pbs.git/blobdiff - src/web/auth.py
auth: Revert back to authentication using a web form
[pbs.git] / src / web / auth.py
index e6b775e9d5753ed95c7cc3e0e8cdb2fbada35870..622147ab0f33817941d33c9b192494ff7d2ba47a 100644 (file)
 #!/usr/bin/python3
 
-import base64
-import kerberos
 import logging
-import os
 import tornado.web
-import tornado.websocket
 
 from . import base
 
 # Setup logging
-log = logging.getLogger("pakfire.buildservice.auth")
+log = logging.getLogger("pbs.web.auth")
 
-class KerberosAuthMixin(object):
-       """
-               A mixin that handles Kerberos authentication
-       """
-       @property
-       def kerberos_realm(self):
-               return "IPFIRE.ORG"
+class LoginHandler(base.KerberosAuthMixin, base.BaseHandler):
+       def get(self, username=None, failed=False):
+               if self.current_user:
+                       raise tornado.web.HTTPError(403, "Already logged in")
 
-       @property
-       def kerberos_service(self):
-               return self.settings.get("kerberos_service", "HTTP")
+               self.render("login.html", username=username, failed=failed)
 
-       def authenticate_redirect(self):
-               """
-                       Called when the application needs the user to authenticate.
+       @base.ratelimit(requests=10, minutes=5)
+       def post(self):
+               # Fetch credentials
+               username = self.get_argument("username")
+               password = self.get_argument("password")
 
-                       We will send a response with status code 401 and set the
-                       WWW-Authenticate header to ask the client to either initiate
-                       some Kerberos authentication, or to perform HTTP Basic authentication.
-               """
-               # Ask the client to authenticate using Kerberos
-               self.add_header("WWW-Authenticate", "Negotiate")
-
-               # Ask the client to authenticate using HTTP Basic Auth
-               self.add_header("WWW-Authenticate", "Basic realm=\"%s\"" % self.kerberos_realm)
-
-               # Set status to 401
-               self.set_status(401)
-
-       def get_authenticated_user(self):
-               auth_header = self.request.headers.get("Authorization", None)
-
-               # No authentication header
-               if not auth_header:
-                       return
-
-               # Perform GSS API Negotiation
-               if auth_header.startswith("Negotiate"):
-                       return self._auth_negotiate(auth_header)
-
-               # Perform Basic Authentication
-               elif auth_header.startswith("Basic "):
-                       return self._auth_basic(auth_header)
-
-               # Fail on anything else
-               else:
-                       raise tornado.web.HTTPError(400, "Unexpected Authentication attempt: %s" % auth_header)
-
-       def _auth_negotiate(self, auth_header):
-               os.environ["KRB5_KTNAME"] = self.backend.settings.get("krb5-keytab")
-
-               auth_value = auth_header.removeprefix("Negotiate ")
-
-               try:
-                       # Initialise the server session
-                       result, context = kerberos.authGSSServerInit(self.kerberos_service)
-
-                       if not result == kerberos.AUTH_GSS_COMPLETE:
-                               raise tornado.web.HTTPError(500, "Kerberos Initialization failed: %s" % result)
-
-                       # Check the received authentication header
-                       result = kerberos.authGSSServerStep(context, auth_value)
-
-                       # If this was not successful, we will fall back to Basic authentication
-                       if not result == kerberos.AUTH_GSS_COMPLETE:
-                               return self._auth_basic(auth_header)
-
-                       if not isinstance(self, tornado.websocket.WebSocketHandler):
-                               # Fetch the server response
-                               response = kerberos.authGSSServerResponse(context)
-
-                               # Send the server response
-                               self.set_header("WWW-Authenticate", "Negotiate %s" % response)
-
-                       # Return the user who just authenticated
-                       user = kerberos.authGSSServerUserName(context)
-
-               except kerberos.GSSError as e:
-                       log.error("Kerberos Authentication Error: %s" % e)
-
-                       raise tornado.web.HTTPError(500, "Could not initialize the Kerberos context")
-
-               finally:
-                       # Cleanup
-                       kerberos.authGSSServerClean(context)
-
-               log.debug("Successfully authenticated %s" % user)
-
-               return user
-
-       def _auth_basic(self, auth_header):
-               os.environ["KRB5_KTNAME"] = self.backend.settings.get("krb5-keytab")
-
-               # Remove "Basic "
-               auth_header = auth_header.removeprefix("Basic ")
-
-               try:
-                       # Decode base64
-                       auth_header = base64.b64decode(auth_header).decode()
-
-                       username, password = auth_header.split(":", 1)
-               except:
-                       raise tornado.web.HTTPError(400, "Authorization data was malformed")
-
-               # Check the credentials against the Kerberos database
-               try:
-                       kerberos.checkPassword(username, password,
-                               "%s/pakfire.ipfire.org" % self.kerberos_service, self.kerberos_realm)
-
-               # Catch any authentication errors
-               except kerberos.BasicAuthError as e:
-                       log.error("Could not authenticate %s: %s" % (username, e))
-                       return
-
-               # Create user principal name
-               user = "%s@%s" % (username, self.kerberos_realm)
-
-               log.debug("Successfully authenticated %s" % user)
-
-               return user
-
-
-class LoginHandler(KerberosAuthMixin, base.BaseHandler):
-       def get(self):
-               username = self.get_authenticated_user()
-               if not username:
-                       # Ask to authenticate
-                       self.authenticate_redirect()
-                       return
-
-               # Strip the realm
-               username, delim, realm = username.partition("@")
+               # Try to authenticate the user
+               if not self._auth_with_credentials(username, password):
+                       return self.get(username=username, failed=True)
 
+               # If the authentication was successful, we create a new session
                with self.db.transaction():
-                       # Otherwise fetch the authenticated user
+                       # Fetch the authenticated user
                        user = self.backend.users.get_by_name(username)
                        if not user:
                                raise tornado.web.HTTPError(500, "Could not find user %s" % username)