]>
git.ipfire.org Git - pbs.git/blob - src/buildservice/users.py
29a98b82a6a39bb643e974c24137b1e99c8b07ab
15 log
= logging
.getLogger("users")
21 from .decorators
import *
23 # A list of possible random characters.
24 random_chars
= string
.ascii_letters
+ string
.digits
26 def generate_random_string(length
=16):
28 Return a string with random chararcters A-Za-z0-9 with given length.
30 return "".join([random
.choice(random_chars
) for i
in range(length
)])
33 def generate_password_hash(password
, salt
=None, algo
="sha512"):
35 This function creates a salted digest of the given password.
37 # Generate the salt (length = 16) of none was given.
39 salt
= generate_random_string(length
=16)
43 if not algo
in hashlib
.algorithms
:
44 raise Exception, "Unsupported password hash algorithm: %s" % algo
46 # Calculate the digest.
51 # Output string is of kind "<algo>$<salt>$<hash>".
52 return "$".join((algo
, salt
, h
.hexdigest()))
54 def check_password_hash(password
, password_hash
):
56 Check a plain-text password with the given digest.
58 # Handle plaintext passwords (plain$<password>).
59 if password_hash
.startswith("plain$"):
60 return password_hash
[6:] == password
63 algo
, salt
, digest
= password_hash
.split("$", 2)
65 logging
.warning("Unknown password hash: %s" % password_hash
)
68 # Re-generate the password hash and compare the result.
69 return password_hash
== generate_password_hash(password
, salt
=salt
, algo
=algo
)
72 class Users(base
.Object
):
74 self
.ldap
= ldap
.LDAP(self
.backend
)
76 def _get_user(self
, query
, *args
):
77 res
= self
.db
.get(query
, *args
)
80 return User(self
.backend
, res
.id, data
=res
)
82 def _get_users(self
, query
, *args
):
83 res
= self
.db
.query(query
, *args
)
86 yield User(self
.backend
, row
.id, data
=row
)
88 def _get_user_email(self
, query
, *args
):
89 res
= self
.db
.get(query
, *args
)
92 return UserEmail(self
.backend
, res
.id, data
=res
)
94 def _get_user_emails(self
, query
, *args
):
95 res
= self
.db
.query(query
, *args
)
98 yield UserEmail(self
.backend
, row
.id, data
=row
)
101 users
= self
._get
_users
("SELECT * FROM users \
102 WHERE activated IS TRUE AND deleted IS FALSE ORDER BY name")
107 res
= self
.db
.get("SELECT COUNT(*) AS count FROM users \
108 WHERE activated IS TRUE AND deleted IS FALSE")
112 def create(self
, name
, realname
=None, ldap_dn
=None):
113 # XXX check if username has the correct name
115 # Check if name is already taken
116 user
= self
.get_by_name(name
)
118 raise ValueError("Username %s already taken" % name
)
121 user
= self
._get
_user
("INSERT INTO users(name, realname, ldap_dn) \
122 VALUES(%s, %s, %s) RETURNING *", name
, realname
, ldap_dn
)
124 # Create row in permissions table.
125 self
.db
.execute("INSERT INTO users_permissions(user_id) VALUES(%s)", user
.id)
127 log
.debug("Created user %s" % user
.name
)
131 def create_from_ldap(self
, name
):
132 log
.debug("Creating user %s from LDAP" % name
)
134 # Get required attributes from LDAP
135 dn
, attr
= self
.ldap
.get_user(name
, attrlist
=["uid", "cn", "mail"])
138 # Create regular user
139 user
= self
.create(name
, realname
=attr
["cn"][0], ldap_dn
=dn
)
142 # Add all email addresses and activate them
143 for email
in attr
["mail"]:
144 user
.add_email(email
, activated
=True)
148 def auth(self
, name
, password
):
149 # If either name or password is None, we don't check at all.
150 if None in (name
, password
):
153 # usually we will get an email address as name
154 user
= self
.get_by_email(name
) or self
.get_by_name(name
)
157 # If no user could be found, we search for a matching user in
159 if not self
.ldap
.auth(name
, password
):
162 # If a LDAP user is found (and password matches), we will
163 # create a new local user with the information from LDAP.
164 user
= self
.create_from_ldap(name
)
166 if not user
.activated
or user
.deleted
:
169 # Check if the password matches
170 if user
.check_password(password
):
173 def email_in_use(self
, email
):
174 return self
._get
_user
_email
("SELECT * FROM users_emails \
175 WHERE email = %s AND activated IS TRUE", email
)
177 def get_by_id(self
, id):
178 return self
._get
_user
("SELECT * FROM users WHERE id = %s", id)
180 def get_by_name(self
, name
):
181 return self
._get
_user
("SELECT * FROM users WHERE name = %s", name
)
183 def get_by_email(self
, email
):
184 return self
._get
_user
("SELECT users.* FROM users \
185 LEFT JOIN users_emails ON users.id = users_emails.user_id \
186 WHERE users_emails.email = %s", email
)
188 def find_maintainers(self
, maintainers
):
191 # Make a unique list of all email addresses
192 for maintainer
in maintainers
:
193 name
, email_address
= email
.utils
.parseaddr(maintainer
)
195 if not email_address
in email_addresses
:
196 email_addresses
.append(email_address
)
198 users
= self
._get
_users
("SELECT DISTINCT users.* FROM users \
199 LEFT JOIN users_emails ON users.id = users_emails.user_id \
200 WHERE users_emails.activated IS TRUE \
201 AND users_emails.email = ANY(%s)", email_addresses
)
205 def find_maintainer(self
, s
):
206 name
, email_address
= email
.utils
.parseaddr(s
)
209 if not email_address
:
212 return self
.get_by_email(email_address
)
214 def search(self
, pattern
, limit
=None):
215 pattern
= "%%%s%%" % pattern
217 return self
._get
_users
("SELECT * FROM users \
218 WHERE (name LIKE %s OR realname LIKE %s) \
219 AND activated IS TRUE AND deleted IS FALSE \
220 ORDER BY name LIMIT %s", pattern
, pattern
, limit
)
223 def check_password_strength(password
):
227 # Empty passwords cannot be used.
228 if len(password
) == 0:
231 # Passwords with less than 6 characters are also too weak.
232 if len(password
) < 6:
235 # Password with at least 8 characters are secure.
236 if len(password
) >= 8:
239 # 10 characters are even more secure.
240 if len(password
) >= 10:
243 # Digits in the password are good.
244 if re
.search("\d+", password
):
247 # Check for lowercase AND uppercase characters.
248 if re
.search("[a-z]", password
) and re
.search("[A-Z]", password
):
251 # Search for special characters.
252 if re
.search(".[!,@,#,$,%,^,&,*,?,_,~,-,(,)]", password
):
258 return accepted
, score
261 class User(base
.DataObject
):
265 return "<%s %s>" % (self
.__class
__.__name
__, self
.realname
)
270 def __eq__(self
, other
):
271 if isinstance(other
, self
.__class
__):
272 return self
.id == other
.id
274 def __lt__(self
, other
):
275 if isinstance(other
, self
.__class
__):
276 return self
.name
< other
.name
278 elif isinstance(other
, str):
279 return self
.name
< other
282 self
._set
_attribute
("deleted", True)
285 self
._set
_attribute
("activated", True)
287 def check_password(self
, password
):
289 Compare the given password with the one stored in the database.
292 return self
.backend
.users
.ldap
.bind(self
.ldap_dn
, password
)
294 return check_password_hash(password
, self
.data
.passphrase
)
296 def set_passphrase(self
, passphrase
):
298 Update the passphrase the users uses to log on.
300 self
.db
.execute("UPDATE users SET passphrase = %s WHERE id = %s",
301 generate_password_hash(passphrase
), self
.id)
303 passphrase
= property(lambda x
: None, set_passphrase
)
305 def get_realname(self
):
306 return self
.data
.realname
or self
.name
308 def set_realname(self
, realname
):
309 self
._set
_attribute
("realname", realname
)
311 realname
= property(get_realname
, set_realname
)
315 return self
.data
.name
319 return self
.data
.ldap_dn
323 # Try to split the string into first and last name.
324 # If that is not successful, return the entire realname.
326 firstname
, rest
= self
.realname
.split(" ", 1)
334 res
= self
.backend
.users
._get
_user
_emails
("SELECT * FROM users_emails \
335 WHERE user_id = %s AND activated IS TRUE ORDER BY email", self
.id)
341 for email
in self
.emails
:
345 def get_email(self
, email
):
346 for e
in self
.emails
:
350 def set_primary_email(self
, email
):
351 if not email
in self
.emails
:
352 raise ValueError("Email address does not belong to user")
354 # Mark previous primary email as non-primary
355 self
.db
.execute("UPDATE users_emails SET \"primary\" = FALSE \
356 WHERE user_id = %s AND \"primary\" IS TRUE" % self
.id)
358 # Mark new primary email
359 self
.db
.execute("UPDATE users_emails SET \"primary\" = TRUE \
360 WHERE user_id = %s AND email = %s AND activated IS TRUE",
363 def has_email_address(self
, email_address
):
365 mail
, email_address
= email
.utils
.parseaddr(email_address
)
369 return email_address
in self
.emails
371 def activate_email(self
, code
):
372 # Search email by activation code
373 email
= self
.backend
.users
._get
_user
_email
("SELECT * FROM users_emails \
374 WHERE user_id = %s AND activated IS FALSE AND activation_code = %s", self
.id, code
)
379 # Activate email address
383 # Te activated flag is useful for LDAP users
384 def add_email(self
, email
, activated
=False):
385 # Check if the email is in use
386 if self
.backend
.users
.email_in_use(email
):
387 raise ValueError("Email %s is already in use" % email
)
389 activation_code
= None
391 activation_code
= generate_random_string(64)
393 user_email
= self
.backend
.users
._get
_user
_email
("INSERT INTO users_emails(user_id, email, \
394 \"primary\", activated, activation_code) VALUES(%s, %s, %s, %s, %s) RETURNING *",
395 self
.id, email
, not self
.emails
, activated
, activation_code
)
396 self
.emails
.append(user_email
)
398 # Send activation email if activation is needed
400 user_email
.send_activation_mail()
404 def send_template(self
, *args
, **kwargs
):
405 return self
.backend
.messages
.send_template(self
, *args
, **kwargs
)
407 def set_state(self
, state
):
408 self
._set
_attribute
("state", state
)
410 state
= property(lambda s
: s
.data
.state
, set_state
)
413 return self
.state
== "admin"
416 return self
.state
== "tester"
418 def get_locale(self
):
419 return tornado
.locale
.get(self
.data
.locale
)
421 def set_locale(self
, locale
):
422 self
._set
_attribute
("locale", locale
)
424 locale
= property(get_locale
, set_locale
)
426 def get_timezone(self
, tz
=None):
428 tz
= self
.data
.timezone
or ""
431 tz
= pytz
.timezone(tz
)
432 except pytz
.UnknownTimeZoneError
:
433 tz
= pytz
.timezone("UTC")
437 def set_timezone(self
, timezone
):
438 if not timezone
is None:
439 tz
= self
.get_timezone(timezone
)
442 self
._set
_attribute
("timezone", timezone
)
444 timezone
= property(get_timezone
, set_timezone
)
448 return self
.data
.activated
452 return self
.data
.deleted
455 def registered(self
):
456 return self
.data
.registered
458 def gravatar_icon(self
, size
=128):
459 h
= hashlib
.new("md5")
461 h
.update("%s" % self
.email
)
464 gravatar_url
= "http://www.gravatar.com/avatar/%s?" % h
.hexdigest()
465 gravatar_url
+= urllib
.urlencode({'d': "mm", 's': str(size
)})
471 return self
.db
.get("SELECT * FROM users_permissions WHERE user_id = %s", self
.id)
473 def has_perm(self
, perm
):
475 Returns True if the user has the requested permission.
477 # Admins have the permission for everything.
481 # Exception for voting. All testers are allowed to vote.
482 if perm
== "vote" and self
.is_tester():
485 # All others must be checked individually.
486 return self
.perms
.get(perm
, False) == True
490 return self
.backend
.sessions
._get
_sessions
("SELECT * FROM sessions \
491 WHERE user_id = %s AND valid_until >= NOW() ORDER BY created_at")
494 class UserEmail(base
.DataObject
):
495 table
= "users_emails"
500 def __eq__(self
, other
):
501 if isinstance(other
, self
.__class
__):
502 return self
.id == other
.id
504 elif isinstance(other
, str):
505 return self
.email
== other
509 return self
.backend
.users
.get_by_id(self
.data
.user_id
)
513 return "%s <%s>" % (self
.user
.name
, self
.email
)
517 return self
.data
.email
519 def set_primary(self
, primary
):
520 self
._set
_attribute
("primary", primary
)
522 primary
= property(lambda s
: s
.data
.primary
, set_primary
)
526 return self
.data
.activated
529 self
._set
_attribute
("activated", True)
530 self
._set
_attribute
("activation_code", None)
533 def activation_code(self
):
534 return self
.data
.activation_code
536 def send_activation_mail(self
):
538 return self
.send_email_activation_email()
540 logging
.debug("Sending activation mail to %s" % self
.email
)
542 self
.user
.send_template("messages/users/account-activation")
544 def send_email_activation_mail(self
):
545 logging
.debug("Sending email address activation mail to %s" % self
.email
)
547 self
.user
.send_template("messages/users/email-activation", email
=self
)
551 if __name__
== "__main__":
552 for password
in ("1234567890", "abcdefghij"):
553 digest
= generate_password_hash(password
)
555 print "%s %s" % (password
, digest
)
556 print " Matches? %s" % check_password_hash(password
, digest
)