#!/usr/bin/python
+import email.utils
import hashlib
import logging
import pytz
import tornado.locale
+log = logging.getLogger("users")
+log.propagate = 1
+
from . import base
from . import ldap
+from .decorators import *
+
# A list of possible random characters.
random_chars = string.ascii_letters + string.digits
# Re-generate the password hash and compare the result.
return password_hash == generate_password_hash(password, salt=salt, algo=algo)
-def maintainer_split(s):
- m = re.match(r"(.*) <(.*)>", s)
- if m:
- name, email = m.groups()
- else:
- name, email = None, None
-
- return name, email
class Users(base.Object):
def init(self):
self.ldap = ldap.LDAP(self.backend)
- def auth(self, name, password):
- # If either name or password is None, we don't check at all.
- if None in (name, password):
- return
+ def _get_user(self, query, *args):
+ res = self.db.get(query, *args)
- # Search for the username in the database.
- # The user must not be deleted and must be activated.
- user = self.db.get("SELECT id FROM users WHERE name = %s AND \
- activated IS TRUE AND deleted IS FALSE", name)
+ if res:
+ return User(self.backend, res.id, data=res)
- if not user:
- # we should check if we get an ldap user
- if self.ldap.auth(name, password):
- user = self.register_from_ldap(name)
+ def _get_users(self, query, *args):
+ res = self.db.query(query, *args)
- if not user:
- return
+ for row in res:
+ yield User(self.backend, row.id, data=row)
- # Get the whole User object from the database.
- user = self.get_by_id(user.id)
+ def _get_user_email(self, query, *args):
+ res = self.db.get(query, *args)
- # If the user was not found or the password does not match,
- # you aren't lucky.
- if not user or not user.check_password(password):
- return
+ if res:
+ return UserEmail(self.backend, res.id, data=res)
- # Otherwise we return the User object.
- return user
+ def _get_user_emails(self, query, *args):
+ res = self.db.query(query, *args)
- def register(self, name, password, email, realname, locale=None):
- user = User.new(self.pakfire, name, realname, locale)
- user.passphrase = password
- user.add_email(email)
- return user
+ for row in res:
+ yield UserEmail(self.backend, row.id, data=row)
- def register_from_ldap(self, name):
- logging.debug("Register LDAP user %s" % name)
+ def __iter__(self):
+ users = self._get_users("SELECT * FROM users \
+ WHERE activated IS TRUE AND deleted IS FALSE ORDER BY name")
- dn, attr = self.ldap.get_user(name, attrlist=["uid", "cn", "mail"])
- realname = attr["cn"][0]
- user = User.new(self.pakfire, name, realname, ldap_dn=dn)
- for email in attr["mail"]:
- user.add_email(email, activated=True)
- # Activate the user
- user.activate()
+ return iter(users)
- return user
+ def __len__(self):
+ res = self.db.get("SELECT COUNT(*) AS count FROM users \
+ WHERE activated IS TRUE AND deleted IS FALSE")
- def name_is_used(self, name):
- users = self.db.query("SELECT id FROM users WHERE name = %s", name)
+ return res.count
- if users:
- return True
+ def create(self, name, realname=None, ldap_dn=None):
+ # XXX check if username has the correct name
- return False
+ # Check if name is already taken
+ user = self.get_by_name(name)
+ if user:
+ raise ValueError("Username %s already taken" % name)
- def email_is_used(self, email):
- users = self.db.get("SELECT id FROM users_emails \
- WHERE email = %s AND activated IS TRUE", email)
+ # Create new user
+ user = self._get_user("INSERT INTO users(name, realname, ldap_dn) \
+ VALUES(%s, %s, %s) RETURNING *", name, realname, ldap_dn)
- if users:
- return True
+ # Create row in permissions table.
+ self.db.execute("INSERT INTO users_permissions(user_id) VALUES(%s)", user.id)
- return False
+ log.debug("Created user %s" % user.name)
- def get_all(self):
- users = self.db.query("""SELECT id FROM users WHERE activated IS TRUE AND
- deleted IS FALSE ORDER BY name ASC""")
+ return user
- return [User(self.pakfire, u.id) for u in users]
+ def create_from_ldap(self, name):
+ log.debug("Creating user %s from LDAP" % name)
- def get_by_id(self, id):
- return User(self.pakfire, id)
+ # Get required attributes from LDAP
+ dn, attr = self.ldap.get_user(name, attrlist=["uid", "cn", "mail"])
+ assert dn
- def get_by_name(self, name):
- user = self.db.get("SELECT id FROM users WHERE name = %s LIMIT 1", name)
+ # Create regular user
+ user = self.create(name, realname=attr["cn"][0], ldap_dn=dn)
+ user.activate()
- if user:
- return User(self.pakfire, user.id)
+ # Add all email addresses and activate them
+ for email in attr["mail"]:
+ user.add_email(email, activated=True)
- def get_by_email(self, email):
- user = self.db.get("SELECT user_id AS id FROM users_emails \
- WHERE email = %s LIMIT 1", email)
+ return user
- if user:
- return User(self.pakfire, user.id)
+ def auth(self, name, password):
+ # If either name or password is None, we don't check at all.
+ if None in (name, password):
+ return
- def count(self):
- users = self.db.get("SELECT COUNT(*) AS count FROM users \
- WHERE activated IS TRUE AND deleted IS FALSE")
+ # Search for the username in the database.
+ # The user must not be deleted and must be activated.
+ user = self._get_user("SELECT * FROM users WHERE name = %s AND \
+ activated IS TRUE AND deleted IS FALSE", name)
- if users:
- return users.count
+ # If no user could be found, we search for a matching user in
+ # the LDAP database
+ if not user:
+ if not self.ldap.auth(name, password):
+ return
- def search(self, pattern, limit=None):
- pattern = "%%%s%%" % pattern
+ # If a LDAP user is found (and password matches), we will
+ # create a new local user with the information from LDAP.
+ user = self.register_from_ldap(name)
+
+ # Check if the password matches
+ if user.check_password(password):
+ return user
- query = "SELECT id FROM users \
- WHERE (name LIKE %s OR realname LIKE %s) AND activated IS %s AND deleted IS %s"
- args = [pattern, pattern, True, False]
+ def email_in_use(self, email):
+ return self._get_user_email("SELECT * FROM users_emails \
+ WHERE email = %s AND activated IS TRUE", email)
- if limit:
- query += " LIMIT %s"
- args.append(limit)
+ def get_by_id(self, id):
+ return self._get_user("SELECT * FROM users WHERE id = %s", id)
- users = []
- for user in self.db.query(query, *args):
- user = User(self.pakfire, user.id)
- users.append(user)
+ def get_by_name(self, name):
+ return self._get_user("SELECT * FROM users WHERE name = %s", name)
- return users
+ def get_by_email(self, email):
+ return self._get_user("SELECT users.* FROM users \
+ LEFT JOIN users_emails ON users.id = users_emails.user_id \
+ WHERE users_emails.email = %s", email)
def find_maintainer(self, s):
- if not s:
- return
+ name, email_address = email.utils.parseaddr(s)
- name, email = maintainer_split(s)
- if not email:
+ # Got invalid input
+ if not email_address:
return
- user = self.db.get("SELECT user_id FROM users_emails WHERE email = %s LIMIT 1", email)
- if not user:
- return
+ return self.get_by_email(email_address)
+
+ def search(self, pattern, limit=None):
+ pattern = "%%%s%%" % pattern
+
+ return self._get_users("SELECT * FROM users \
+ WHERE (name LIKE %s OR realname LIKE %s) \
+ AND activated IS TRUE AND deleted IS FALSE \
+ ORDER BY name LIMIT %s", pattern, pattern, limit)
- return self.get_by_id(user.user_id)
-
@staticmethod
def check_password_strength(password):
score = 0
return accepted, score
-class User(base.Object):
- def __init__(self, pakfire, id):
- base.Object.__init__(self, pakfire)
- self.id = id
-
- # Cache.
- self._data = None
- self._emails = None
- self._perms = None
+class User(base.DataObject):
+ table = "users"
def __repr__(self):
return "<%s %s>" % (self.__class__.__name__, self.realname)
- def __hash__(self):
- return hash(self.id)
-
- def __cmp__(self, other):
- if other is None:
- return 1
+ def __eq__(self, other):
+ if isinstance(other, self.__class__):
+ return self.id == other.id
- if isinstance(other, unicode):
- return cmp(self.email, other)
+ def __lt__(self, other):
+ if isinstance(other, self.__class__):
+ return self.name < other.name
- if self.id == other.id:
- return 0
-
- return cmp(self.realname, other.realname)
-
- @classmethod
- def new(cls, pakfire, name, realname, locale=None, ldap_dn=None):
- res = pakfire.db.get("INSERT INTO users(name, realname, ldap_dn) \
- VALUES(%s, %s, %s) RETURNING id", name, realname, ldap_dn)
-
- # Create row in permissions table.
- pakfire.db.execute("INSERT INTO users_permissions(user_id) VALUES(%s)", res.id)
-
- user = cls(pakfire, res.id)
-
- # If we have a guessed locale, we save it (for sending emails).
- if locale:
- user.locale = locale
-
- return user
-
- @property
- def data(self):
- if self._data is None:
- self._data = self.db.get("SELECT * FROM users WHERE id = %s" % self.id)
- assert self._data, "User %s not found." % self.id
-
- return self._data
+ elif isinstance(other, str):
+ return self.name < other
def delete(self):
- self.db.execute("UPDATE users SET deleted = TRUE WHERE id = %s", self.id)
- self._data = None
+ self._set_attribute("deleted", True)
def activate(self):
- self.db.execute("UPDATE users SET activated = TRUE WHERE id = %s", self.id)
+ self._set_attribute("activated", True)
def check_password(self, password):
"""
passphrase = property(lambda x: None, set_passphrase)
-
def get_realname(self):
- if not self.data.realname:
- return self.name
-
- return self.data.realname
+ return self.data.realname or self.name
def set_realname(self, realname):
- self.db.execute("UPDATE users SET realname = %s WHERE id = %s",
- realname, self.id)
- self.data["realname"] = realname
+ self._set_attribute("realname", realname)
realname = property(get_realname, set_realname)
return firstname
+ @lazy_property
+ def emails(self):
+ res = self.backend.users._get_user_emails("SELECT * FROM users_emails \
+ WHERE user_id = %s AND activated IS TRUE ORDER BY email", self.id)
+
+ return list(res)
+
@property
def email(self):
- if self._emails is None:
- self._emails = self.db.query("SELECT * FROM users_emails WHERE user_id = %s", self.id)
- assert self._emails
+ for email in self.emails:
+ if email.primary:
+ return email
- for email in self._emails:
- if not email.primary == True:
- continue
-
- return email.email
+ def get_email(self, email):
+ for e in self.emails:
+ if e == email:
+ return e
def set_primary_email(self, email):
if not email in self.emails:
WHERE user_id = %s AND email = %s AND activated IS TRUE",
self.id, email)
- @property
- def emails(self):
- res = self.db.query("SELECT email FROM users_emails \
- WHERE user_id = %s AND activated IS TRUE ORDER BY email", self.id)
-
- return (row.email for row in res)
-
- def get_email_activation_code(self, email):
- res = self.db.get("SELECT activation_code FROM users_emails WHERE user_id = %s AND \
- email = %s", self.id, email)
-
- return res.activation_code
-
- # XXX We need to check if the email is already activated
def activate_email(self, code):
- if self.db.query("SELECT * FROM users_emails WHERE user_id = %s AND \
- activation_code = %s AND activated = FALSE", self.id, code):
- # We activate the email and the user to
- self.db.execute("UPDATE users_emails SET activated = TRUE, activation_code = NULL \
- WHERE user_id = %s AND activation_code = %s", self.id, code)
- self.activate()
+ # Search email by activation code
+ email = self.backend.users._get_user_email("SELECT * FROM users_emails \
+ WHERE user_id = %s AND activated IS FALSE AND activation_code = %s", self.id, code)
- def has_email_address(self):
- emails = self.db.query("SELECT * FROM users_emails WHERE user_id = %s", self.id)
-
- if not emails:
+ if not email:
return False
+ # Activate email address
+ email.activate()
return True
# Te activated flag is useful for LDAP users
def add_email(self, email, activated=False):
# Check if the email is in use
- if self.backend.users.email_is_used(email):
+ if self.backend.users.email_in_use(email):
raise ValueError("Email %s is already in use" % email)
activation_code = None
if not activated:
activation_code = generate_random_string(64)
- if not self.has_email_address():
- # The user has no email address in the moment do we can safely guess that he has new
- # registered
- self.db.execute("INSERT INTO users_emails(user_id, email, \
- \"primary\", activation_code, activated) VALUES(%s, %s, TRUE, %s, %s)",
- self.id, email, activation_code, activated)
- if not activated:
- self.send_activation_mail()
- return
+ user_email = self.backend.users._get_user_email("INSERT INTO users_emails(user_id, email, \
+ \"primary\", activated, activation_code) VALUES(%s, %s, %s, %s, %s) RETURNING *",
+ self.id, email, not self.emails, activated, activation_code)
+ self.emails.append(user_email)
- # Add just another email address.
- self.db.execute("INSERT INTO users_emails(user_id, email, \"primary\", activation_code, activated) \
- VALUES(%s, %s, FALSE, %s, %s)", self.id, email, activation_code, activated)
+ # Send activation email if activation is needed
if not activated:
- self.send_email_activation_mail(email)
- return
-
- def remove_email(self, email):
- # We delete this mail if the emial address is not primary and belong to this user
- self.db.execute("DELETE FROM users_emails \
- WHERE id = %s AND email = %s AND \"primary\" = FALSE",
- self.id, email)
-
- self._data = self._emails = None
+ user_email.send_activation_mail()
- def get_state(self):
- return self.data.state
+ return user_email
def set_state(self, state):
- self.db.execute("UPDATE users SET state = %s WHERE id = %s", state,
- self.id)
- self.data["state"] = state
+ self._set_attribute("state", state)
- state = property(get_state, set_state)
+ state = property(lambda s: s.data.state, set_state)
+
+ def is_admin(self):
+ return self.state == "admin"
- def get_locale(self):
- return self.data.locale or ""
+ def is_tester(self):
+ return self.state == "tester"
def set_locale(self, locale):
- self.db.execute("UPDATE users SET locale = %s WHERE id = %s", locale,
- self.id)
- self.data["locale"] = locale
+ self._set_attribute("locale", locale)
- locale = property(get_locale, set_locale)
+ locale = property(lambda s: s.data.locale, set_locale)
def get_timezone(self, tz=None):
if tz is None:
tz = self.get_timezone(timezone)
timezone = tz.zone
- self.db.execute("UPDATE users SET timezone = %s WHERE id = %s",
- timezone, self.id)
+ self._set_attribute("timezone", timezone)
timezone = property(get_timezone, set_timezone)
return gravatar_url
- def is_admin(self):
- return self.state == "admin"
-
- def is_tester(self):
- return self.state == "tester"
-
- @property
+ @lazy_property
def perms(self):
- if self._perms is None:
- self._perms = \
- self.db.get("SELECT * FROM users_permissions WHERE user_id = %s", self.id)
-
- return self._perms
+ return self.db.get("SELECT * FROM users_permissions WHERE user_id = %s", self.id)
def has_perm(self, perm):
"""
# All others must be checked individually.
return self.perms.get(perm, False) == True
+
+class UserEmail(base.DataObject):
+ table = "users_emails"
+
+ def __str__(self):
+ return self.email
+
+ def __eq__(self, other):
+ if isinstance(other, self.__class__):
+ return self.id == other.id
+
+ elif isinstance(other, str):
+ return self.email == other
+
+ @lazy_property
+ def user(self):
+ return self.backend.users.get_by_id(self.data.user_id)
+
+ @property
+ def recipient(self):
+ return "%s <%s>" % (self.user.name, self.email)
+
+ @property
+ def email(self):
+ return self.data.email
+
+ def set_primary(self, primary):
+ self._set_attribute("primary", primary)
+
+ primary = property(lambda s: s.data.primary, set_primary)
+
+ @property
+ def activated(self):
+ return self.data.activated
+
+ def activate(self):
+ self._set_attribute("activated", True)
+ self._set_attribute("activation_code", None)
+
+ @property
+ def activation_code(self):
+ return self.data.activation_code
+
def send_activation_mail(self):
+ if not self.primary:
+ return self.send_email_activation_email()
+
logging.debug("Sending activation mail to %s" % self.email)
# Get the saved locale from the user.
- locale = tornado.locale.get(self.locale)
- _ = locale.translate
+ _ = tornado.locale.get(self.user.locale).translate
subject = _("Account Activation")
message += _("To activate your account, please click on the link below.")
message += "\n"*2
message += " %(baseurl)s/user/%(name)s/activate?code=%(activation_code)s" \
- % { "baseurl" : self.settings.get("baseurl"), "name" : self.name,
- "activation_code" : self.get_email_activation_code(self.email), }
+ % { "baseurl" : self.settings.get("baseurl"), "name" : self.user.name,
+ "activation_code" : self.activation_code, }
message += "\n"*2
message += "Sincerely,\n The Pakfire Build Service"
- self.pakfire.messages.add("%s <%s>" % (self.realname, self.email), subject, message)
+ self.backend.messages.add(self.recipient, subject, message)
def send_email_activation_mail(self, email):
- logging.debug("Sending email address activation mail to %s" % email)
+ logging.debug("Sending email address activation mail to %s" % self.email)
# Get the saved locale from the user.
- locale = tornado.locale.get(self.locale)
- _ = locale.translate
+ _ = tornado.locale.get(self.user.locale).translate
subject = _("Email address Activation")
message += _("To activate your this email address account, please click on the link below.")
message += "\n"*2
message += " %(baseurl)s/user/%(name)s/activate?code=%(activation_code)s" \
- % { "baseurl" : self.settings.get("baseurl"), "name" : self.name,
- "activation_code" : self.get_email_activation_code(email), }
+ % { "baseurl" : self.settings.get("baseurl"), "name" : self.user.name,
+ "activation_code" : self.activation_code, }
message += "\n"*2
message += "Sincerely,\n The Pakfire Build Service"
- self.pakfire.messages.add("%s <%s>" % (self.realname, email), subject, message)
+ self.backend.messages.add(self.recipient, subject, message)
# Some testing code.