16 log
= logging
.getLogger("users")
22 from .decorators
import *
# The characters a random string is drawn from (A-Z, a-z, 0-9).
random_chars = string.ascii_letters + string.digits

# Strings generated here are used in this file as password salts, e-mail
# activation codes and password recovery codes, so they must be
# unpredictable. SystemRandom draws from the operating system's CSPRNG
# instead of the default (predictable) Mersenne Twister.
_secure_random = random.SystemRandom()

def generate_random_string(length=16):
    """
    Return a string of random characters A-Za-z0-9 with the given length.

    A length of zero or less yields an empty string.
    """
    return "".join(_secure_random.choice(random_chars) for _ in range(length))
def generate_password_hash(password, salt=None, algo="sha512"):
    """
    Create a salted digest of the given password.

    password -- the plain-text password
    salt     -- salt to use; a random 16 character salt is generated if None
    algo     -- name of the hashlib algorithm to use

    Returns a string of the kind "<algo>$<salt>$<hash>".
    Raises Exception if the algorithm is not supported.
    """
    # Generate the salt (length = 16) if none was given.
    if salt is None:
        salt = generate_random_string(length=16)

    # hashlib.algorithms only exists on Python 2.7; newer interpreters
    # provide algorithms_guaranteed instead.
    supported = getattr(hashlib, "algorithms_guaranteed", None) \
        or getattr(hashlib, "algorithms", ())
    if algo not in supported:
        raise Exception("Unsupported password hash algorithm: %s" % algo)

    # Calculate the digest.
    # NOTE(review): the statements that feed the hash object were not part of
    # the reviewed text. Salt-then-password order is assumed here -- confirm
    # against the hashes already stored before merging.
    h = hashlib.new(algo)
    for part in (salt, password):
        # Hash objects require bytes on Python 3.
        if not isinstance(part, bytes):
            part = part.encode("utf-8")
        h.update(part)

    # Output string is of kind "<algo>$<salt>$<hash>".
    return "$".join((algo, salt, h.hexdigest()))
def check_password_hash(password, password_hash):
    """
    Check a plain-text password against the given stored digest.

    password_hash is either "plain$<password>" or "<algo>$<salt>$<hash>".
    Returns True if the password matches and False otherwise, including
    when no hash is stored at all or the stored value cannot be parsed.
    """
    import hmac

    def _equal(a, b):
        # Compare in constant time so the comparison does not leak how many
        # leading characters matched. compare_digest wants bytes for
        # non-ASCII input, so text is encoded first.
        if not isinstance(a, bytes):
            a = a.encode("utf-8")
        if not isinstance(b, bytes):
            b = b.encode("utf-8")
        return hmac.compare_digest(a, b)

    # Nothing can match a missing passphrase (or a missing password).
    if not password_hash or password is None:
        return False

    # Handle plaintext passwords (plain$<password>).
    if password_hash.startswith("plain$"):
        return _equal(password_hash[6:], password)

    # Unpacking into three names also validates the "<algo>$<salt>$<hash>" shape.
    try:
        algo, salt, _digest = password_hash.split("$", 2)
    except ValueError:
        logging.warning("Unknown password hash: %s" % password_hash)
        return False

    # Re-generate the password hash and compare the result.
    return _equal(password_hash,
        generate_password_hash(password, salt=salt, algo=algo))
73 class Users(base
.Object
):
75 self
.ldap
= ldap
.LDAP(self
.backend
)
def _get_user(self, query, *args):
    """
    Run a query that yields at most one row and wrap that row in a User.

    Returns None when the query did not return a row.
    """
    row = self.db.get(query, *args)
    if not row:
        return None

    return User(self.backend, row.id, data=row)
def _get_users(self, query, *args):
    """
    Generator that runs the query and yields one User per returned row.
    """
    rows = self.db.query(query, *args)

    for record in rows:
        yield User(self.backend, record.id, data=record)
