#!/usr/bin/python
# encoding: utf-8
+import base64
import datetime
+import hmac
import json
import ldap
import ldap.modlist
# Cleanup expired account activations
self.db.execute("DELETE FROM account_activations WHERE expires_at <= NOW()")
+ # Discourse
+
+ def decode_discourse_payload(self, payload, signature):
+ # Check signature
+ calculated_signature = self.sign_discourse_payload(payload)
+
+ if not hmac.compare_digest(signature, calculated_signature):
+ raise ValueError("Invalid signature: %s" % signature)
+
+ # Decode the query string
+ qs = base64.b64decode(payload).decode()
+
+ # Parse the query string
+ data = {}
+ for key, val in urllib.parse.parse_qsl(qs):
+ data[key] = val
+
+ return data
+
+ def encode_discourse_payload(self, **args):
+ # Encode the arguments into an URL-formatted string
+ qs = urllib.parse.urlencode(args).encode()
+
+ # Encode into base64
+ return base64.b64encode(qs).decode()
+
+ def sign_discourse_payload(self, payload, secret=None):
+ if secret is None:
+ secret = self.settings.get("discourse_sso_secret")
+
+ # Calculate a HMAC using SHA256
+ h = hmac.new(secret.encode(),
+ msg=payload.encode(), digestmod="sha256")
+
+ return h.hexdigest()
+
class Account(Object):
def __init__(self, backend, dn, attrs=None):
))
def is_admin(self):
- return "wheel" in self.groups
+ return "sudo" in self.groups
def is_staff(self):
return "staff" in self.groups
return ret
def avatar_url(self, size=None):
- if self.backend.debug:
- hostname = "http://people.dev.ipfire.org"
- else:
- hostname = "https://people.ipfire.org"
-
- url = "%s/users/%s.jpg" % (hostname, self.uid)
+ url = "https://people.ipfire.org/users/%s.jpg" % self.uid
if size:
url += "?size=%s" % size
(r"/users/(\w+)/ssh-keys/(SHA256\:.*)", people.SSHKeysDownloadHandler),
(r"/users/(\w+)/ssh-keys/upload", people.SSHKeysUploadHandler),
(r"/users/(\w+)/sip", people.SIPHandler),
+
+ # Single-Sign-On for Discourse
+ (r"/sso/discourse", people.SSODiscourse),
] + authentication_handlers)
# wiki.ipfire.org
import imghdr
import sshpubkeys
import tornado.web
+import urllib.parse
from .. import countries
self.redirect("/users/%s" % account.uid)
+class SSODiscourse(auth.CacheMixin, base.BaseHandler):
+ def _get_discourse_params(self):
+ # Fetch Discourse's parameters
+ sso = self.get_argument("sso")
+ sig = self.get_argument("sig")
+
+ # Decode payload
+ try:
+ return self.accounts.decode_discourse_payload(sso, sig)
+
+ # Raise bad request if the signature is invalid
+ except ValueError:
+ raise tornado.web.HTTPError(400)
+
+ def _redirect_user_to_discourse(self, account, nonce, return_sso_url):
+ """
+ Redirects the user back to Discourse passing some
+ attributes of the user account to Discourse
+ """
+ args = {
+ "nonce" : nonce,
+ "external_id" : account.uid,
+
+ # Pass email address
+ "email" : account.email,
+ "require_activation" : "false",
+
+ # More details about the user
+ "username" : account.uid,
+ "name" : "%s" % account,
+
+ # Avatar
+ "avatar_url" : account.avatar_url(),
+ "avatar_force_update" : "true",
+
+ # Send a welcome message
+ "suppress_welcome_message" : "false",
+
+ # Group memberships
+ "admin" : "true" if account.is_admin() else "false",
+ "moderator" : "true" if account.is_staff() else "false",
+ }
+
+ # Format payload and sign it
+ payload = self.accounts.encode_discourse_payload(**args)
+ signature = self.accounts.sign_discourse_payload(payload)
+
+ qs = urllib.parse.urlencode({
+ "sso" : payload,
+ "sig" : signature,
+ })
+
+ # Redirect user
+ self.redirect("%s?%s" % (return_sso_url, qs))
+
+ @base.ratelimit(minutes=24*60, requests=100)
+ def get(self):
+ params = self._get_discourse_params()
+
+ # Redirect back if user is already logged in
+ if self.current_user:
+ return self._redirect_user_to_discourse(self.current_user, **params)
+
+ # Otherwise the user needs to authenticate
+ # XXX
+ raise tornado.web.HTTPError(401)
+
+
class NewAccountsModule(ui_modules.UIModule):
def render(self, days=14):
t = datetime.datetime.utcnow() - datetime.timedelta(days=days)