-#!/usr/bin/python
+#!/usr/bin/python3
import datetime
import email.utils
+import ldap
import logging
import pytz
-import re
+import time
import tornado.locale
from . import base
-from . import ldap
from .decorators import *
# Setup logging
log = logging.getLogger("pakfire.builservice.users")
+# A list of LDAP attributes that we fetch
+LDAP_ATTRS = (
+ # UID
+ "uid",
+
+ # Common Name
+ "cn",
+
+ # First & Last Name
+ "givenName", "sn"
+
+ # Email Addresses
+ "mail",
+ "mailAlternateAddress",
+)
+
class Users(base.Object):
- def init(self):
- self.ldap = ldap.LDAP(self.backend)
+ #def init(self):
+ # self.ldap = ldap.LDAP(self.backend)
+
+ @lazy_property
+ def ldap(self):
+ ldap_uri = self.backend.config.get("ldap", "uri")
+
+ log.debug("Connecting to %s..." % ldap_uri)
+
+ # Establish LDAP connection
+ return ldap.initialize(ldap_uri)
def _get_user(self, query, *args):
res = self.db.get(query, *args)
for row in res:
yield User(self.backend, row.id, data=row)
- def _get_user_email(self, query, *args):
- res = self.db.get(query, *args)
-
- if res:
- return UserEmail(self.backend, res.id, data=res)
-
- def _get_user_emails(self, query, *args):
- res = self.db.query(query, *args)
-
- for row in res:
- yield UserEmail(self.backend, row.id, data=row)
-
def __iter__(self):
- users = self._get_users("SELECT * FROM users \
- WHERE activated IS TRUE AND deleted IS FALSE ORDER BY name")
+ users = self._get_users("""
+ SELECT
+ *
+ FROM
+ users
+ WHERE
+ deleted IS FALSE
+ ORDER BY
+ name
+ """,
+ )
return iter(users)
def __len__(self):
- res = self.db.get("SELECT COUNT(*) AS count FROM users \
- WHERE activated IS TRUE AND deleted IS FALSE")
+ res = self.db.get("""
+ SELECT
+ COUNT(*) AS count
+ FROM
+ users
+ WHERE
+ deleted IS FALSE
+ """,
+ )
return res.count
- def create(self, name, email, realname=None, notify=True):
- # XXX check if username has the correct name
+ def _ldap_query(self, query, attrlist=None, limit=0, search_base=None):
+ search_base = self.backend.config.get("ldap", "base")
- # Check if name is already taken
- user = self.get_by_name(name)
- if user:
- raise ValueError("Username %s already taken" % name)
+ log.debug("Performing LDAP query (%s): %s" % (search_base, query))
- # Create new user
- user = self._get_user("INSERT INTO users(name, realname) \
- VALUES(%s, %s) RETURNING *", name, realname)
+ t = time.time()
- log.debug("Created user %s" % user.name)
+ # Ask for up to 512 results being returned at a time
+ page_control = ldap.controls.SimplePagedResultsControl(True, size=512, cookie="")
- # Add email address
- user.add_email(email, activated=True)
+ results = []
+ pages = 0
- # Send a welcome email
- if notify:
- user._send_welcome_email()
+ # Perform the search
+ while True:
+ response = self.ldap.search_ext(search_base,
+ ldap.SCOPE_SUBTREE, query, attrlist=attrlist, sizelimit=limit,
+ serverctrls=[page_control],
+ )
- return user
+ # Fetch all results
+ type, data, rmsgid, serverctrls = self.ldap.result3(response)
- def create_from_ldap(self, name):
- log.debug("Creating user %s from LDAP" % name)
+ # Append to local copy
+ results += data
+ pages += 1
- # Get required attributes from LDAP
- dn, attr = self.ldap.get_user(name, attrlist=["uid", "cn", "mail"])
- assert dn
+ controls = [c for c in serverctrls
+ if c.controlType == ldap.controls.SimplePagedResultsControl.controlType]
- # Create regular user
- user = self.create(name, realname=attr["cn"][0])
+ if not controls:
+ break
- # Add all email addresses and activate them
- for email in attr["mail"]:
- user.add_email(email, activated=True)
+ # Set the cookie for more results
+ page_control.cookie = controls[0].cookie
- return user
+ # There are no more results
+ if not page_control.cookie:
+ break
- def email_in_use(self, email):
- return self._get_user_email("SELECT * FROM users_emails \
- WHERE email = %s AND activated IS TRUE", email)
+ # Log time it took to perform the query
+ log.debug("Query took %.2fms (%s page(s))" % ((time.time() - t) * 1000.0, pages))
- def get_by_id(self, id):
- return self._get_user("SELECT * FROM users WHERE id = %s", id)
+ # Return all attributes (without the DN)
+ return [attrs for dn, attrs in results]
- def get_by_name(self, name):
- return self._get_user("SELECT * FROM users WHERE name = %s", name)
+ def _ldap_get(self, *args, **kwargs):
+ results = self._ldap_query(*args, **kwargs)
- def get_by_email(self, email):
- return self._get_user("SELECT users.* FROM users \
- LEFT JOIN users_emails ON users.id = users_emails.user_id \
- WHERE users_emails.email = %s", email)
+ # No result
+ if not results:
+ return {}
- def find(self, username):
- # Search for a user object
- user = self.get_by_name(username)
+ # Too many results?
+ elif len(results) > 1:
+ raise RuntimeException("Too many results returned for ldap_get()")
- # If not user exists, yet, we can import it from LDAP
- if not user:
- user = self.create_from_ldap(username)
+ return results[0]
- # If we found a user which has been deleted, we won't return it
- if user and user.deleted:
- log.debug("User %s has been deleted" % username)
- return
+ def create(self, name, notify=False, _attrs=None):
+ """
+ Creates a new user
+ """
+ user = self._get_user("""
+ INSERT INTO
+ users(
+ name,
+ _attrs
+ )
+ VALUES
+ (%s, %s)
+ RETURNING
+ *
+ """, name, _attrs,
+ )
- return user
+ log.debug("Created user %s" % user)
- def find_maintainers(self, maintainers):
- email_addresses = []
+ # Send a welcome email
+ if notify:
+ user._send_welcome_email()
- # Make a unique list of all email addresses
- for maintainer in maintainers:
- name, email_address = email.utils.parseaddr(maintainer)
+ return user
- if not email_address in email_addresses:
- email_addresses.append(email_address)
+ def get_by_id(self, id):
+ return self._get_user("SELECT * FROM users \
+ WHERE id = %s", id)
- users = self._get_users("SELECT DISTINCT users.* FROM users \
- LEFT JOIN users_emails ON users.id = users_emails.user_id \
- WHERE users_emails.activated IS TRUE \
- AND users_emails.email = ANY(%s)", email_addresses)
+ def get_by_name(self, name):
+ """
+ Fetch a user by its username
+ """
+ # Try to find a local user
+ user = self._get_user("""
+ SELECT
+ *
+ FROM
+ users
+ WHERE
+ deleted IS FALSE
+ AND
+ name = %s
+ """, name,
+ )
+ if user:
+ return user
+
+ # Search in LDAP
+ res = self._ldap_get(
+ "(&"
+ "(objectClass=person)"
+ "(uid=%s)"
+ ")" % name,
+ attrlist=("uid",),
+ )
+ if not res:
+ return
- return sorted(users)
+ # Fetch the UID
+ uid = res.get("uid")[0].decode()
- def find_maintainer(self, s):
- name, email_address = email.utils.parseaddr(s)
+ # Create a new user
+ return self.create(uid)
- # Got invalid input
- if not email_address:
- return
+ def get_by_email(self, email):
+ # Search in LDAP
+ res = self._ldap_get(
+ "(&"
+ "(objectClass=person)"
+ "(|"
+ "(mail=%s)"
+ "(mailAlternateAddress=%s)"
+ ")"
+ ")" % (email, email),
+ attrlist=("uid",),
+ )
- return self.get_by_email(email_address)
+ # No results
+ if not res:
+ return
- def search(self, pattern, limit=None):
- pattern = "%%%s%%" % pattern
+ # Fetch the UID
+ uid = res.get("uid")[0].decode()
+
+ return self.get_by_name(uid)
+
+ def search(self, q, limit=None):
+ res = self._ldap_query(
+ "(&"
+ "(objectClass=person)"
+ "(|"
+ "(uid=%s)"
+ "(cn=*%s*)"
+ "(mail=%s)"
+ "(mailAlternateAddress=%s)"
+ ")"
+ ")" % (q, q, q, q),
+ attrlist=("uid",),
+ limit=limit,
+ )
- users = self._get_users("SELECT * FROM users \
- WHERE (name LIKE %s OR realname LIKE %s) \
- AND activated IS TRUE AND deleted IS FALSE \
- ORDER BY name LIMIT %s", pattern, pattern, limit)
+ # Fetch users
+ users = self._get_users("""
+ SELECT
+ *
+ FROM
+ users
+ WHERE
+ deleted IS FALSE
+ AND
+ name = ANY(%s)
+ """, [row.get("uid")[0].decode() for row in res],
+ )
- return list(users)
+ return sorted(users)
class User(base.DataObject):
return NotImplemented
+ @property
+ def name(self):
+ return self.data.name
+
def delete(self):
self._set_attribute("deleted", True)
for session in self.sessions:
session.destroy()
- def get_realname(self):
- return self.data.realname or self.name
+ # Fetch any attributes from LDAP
- def set_realname(self, realname):
- self._set_attribute("realname", realname)
+ @lazy_property
+ def attrs(self):
+ return self.backend.users._ldap_get("(uid=%s)" % self.name, attrlist=LDAP_ATTRS)
- realname = property(get_realname, set_realname)
+ def _get_attrs(self, key):
+ return [v.decode() for v in self.attrs.get(key, [])]
- @property
- def name(self):
- return self.data.name
+ def _get_attr(self, key):
+ for value in self._get_attrs(key):
+ return value
- @property
- def firstname(self):
- # Try to split the string into first and last name.
- # If that is not successful, return the entire realname.
- try:
- firstname, rest = self.realname.split(" ", 1)
- except:
- return self.realname
-
- return firstname
+ # Realname
@property
- def envelope_from(self):
- return "%s <%s>" % (self.realname, self.email)
-
- @lazy_property
- def emails(self):
- res = self.backend.users._get_user_emails("SELECT * FROM users_emails \
- WHERE user_id = %s AND activated IS TRUE ORDER BY email", self.id)
-
- return list(res)
+ def realname(self):
+ return self._get_attr("cn") or ""
@property
def email(self):
- for email in self.emails:
- if email.primary:
- return email
-
- def get_email(self, email):
- for e in self.emails:
- if e == email:
- return e
-
- def set_primary_email(self, email):
- if not email in self.emails:
- raise ValueError("Email address does not belong to user")
-
- # Mark previous primary email as non-primary
- self.db.execute("UPDATE users_emails SET \"primary\" = FALSE \
- WHERE user_id = %s AND \"primary\" IS TRUE" % self.id)
-
- # Mark new primary email
- self.db.execute("UPDATE users_emails SET \"primary\" = TRUE \
- WHERE user_id = %s AND email = %s AND activated IS TRUE",
- self.id, email)
-
- def has_email_address(self, email_address):
- try:
- mail, email_address = email.utils.parseaddr(email_address)
- except:
- pass
-
- return email_address in self.emails
-
- def activate_email(self, code):
- # Search email by activation code
- email = self.backend.users._get_user_email("SELECT * FROM users_emails \
- WHERE user_id = %s AND activated IS FALSE AND activation_code = %s", self.id, code)
-
- if not email:
- return False
-
- # Activate email address
- email.activate()
- return True
-
- # Te activated flag is useful for LDAP users
- def add_email(self, email, activated=False):
- # Check if the email is in use
- if self.backend.users.email_in_use(email):
- raise ValueError("Email %s is already in use" % email)
-
- activation_code = None
- if not activated:
- activation_code = generate_random_string(64)
-
- user_email = self.backend.users._get_user_email("INSERT INTO users_emails(user_id, email, \
- \"primary\", activated, activation_code) VALUES(%s, %s, %s, %s, %s) RETURNING *",
- self.id, email, not self.emails, activated, activation_code)
-
- # Set caches
- user_email.user = self
- self.emails.append(user_email)
-
- return user_email
+ """
+ The primary email address
+ """
+ return self._get_attr("email")
@property
def email_to(self):
"""
The name/email address of the user in MIME format
"""
- return email.utils.formataddr((self.name, self.email.email))
+ return email.utils.formataddr((self.name, self.email))
def send_email(self, *args, **kwargs):
return self.backend.messages.send_template(
def is_admin(self):
return self.data.admin is True
- def get_locale(self):
- return tornado.locale.get(self.data.locale)
+ # Locale
- def set_locale(self, locale):
- self._set_attribute("locale", locale)
+ @property
+ def locale(self):
+ return tornado.locale.get()
- locale = property(get_locale, set_locale)
+ # Timezone
- def get_timezone(self, tz=None):
+ @property
+ def timezone(self, tz=None):
if tz is None:
tz = self.data.timezone or ""
return tz
- def set_timezone(self, timezone):
- if not timezone is None:
- tz = self.get_timezone(timezone)
- timezone = tz.zone
-
- self._set_attribute("timezone", timezone)
-
- timezone = property(get_timezone, set_timezone)
-
@property
def deleted(self):
return self.data.deleted
)
-class UserEmail(base.DataObject):
- table = "users_emails"
-
- def __str__(self):
- return self.email
-
- def __eq__(self, other):
- if isinstance(other, self.__class__):
- return self.id == other.id
-
- elif isinstance(other, str):
- return self.email == other
-
- return NotImplemented
-
- @lazy_property
- def user(self):
- return self.backend.users.get_by_id(self.data.user_id)
-
- @property
- def recipient(self):
- return "%s <%s>" % (self.user.realname, self.email)
-
- @property
- def email(self):
- return self.data.email
-
- def set_primary(self, primary):
- self._set_attribute("primary", primary)
-
- primary = property(lambda s: s.data.primary, set_primary)
-
- @property
- def activated(self):
- return self.data.activated
-
- def activate(self):
- self._set_attribute("activated", True)
- self._set_attribute("activation_code", None)
-
- @property
- def activation_code(self):
- return self.data.activation_code
-
- def send_email_activation_mail(self):
- logging.debug("Sending email address activation mail to %s" % self.email)
-
- self.user.send_email("messages/users/email-activation", email=self)
-
-
class QuotaExceededError(Exception):
pass