def _get_user_email(self, query, *args):
    """
    Run a query that yields at most one row and wrap that row in a UserEmail.

    Returns None when the query did not return a row.
    """
    row = self.db.get(query, *args)
    if not row:
        return None

    return UserEmail(self.backend, row.id, data=row)
def _get_user_emails(self, query, *args):
    """
    Generator that runs the query and yields one UserEmail per returned row.
    """
    rows = self.db.query(query, *args)

    for record in rows:
        yield UserEmail(self.backend, record.id, data=record)
102 users
= self
._get
_users
("SELECT * FROM users \
103 WHERE activated IS TRUE AND deleted IS FALSE ORDER BY name")
108 res
= self
.db
.get("SELECT COUNT(*) AS count FROM users \
109 WHERE activated IS TRUE AND deleted IS FALSE")
def create(self, name, realname=None, ldap_dn=None):
    """
    Create a new user together with its row in the permissions table.

    Raises ValueError if a user with that name already exists.
    Returns the newly created user.
    """
    # XXX check if username has the correct name

    # Refuse names that are already taken
    if self.get_by_name(name):
        raise ValueError("Username %s already taken" % name)

    # Create new user
    new_user = self._get_user("INSERT INTO users(name, realname, ldap_dn) \
        VALUES(%s, %s, %s) RETURNING *", name, realname, ldap_dn)

    # Create row in permissions table.
    self.db.execute("INSERT INTO users_permissions(user_id) VALUES(%s)",
        new_user.id)

    log.debug("Created user %s" % new_user.name)

    return new_user
132 def create_from_ldap(self
, name
):
133 log
.debug("Creating user %s from LDAP" % name
)
135 # Get required attributes from LDAP
136 dn
, attr
= self
.ldap
.get_user(name
, attrlist
=["uid", "cn", "mail"])
139 # Create regular user
140 user
= self
.create(name
, realname
=attr
["cn"][0], ldap_dn
=dn
)
143 # Add all email addresses and activate them
144 for email
in attr
["mail"]:
145 user
.add_email(email
, activated
=True)
def auth(self, name, password):
    """
    Authenticate with a name (usually an email address) and a password.

    Returns the user on success and None otherwise. Users unknown locally
    are looked up in LDAP and, if the LDAP credentials match, created
    locally first.
    """
    # If either name or password is None, we don't check at all.
    if name is None or password is None:
        return None

    # usually we will get an email address as name
    account = self.get_by_email(name) or self.get_by_name(name)

    if not account:
        # If no user could be found, we search for a matching user in
        # the LDAP database.
        if not self.ldap.auth(name, password):
            return None

        # If a LDAP user is found (and password matches), we will
        # create a new local user with the information from LDAP.
        account = self.create_from_ldap(name)

    # Accounts that are not activated, or deleted, may not log in.
    if not account.activated or account.deleted:
        return None

    # Check if the password matches
    return account if account.check_password(password) else None
def email_in_use(self, email):
    """
    Look up an activated email address.

    Returns the matching UserEmail, or None if no activated row exists.
    """
    query = "SELECT * FROM users_emails \
        WHERE email = %s AND activated IS TRUE"

    return self._get_user_email(query, email)
def get_by_id(self, id):
    """
    Return the user with the given database id, or None.
    """
    query = "SELECT * FROM users WHERE id = %s"

    return self._get_user(query, id)
def get_by_name(self, name):
    """
    Return the user with the given login name, or None.
    """
    query = "SELECT * FROM users WHERE name = %s"

    return self._get_user(query, name)
def get_by_email(self, email):
    """
    Return the user that owns the given email address, or None.
    """
    # NOTE(review): unlike the queries in email_in_use() and
    # find_maintainers(), this one does not require
    # users_emails.activated IS TRUE, so an address that was never
    # confirmed also matches -- confirm that this is intended.
    query = "SELECT users.* FROM users \
        LEFT JOIN users_emails ON users.id = users_emails.user_id \
        WHERE users_emails.email = %s"

    return self._get_user(query, email)
