return user
- def auth(self, username, password):
- log.debug("Trying to authenticate %s" % username)
-
- # Check credentials against LDAP
- success = self.ldap.auth(username, password)
- if not success:
- log.error("Could not authenticate %s" % username)
- return
-
- # Search for a user object
- user = self.get_by_name(username) or self.get_by_email(username)
-
- # If we found a user which has been deleted,
- # we let the authentication fail
- if user and user.deleted:
- log.error("Cannot authenticate deleted user %s" % username)
- return
-
- # If not user exists, yet, we can import it from LDAP
- if not user:
- user = self.create_from_ldap(username)
-
- log.info("Successfully authenticated %s" % user)
-
- return user
-
def email_in_use(self, email):
return self._get_user_email("SELECT * FROM users_emails \
WHERE email = %s AND activated IS TRUE", email)
return self._get_user("SELECT * FROM users \
WHERE password_recovery_code = %s AND password_recovery_code_expires_at > NOW()", code)
+ def find(self, username):
+ # Search for a user object
+ user = self.get_by_name(username)
+
+ # If not user exists, yet, we can import it from LDAP
+ if not user:
+ user = self.create_from_ldap(username)
+
+ # If we found a user which has been deleted, we won't return it
+ if user and user.deleted:
+ log.debug("User %s has been deleted" % username)
+ return
+
+ return user
+
def find_maintainers(self, maintainers):
email_addresses = []
-#!/usr/bin/python
+#!/usr/bin/python3
+import base64
+import kerberos
+import logging
+import os
import tornado.web
from . import base
-class LoginHandler(base.BaseHandler):
- def get(self):
- # If the user is already logged in, we just send him back
- # to the start page.
- if self.current_user:
- return self.redirect("/")
+# Setup logging
+log = logging.getLogger("pakfire.buildservice.auth")
+
+class KerberosAuthMixin(object):
+ """
+ A mixin that handles Kerberos authentication
+ """
+ @property
+ def kerberos_realm(self):
+ return "IPFIRE.ORG"
+
+ @property
+ def kerberos_service(self):
+ return self.settings.get("kerberos_service", "HTTP")
+
+ def authenticate_redirect(self):
+ """
+ Called when the application needs the user to authenticate.
+
+ We will send a response with status code 401 and set the
+ WWW-Authenticate header to ask the client to either initiate
+ some Kerberos authentication, or to perform HTTP Basic authentication.
+ """
+ # Ask the client to authenticate using Kerberos
+ self.add_header("WWW-Authenticate", "Negotiate")
+
+ # Ask the client to authenticate using HTTP Basic Auth
+ self.add_header("WWW-Authenticate", "Basic realm=\"%s\"" % self.kerberos_realm)
+
+ # Set status to 401
+ self.set_status(401)
+
+ def get_authenticated_user(self):
+ auth_header = self.request.headers.get("Authorization", None)
+
+ # No authentication header
+ if not auth_header:
+ return
+
+ # Perform GSS API Negotiation
+ if auth_header.startswith("Negotiate"):
+ return self._auth_negotiate(auth_header)
+
+ # Perform Basic Authentication
+ elif auth_header.startswith("Basic "):
+ return self._auth_basic(auth_header)
+
+ # Fail on anything else
+ else:
+ raise tornado.web.HTTPError(400, "Unexpected Authentication attempt: %s" % auth_header)
+
+ def _auth_negotiate(self, auth_header):
+ os.environ["KRB5_KTNAME"] = self.backend.settings.get("krb5-keytab")
+
+ auth_value = auth_header.removeprefix("Negotiate ")
+
+ try:
+ # Initialise the server session
+ result, context = kerberos.authGSSServerInit(self.kerberos_service)
+
+ if not result == kerberos.AUTH_GSS_COMPLETE:
+ raise tornado.web.HTTPError(500, "Kerberos Initialization failed: %s" % result)
+
+ # Check the received authentication header
+ result = kerberos.authGSSServerStep(context, auth_value)
+
+ # If this was not successful, we will fall back to Basic authentication
+ if not result == kerberos.AUTH_GSS_COMPLETE:
+ return self._auth_basic(auth_header)
+
+ # Fetch the server response
+ response = kerberos.authGSSServerResponse(context)
- self.render("login.html", failed=False)
+ # Send the server response
+ self.set_header("WWW-Authenticate", "Negotiate %s" % response)
+
+ # Return the user who just authenticated
+ user = kerberos.authGSSServerUserName(context)
+
+ except kerberos.GSSError as e:
+ log.error("Kerberos Authentication Error: %s" % e)
+
+ raise tornado.web.HTTPError(500, "Could not initialize the Kerberos context")
+
+ finally:
+ # Cleanup
+ kerberos.authGSSServerClean(context)
+
+ log.debug("Successfully authenticated %s" % user)
+
+ return user
+
+ def _auth_basic(self, auth_header):
+ os.environ["KRB5_KTNAME"] = self.backend.settings.get("krb5-keytab")
+
+ # Remove "Basic "
+ auth_header = auth_header.removeprefix("Basic ")
+
+ try:
+ # Decode base64
+ auth_header = base64.b64decode(auth_header).decode()
+
+ username, password = auth_header.split(":", 1)
+ except:
+ raise tornado.web.HTTPError(400, "Authorization data was malformed")
+
+ # Check the credentials against the Kerberos database
+ try:
+ kerberos.checkPassword(username, password,
+ "%s/pakfire.ipfire.org" % self.kerberos_service, self.kerberos_realm)
+
+ # Catch any authentication errors
+ except kerberos.BasicAuthError as e:
+ log.error("Could not authenticate %s: %s" % (username, e))
+ return
+
+ # Create user principal name
+ user = "%s@%s" % (username, self.kerberos_realm)
+
+ log.debug("Successfully authenticated %s" % user)
+
+ return user
+
+
+class LoginHandler(KerberosAuthMixin, base.BaseHandler):
+ def get(self):
+ username = self.get_authenticated_user()
+ if not username:
+ # Ask to authenticate
+ self.authenticate_redirect()
+ return
- def post(self):
- # Fetch credentials
- username = self.get_argument("username")
- password = self.get_argument("password")
+ # Strip the realm
+ username, delim, realm = username.partition("@")
- # Log in the user
with self.db.transaction():
- user = self.backend.users.auth(username, password)
+ # Otherwise fetch the authenticated user
+ user = self.backend.users.find(username)
if not user:
- raise tornado.web.HTTPError(403, "Login failed")
+ raise tornado.web.HTTPError(500, "Could not find user %s" % username)
- # Create a new session for the user
- self.session = self.backend.sessions.create(user,
+ # Create a new session
+ session = self.backend.sessions.create(user,
self.current_address, user_agent=self.user_agent)
- # Set a cookie and update the current user.
- self.set_cookie("session_id", self.session.session_id,
- expires=self.session.valid_until)
+ # Send the session cookie to the browser
+ self.set_cookie("session_id", session.session_id, expires=session.valid_until)
# If there is "next" given, we redirect the user accordingly
next = self.get_argument("next", None)