]> git.ipfire.org Git - pbs.git/commitdiff
web: Use Kerberos authentication for users
authorMichael Tremer <michael.tremer@ipfire.org>
Wed, 5 Oct 2022 10:27:32 +0000 (10:27 +0000)
committerMichael Tremer <michael.tremer@ipfire.org>
Wed, 5 Oct 2022 10:27:32 +0000 (10:27 +0000)
This introduces that users can use Kerberos to authenticate against the
web server, or fall back to HTTP Basic authentication which will in turn
contact the Kerberos servers.

After the initial authentication, a session cookie will be sent to the
browser as usual.

Signed-off-by: Michael Tremer <michael.tremer@ipfire.org>
src/buildservice/users.py
src/web/auth.py

index 229c05e299f5da8e46d178ffe18a53223cf16ad8..8832d51a6322ea9a852626a47131cc0dd114ac12 100644 (file)
@@ -92,32 +92,6 @@ class Users(base.Object):
 
                return user
 
-       def auth(self, username, password):
-               log.debug("Trying to authenticate %s" % username)
-
-               # Check credentials against LDAP
-               success = self.ldap.auth(username, password)
-               if not success:
-                       log.error("Could not authenticate %s" % username)
-                       return
-
-               # Search for a user object
-               user = self.get_by_name(username) or self.get_by_email(username)
-
-               # If we found a user which has been deleted,
-               # we let the authentication fail
-               if user and user.deleted:
-                       log.error("Cannot authenticate deleted user %s" % username)
-                       return
-
-               # If not user exists, yet, we can import it from LDAP
-               if not user:
-                       user = self.create_from_ldap(username)
-
-               log.info("Successfully authenticated %s" % user)
-
-               return user
-
        def email_in_use(self, email):
                return self._get_user_email("SELECT * FROM users_emails \
                        WHERE email = %s AND activated IS TRUE", email)
@@ -137,6 +111,21 @@ class Users(base.Object):
                return self._get_user("SELECT * FROM users \
                        WHERE password_recovery_code = %s AND password_recovery_code_expires_at > NOW()", code)
 
+       def find(self, username):
+               # Search for a user object
+               user = self.get_by_name(username)
+
+               # If not user exists, yet, we can import it from LDAP
+               if not user:
+                       user = self.create_from_ldap(username)
+
+               # If we found a user which has been deleted, we won't return it
+               if user and user.deleted:
+                       log.debug("User %s has been deleted" % username)
+                       return
+
+               return user
+
        def find_maintainers(self, maintainers):
                email_addresses = []
 
index 4b6245bf03cc9fca57961331b1820016f04ae20c..a3166ac6ebd9552c28ab9ed9be32fb69e43e2c25 100644 (file)
-#!/usr/bin/python
+#!/usr/bin/python3
 
+import base64
+import kerberos
+import logging
+import os
 import tornado.web
 
 from . import base
 
-class LoginHandler(base.BaseHandler):
-       def get(self):
-               # If the user is already logged in, we just send him back
-               # to the start page.
-               if self.current_user:
-                       return self.redirect("/")
+# Setup logging
+log = logging.getLogger("pakfire.buildservice.auth")
+
+class KerberosAuthMixin(object):
+       """
+               A mixin that handles Kerberos authentication
+       """
+       @property
+       def kerberos_realm(self):
+               return "IPFIRE.ORG"
+
+       @property
+       def kerberos_service(self):
+               return self.settings.get("kerberos_service", "HTTP")
+
+       def authenticate_redirect(self):
+               """
+                       Called when the application needs the user to authenticate.
+
+                       We will send a response with status code 401 and set the
+                       WWW-Authenticate header to ask the client to either initiate
+                       some Kerberos authentication, or to perform HTTP Basic authentication.
+               """
+               # Ask the client to authenticate using Kerberos
+               self.add_header("WWW-Authenticate", "Negotiate")
+
+               # Ask the client to authenticate using HTTP Basic Auth
+               self.add_header("WWW-Authenticate", "Basic realm=\"%s\"" % self.kerberos_realm)
+
+               # Set status to 401
+               self.set_status(401)
+
+       def get_authenticated_user(self):
+               auth_header = self.request.headers.get("Authorization", None)
+
+               # No authentication header
+               if not auth_header:
+                       return
+
+               # Perform GSS API Negotiation
+               if auth_header.startswith("Negotiate"):
+                       return self._auth_negotiate(auth_header)
+
+               # Perform Basic Authentication
+               elif auth_header.startswith("Basic "):
+                       return self._auth_basic(auth_header)
+
+               # Fail on anything else
+               else:
+                       raise tornado.web.HTTPError(400, "Unexpected Authentication attempt: %s" % auth_header)
+
+       def _auth_negotiate(self, auth_header):
+               os.environ["KRB5_KTNAME"] = self.backend.settings.get("krb5-keytab")
+
+               auth_value = auth_header.removeprefix("Negotiate ")
+
+               try:
+                       # Initialise the server session
+                       result, context = kerberos.authGSSServerInit(self.kerberos_service)
+
+                       if not result == kerberos.AUTH_GSS_COMPLETE:
+                               raise tornado.web.HTTPError(500, "Kerberos Initialization failed: %s" % result)
+
+                       # Check the received authentication header
+                       result = kerberos.authGSSServerStep(context, auth_value)
+
+                       # If this was not successful, we will fall back to Basic authentication
+                       if not result == kerberos.AUTH_GSS_COMPLETE:
+                               return self._auth_basic(auth_header)
+
+                       # Fetch the server response
+                       response = kerberos.authGSSServerResponse(context)
 