def get_by_password_recovery_code(self, code):
    """
    Return the user holding the given, not yet expired, password recovery
    code, or None.
    """
    query = "SELECT * FROM users \
        WHERE password_recovery_code = %s AND password_recovery_code_expires_at > NOW()"

    return self._get_user(query, code)
193 def find_maintainers(self
, maintainers
):
196 # Make a unique list of all email addresses
197 for maintainer
in maintainers
:
198 name
, email_address
= email
.utils
.parseaddr(maintainer
)
200 if not email_address
in email_addresses
:
201 email_addresses
.append(email_address
)
203 users
= self
._get
_users
("SELECT DISTINCT users.* FROM users \
204 LEFT JOIN users_emails ON users.id = users_emails.user_id \
205 WHERE users_emails.activated IS TRUE \
206 AND users_emails.email = ANY(%s)", email_addresses
)
def find_maintainer(self, s):
    """
    Find the user for a maintainer string such as "Name <address>".

    Returns None if no email address can be parsed out of the string,
    otherwise whatever get_by_email() returns for that address.
    """
    _realname, address = email.utils.parseaddr(s)

    # Only an email address can be matched against a user
    if address:
        return self.get_by_email(address)

    return None
219 def search(self
, pattern
, limit
=None):
220 pattern
= "%%%s%%" % pattern
222 users
= self
._get
_users
("SELECT * FROM users \
223 WHERE (name LIKE %s OR realname LIKE %s) \
224 AND activated IS TRUE AND deleted IS FALSE \
225 ORDER BY name LIMIT %s", pattern
, pattern
, limit
)
230 def check_password_strength(password
):
234 # Empty passwords cannot be used.
235 if len(password
) == 0:
238 # Passwords with less than 6 characters are also too weak.
239 if len(password
) < 6:
242 # Password with at least 8 characters are secure.
243 if len(password
) >= 8:
246 # 10 characters are even more secure.
247 if len(password
) >= 10:
250 # Digits in the password are good.
251 if re
.search("\d+", password
):
254 # Check for lowercase AND uppercase characters.
255 if re
.search("[a-z]", password
) and re
.search("[A-Z]", password
):
258 # Search for special characters.
259 if re
.search(".[!,@,#,$,%,^,&,*,?,_,~,-,(,)]", password
):
265 return accepted
, score
268 class User(base
.DataObject
):
272 return "<%s %s>" % (self
.__class
__.__name
__, self
.realname
)
def __eq__(self, other):
    """
    Two users are equal when they have the same database id.
    """
    if isinstance(other, self.__class__):
        return self.id == other.id

    # Tell Python this comparison is not supported for other types instead
    # of falling off the end and returning None.
    return NotImplemented
def __lt__(self, other):
    """
    Order users by name; a plain string is compared against the name, too.
    """
    if isinstance(other, self.__class__):
        return self.name < other.name

    if isinstance(other, str):
        return self.name < other

    # Tell Python this comparison is not supported for other types instead
    # of falling off the end and returning None.
    return NotImplemented
289 self
._set
_attribute
("deleted", True)
292 self
._set
_attribute
("activated", True)
def check_password(self, password):
    """
    Compare the given password with the one stored in the database.

    Users with an LDAP DN are checked by binding against LDAP instead.
    """
    if not self.ldap_dn:
        return check_password_hash(password, self.data.passphrase)

    directory = self.backend.users.ldap
    return directory.bind(self.ldap_dn, password)
def set_passphrase(self, passphrase):
    """
    Update the passphrase the user uses to log on.

    The passphrase is stored as a salted hash. Raises AttributeError for
    LDAP users.
    """
    # We cannot set the password for ldap users
    if self.ldap_dn:
        raise AttributeError("Cannot set passphrase for LDAP user")

    hashed = generate_password_hash(passphrase)
    self.db.execute("UPDATE users SET passphrase = %s WHERE id = %s",
        hashed, self.id)

# Reading the attribute always yields None; assigning to it stores a new hash.
passphrase = property(lambda x: None, set_passphrase)
def get_realname(self):
    """
    Return the stored real name, falling back to the user name if none is set.
    """
    stored = self.data.realname
    if stored:
        return stored

    return self.name

def set_realname(self, realname):
    """
    Store a new real name.
    """
    self._set_attribute("realname", realname)

realname = property(get_realname, set_realname)
326 return self
.data
.name
330 return self
.data
.ldap_dn
334 # Try to split the string into first and last name.
335 # If that is not successful, return the entire realname.
337 firstname
, rest
= self
.realname
.split(" ", 1)
def envelope_from(self):
    """
    Return the sender string for this user as "<realname> <<email>>".
    """
    display_name = self.realname
    address = self.email

    return "%s <%s>" % (display_name, address)
349 res
= self
.backend
.users
._get
_user
_emails
("SELECT * FROM users_emails \
350 WHERE user_id = %s AND activated IS TRUE ORDER BY email", self
.id)
356 for email
in self
.emails
:
360 def get_email(self
, email
):
361 for e
in self
.emails
:
def set_primary_email(self, email):
    """
    Make the given email address the primary one of this user.

    Raises ValueError if the address does not belong to the user.
    """
    if email not in self.emails:
        raise ValueError("Email address does not belong to user")

    # Mark previous primary email as non-primary.
    # The user id is passed as a query parameter like everywhere else in
    # this file, instead of being formatted into the SQL string.
    self.db.execute("UPDATE users_emails SET \"primary\" = FALSE \
        WHERE user_id = %s AND \"primary\" IS TRUE", self.id)

    # Mark new primary email
    # NOTE(review): the argument line of this call was not part of the
    # reviewed text; (self.id, email) follows from the two placeholders.
    self.db.execute("UPDATE users_emails SET \"primary\" = TRUE \
        WHERE user_id = %s AND email = %s AND activated IS TRUE",
        self.id, email)
378 def has_email_address(self
, email_address
):
380 mail
, email_address
= email
.utils
.parseaddr(email_address
)
384 return email_address
in self
.emails
386 def activate_email(self
, code
):
387 # Search email by activation code
388 email
= self
.backend
.users
._get
_user
_email
("SELECT * FROM users_emails \
389 WHERE user_id = %s AND activated IS FALSE AND activation_code = %s", self
.id, code
)
394 # Activate email address
# The activated flag is useful for LDAP users
399 def add_email(self
, email
, activated
=False):
400 # Check if the email is in use
401 if self
.backend
.users
.email_in_use(email
):
402 raise ValueError("Email %s is already in use" % email
)
404 activation_code
= None
406 activation_code
= generate_random_string(64)
408 user_email
= self
.backend
.users
._get
_user
_email
("INSERT INTO users_emails(user_id, email, \
409 \"primary\", activated, activation_code) VALUES(%s, %s, %s, %s, %s) RETURNING *",
410 self
.id, email
, not self
.emails
, activated
, activation_code
)
413 user_email
.user
= self
414 self
.emails
.append(user_email
)
416 # Send activation email if activation is needed
418 user_email
.send_activation_mail()
def send_template(self, *args, **kwargs):
    """
    Send a templated message to this user.

    All arguments are handed on to the backend's messages.send_template()
    with this user as the first argument; its result is returned.
    """
    messages = self.backend.messages

    return messages.send_template(self, *args, **kwargs)
426 return self
.data
.admin
is True
def get_locale(self):
    """
    Return the tornado locale object for the stored locale code.
    """
    code = self.data.locale

    return tornado.locale.get(code)

def set_locale(self, locale):
    """
    Store a new locale code.
    """
    self._set_attribute("locale", locale)

locale = property(get_locale, set_locale)
436 def get_timezone(self
, tz
=None):
438 tz
= self
.data
.timezone
or ""
441 tz
= pytz
.timezone(tz
)
442 except pytz
.UnknownTimeZoneError
:
443 tz
= pytz
.timezone("UTC")
447 def set_timezone(self
, timezone
):
448 if not timezone
is None:
449 tz
= self
.get_timezone(timezone
)
452 self
._set
_attribute
("timezone", timezone
)
454 timezone
= property(get_timezone
, set_timezone
)
def get_password_recovery_code(self):
    """
    Return the stored password recovery code.
    """
    return self.data.password_recovery_code