-               self.render("login.html", failed=False)
+                       # Send the server response
+                       self.set_header("WWW-Authenticate", "Negotiate %s" % response)
+
+                       # Return the user who just authenticated
+                       user = kerberos.authGSSServerUserName(context)
+
+               except kerberos.GSSError as e:
+                       log.error("Kerberos Authentication Error: %s" % e)
+
+                       raise tornado.web.HTTPError(500, "Could not initialize the Kerberos context")
+
+               finally:
+                       # Cleanup
+                       kerberos.authGSSServerClean(context)
+
+               log.debug("Successfully authenticated %s" % user)
+
+               return user
+
+       def _auth_basic(self, auth_header):
+               os.environ["KRB5_KTNAME"] = self.backend.settings.get("krb5-keytab")
+
+               # Remove "Basic "
+               auth_header = auth_header.removeprefix("Basic ")
+
+               try:
+                       # Decode base64
+                       auth_header = base64.b64decode(auth_header).decode()
+
+                       username, password = auth_header.split(":", 1)
+               except:
+                       raise tornado.web.HTTPError(400, "Authorization data was malformed")
+
+               # Check the credentials against the Kerberos database
+               try:
+                       kerberos.checkPassword(username, password,
+                               "%s/pakfire.ipfire.org" % self.kerberos_service, self.kerberos_realm)
+
+               # Catch any authentication errors
+               except kerberos.BasicAuthError as e:
+                       log.error("Could not authenticate %s: %s" % (username, e))
+                       return
+
+               # Create user principal name
+               user = "%s@%s" % (username, self.kerberos_realm)
+
+               log.debug("Successfully authenticated %s" % user)
+
+               return user
+
+
+class LoginHandler(KerberosAuthMixin, base.BaseHandler):
+       def get(self):
+               username = self.get_authenticated_user()
+               if not username:
+                       # Ask to authenticate
+                       self.authenticate_redirect()
+                       return
 
-       def post(self):
-               # Fetch credentials
-               username = self.get_argument("username")
-               password = self.get_argument("password")
+               # Strip the realm
+               username, delim, realm = username.partition("@")
 
-               # Log in the user
                with self.db.transaction():
-                       user = self.backend.users.auth(username, password)
+                       # Otherwise fetch the authenticated user
+                       user = self.backend.users.find(username)
                        if not user:
-                               raise tornado.web.HTTPError(403, "Login failed")
+                               raise tornado.web.HTTPError(500, "Could not find user %s" % username)
 
-                       # Create a new session for the user
-                       self.session = self.backend.sessions.create(user,
+                       # Create a new session
+                       session = self.backend.sessions.create(user,
                                self.current_address, user_agent=self.user_agent)
 
-               # Set a cookie and update the current user.
-               self.set_cookie("session_id", self.session.session_id,
-                       expires=self.session.valid_until)
+               # Send the session cookie to the browser
+               self.set_cookie("session_id", session.session_id, expires=session.valid_until)
 
                # If there is "next" given, we redirect the user accordingly
                next = self.get_argument("next", None)