def set_password_recovery_code(self, code):
    """
    Store a password recovery code that expires one day from now.
    """
    self._set_attribute("password_recovery_code", code)

    # The expiry time is computed from the naive UTC clock.
    expires_at = datetime.datetime.utcnow() + datetime.timedelta(days=1)
    self._set_attribute("password_recovery_code_expires_at", expires_at)

password_recovery_code = property(get_password_recovery_code,
    set_password_recovery_code)
467 def forgot_password(self
):
468 log
.debug("User %s reqested password recovery" % self
.name
)
470 # We cannot reset te password for ldap users
472 # Maybe we should send an email with an explanation
475 # Add a recovery code to the database and a timestamp when this code expires
476 self
.password_recovery_code
= generate_random_string(64)
478 # Send an email with the activation code
479 self
.send_template("messages/users/password-reset", user
=self
)
483 return self
.data
.activated
487 return self
.data
.deleted
def registered_at(self):
    """
    Return the registered_at value of this user's data row.
    """
    record = self.data

    return record.registered_at
493 def gravatar_icon(self
, size
=128):
494 h
= hashlib
.new("md5")
496 h
.update("%s" % self
.email
)
499 gravatar_url
= "http://www.gravatar.com/avatar/%s?" % h
.hexdigest()
500 gravatar_url
+= urllib
.urlencode({'d': "mm", 's': str(size
)})
506 return self
.db
.get("SELECT * FROM users_permissions WHERE user_id = %s", self
.id)
508 def has_perm(self
, perm
):
510 Returns True if the user has the requested permission.
512 # Admins have the permission for everything.
516 # All others must be checked individually.
517 return self
.perms
.get(perm
, False) == True
521 return self
.backend
.sessions
._get
_sessions
("SELECT * FROM sessions \
522 WHERE user_id = %s AND valid_until >= NOW() ORDER BY created_at")
525 class UserEmail(base
.DataObject
):
526 table
= "users_emails"
def __eq__(self, other):
    """
    Compare with another email object (by id) or with a plain string
    (by address).
    """
    if isinstance(other, self.__class__):
        return self.id == other.id

    if isinstance(other, str):
        return self.email == other

    # Tell Python this comparison is not supported for other types instead
    # of falling off the end and returning None.
    return NotImplemented
540 return self
.backend
.users
.get_by_id(self
.data
.user_id
)
544 return "%s <%s>" % (self
.user
.realname
, self
.email
)
548 return self
.data
.email
def set_primary(self, primary):
    """
    Store whether this is the primary email address.
    """
    self._set_attribute("primary", primary)

# Reads come straight from the data row; writes go through set_primary().
primary = property(lambda email: email.data.primary, set_primary)
557 return self
.data
.activated
560 self
._set
_attribute
("activated", True)
561 self
._set
_attribute
("activation_code", None)
def activation_code(self):
    """
    Return the activation code stored for this email address.
    """
    record = self.data

    return record.activation_code
567 def send_activation_mail(self
):
569 return self
.send_email_activation_email()
571 logging
.debug("Sending activation mail to %s" % self
.email
)
573 self
.user
.send_template("messages/users/account-activation")
def send_email_activation_mail(self):
    """
    Send the owning user the message that asks to activate this address.
    """
    address = self.email
    logging.debug("Sending email address activation mail to %s" % address)

    owner = self.user
    owner.send_template("messages/users/email-activation", email=self)
# Some testing code.
if __name__ == "__main__":
    # Hash a few sample passwords and check that each digest verifies.
    for password in ("1234567890", "abcdefghij"):
        digest = generate_password_hash(password)

        # print() with one pre-formatted argument works on Python 2 and 3
        # alike; the print statement is a syntax error on Python 3.
        print("%s %s" % (password, digest))
        print(" Matches? %s" % check_password_hash(password, digest))