]>
git.ipfire.org Git - pbs.git/blob - src/buildservice/users.py
15 log
= logging
.getLogger("users")
21 from .decorators
import *
23 # A list of possible random characters.
24 random_chars
= string
.ascii_letters
+ string
.digits
26 def generate_random_string(length
=16):
28 Return a string with random chararcters A-Za-z0-9 with given length.
30 return "".join([random
.choice(random_chars
) for i
in range(length
)])
33 def generate_password_hash(password
, salt
=None, algo
="sha512"):
35 This function creates a salted digest of the given password.
37 # Generate the salt (length = 16) of none was given.
39 salt
= generate_random_string(length
=16)
43 if not algo
in hashlib
.algorithms
:
44 raise Exception, "Unsupported password hash algorithm: %s" % algo
46 # Calculate the digest.
51 # Output string is of kind "<algo>$<salt>$<hash>".
52 return "$".join((algo
, salt
, h
.hexdigest()))
54 def check_password_hash(password
, password_hash
):
56 Check a plain-text password with the given digest.
58 # Handle plaintext passwords (plain$<password>).
59 if password_hash
.startswith("plain$"):
60 return password_hash
[6:] == password
63 algo
, salt
, digest
= password_hash
.split("$", 2)
65 logging
.warning("Unknown password hash: %s" % password_hash
)
68 # Re-generate the password hash and compare the result.
69 return password_hash
== generate_password_hash(password
, salt
=salt
, algo
=algo
)
72 class Users(base
.Object
):
74 self
.ldap
= ldap
.LDAP(self
.backend
)
76 def _get_user(self
, query
, *args
):
77 res
= self
.db
.get(query
, *args
)
80 return User(self
.backend
, res
.id, data
=res
)
82 def _get_users(self
, query
, *args
):
83 res
= self
.db
.query(query
, *args
)
86 yield User(self
.backend
, row
.id, data
=row
)
88 def _get_user_email(self
, query
, *args
):
89 res
= self
.db
.get(query
, *args
)
92 return UserEmail(self
.backend
, res
.id, data
=res
)
94 def _get_user_emails(self
, query
, *args
):
95 res
= self
.db
.query(query
, *args
)
98 yield UserEmail(self
.backend
, row
.id, data
=row
)
101 users
= self
._get
_users
("SELECT * FROM users \
102 WHERE activated IS TRUE AND deleted IS FALSE ORDER BY name")
107 res
= self
.db
.get("SELECT COUNT(*) AS count FROM users \
108 WHERE activated IS TRUE AND deleted IS FALSE")
112 def create(self
, name
, realname
=None, ldap_dn
=None):
113 # XXX check if username has the correct name
115 # Check if name is already taken
116 user
= self
.get_by_name(name
)
118 raise ValueError("Username %s already taken" % name
)
121 user
= self
._get
_user
("INSERT INTO users(name, realname, ldap_dn) \
122 VALUES(%s, %s, %s) RETURNING *", name
, realname
, ldap_dn
)
124 # Create row in permissions table.
125 self
.db
.execute("INSERT INTO users_permissions(user_id) VALUES(%s)", user
.id)
127 log
.debug("Created user %s" % user
.name
)
131 def create_from_ldap(self
, name
):
132 log
.debug("Creating user %s from LDAP" % name
)
134 # Get required attributes from LDAP
135 dn
, attr
= self
.ldap
.get_user(name
, attrlist
=["uid", "cn", "mail"])
138 # Create regular user
139 user
= self
.create(name
, realname
=attr
["cn"][0], ldap_dn
=dn
)
142 # Add all email addresses and activate them
143 for email
in attr
["mail"]:
144 user
.add_email(email
, activated
=True)
148 def auth(self
, name
, password
):
149 # If either name or password is None, we don't check at all.
150 if None in (name
, password
):
153 # usually we will get an email address as name
154 user
= self
.get_by_email(name
) or self
.get_by_name(name
)
157 # If no user could be found, we search for a matching user in
159 if not self
.ldap
.auth(name
, password
):
162 # If a LDAP user is found (and password matches), we will
163 # create a new local user with the information from LDAP.
164 user
= self
.create_from_ldap(name
)
166 if not user
.activated
or user
.deleted
:
169 # Check if the password matches
170 if user
.check_password(password
):
173 def email_in_use(self
, email
):
174 return self
._get
_user
_email
("SELECT * FROM users_emails \
175 WHERE email = %s AND activated IS TRUE", email
)
177 def get_by_id(self
, id):
178 return self
._get
_user
("SELECT * FROM users WHERE id = %s", id)
180 def get_by_name(self
, name
):
181 return self
._get
_user
("SELECT * FROM users WHERE name = %s", name
)
183 def get_by_email(self
, email
):
184 return self
._get
_user
("SELECT users.* FROM users \
185 LEFT JOIN users_emails ON users.id = users_emails.user_id \
186 WHERE users_emails.email = %s", email
)
188 def find_maintainers(self
, maintainers
):
191 # Make a unique list of all email addresses
192 for maintainer
in maintainers
:
193 name
, email_address
= email
.utils
.parseaddr(maintainer
)
195 if not email_address
in email_addresses
:
196 email_addresses
.append(email_address
)
198 users
= self
._get
_users
("SELECT DISTINCT users.* FROM users \
199 LEFT JOIN users_emails ON users.id = users_emails.user_id \
200 WHERE users_emails.activated IS TRUE \
201 AND users_emails.email = ANY(%s)", email_addresses
)
205 def find_maintainer(self
, s
):
206 name
, email_address
= email
.utils
.parseaddr(s
)
209 if not email_address
:
212 return self
.get_by_email(email_address
)
214 def search(self
, pattern
, limit
=None):
215 pattern
= "%%%s%%" % pattern
217 return self
._get
_users
("SELECT * FROM users \
218 WHERE (name LIKE %s OR realname LIKE %s) \
219 AND activated IS TRUE AND deleted IS FALSE \
220 ORDER BY name LIMIT %s", pattern
, pattern
, limit
)
223 def check_password_strength(password
):
227 # Empty passwords cannot be used.
228 if len(password
) == 0:
231 # Passwords with less than 6 characters are also too weak.
232 if len(password
) < 6:
235 # Password with at least 8 characters are secure.
236 if len(password
) >= 8:
239 # 10 characters are even more secure.
240 if len(password
) >= 10:
243 # Digits in the password are good.
244 if re
.search("\d+", password
):
247 # Check for lowercase AND uppercase characters.
248 if re
.search("[a-z]", password
) and re
.search("[A-Z]", password
):
251 # Search for special characters.
252 if re
.search(".[!,@,#,$,%,^,&,*,?,_,~,-,(,)]", password
):
258 return accepted
, score
261 class User(base
.DataObject
):
265 return "<%s %s>" % (self
.__class
__.__name
__, self
.realname
)
270 def __eq__(self
, other
):
271 if isinstance(other
, self
.__class
__):
272 return self
.id == other
.id
274 def __lt__(self
, other
):
275 if isinstance(other
, self
.__class
__):
276 return self
.name
< other
.name
278 elif isinstance(other
, str):
279 return self
.name
< other
282 self
._set
_attribute
("deleted", True)
285 self
._set
_attribute
("activated", True)
287 def check_password(self
, password
):
289 Compare the given password with the one stored in the database.
292 return self
.backend
.users
.ldap
.bind(self
.ldap_dn
, password
)
294 return check_password_hash(password
, self
.data
.passphrase
)
296 def set_passphrase(self
, passphrase
):
298 Update the passphrase the users uses to log on.
300 self
.db
.execute("UPDATE users SET passphrase = %s WHERE id = %s",
301 generate_password_hash(passphrase
), self
.id)
303 passphrase
= property(lambda x
: None, set_passphrase
)
305 def get_realname(self
):
306 return self
.data
.realname
or self
.name
308 def set_realname(self
, realname
):
309 self
._set
_attribute
("realname", realname
)
311 realname
= property(get_realname
, set_realname
)
315 return self
.data
.name
319 return self
.data
.ldap_dn
323 # Try to split the string into first and last name.
324 # If that is not successful, return the entire realname.
326 firstname
, rest
= self
.realname
.split(" ", 1)
334 res
= self
.backend
.users
._get
_user
_emails
("SELECT * FROM users_emails \
335 WHERE user_id = %s AND activated IS TRUE ORDER BY email", self
.id)
341 for email
in self
.emails
:
345 def get_email(self
, email
):
346 for e
in self
.emails
:
350 def set_primary_email(self
, email
):
351 if not email
in self
.emails
:
352 raise ValueError("Email address does not belong to user")
354 # Mark previous primary email as non-primary
355 self
.db
.execute("UPDATE users_emails SET \"primary\" = FALSE \
356 WHERE user_id = %s AND \"primary\" IS TRUE" % self
.id)
358 # Mark new primary email
359 self
.db
.execute("UPDATE users_emails SET \"primary\" = TRUE \
360 WHERE user_id = %s AND email = %s AND activated IS TRUE",
363 def has_email_address(self
, email_address
):
365 mail
, email_address
= email
.utils
.parseaddr(email_address
)
369 return email_address
in self
.emails
371 def activate_email(self
, code
):
372 # Search email by activation code
373 email
= self
.backend
.users
._get
_user
_email
("SELECT * FROM users_emails \
374 WHERE user_id = %s AND activated IS FALSE AND activation_code = %s", self
.id, code
)
379 # Activate email address
383 # Te activated flag is useful for LDAP users
384 def add_email(self
, email
, activated
=False):
385 # Check if the email is in use
386 if self
.backend
.users
.email_in_use(email
):
387 raise ValueError("Email %s is already in use" % email
)
389 activation_code
= None
391 activation_code
= generate_random_string(64)
393 user_email
= self
.backend
.users
._get
_user
_email
("INSERT INTO users_emails(user_id, email, \
394 \"primary\", activated, activation_code) VALUES(%s, %s, %s, %s, %s) RETURNING *",
395 self
.id, email
, not self
.emails
, activated
, activation_code
)
396 self
.emails
.append(user_email
)
398 # Send activation email if activation is needed
400 user_email
.send_activation_mail()
404 def set_state(self
, state
):
405 self
._set
_attribute
("state", state
)
407 state
= property(lambda s
: s
.data
.state
, set_state
)
410 return self
.state
== "admin"
413 return self
.state
== "tester"
415 def get_locale(self
):
416 return tornado
.locale
.get(self
.data
.locale
)
418 def set_locale(self
, locale
):
419 self
._set
_attribute
("locale", locale
)
421 locale
= property(get_locale
, set_locale
)
423 def get_timezone(self
, tz
=None):
425 tz
= self
.data
.timezone
or ""
428 tz
= pytz
.timezone(tz
)
429 except pytz
.UnknownTimeZoneError
:
430 tz
= pytz
.timezone("UTC")
434 def set_timezone(self
, timezone
):
435 if not timezone
is None:
436 tz
= self
.get_timezone(timezone
)
439 self
._set
_attribute
("timezone", timezone
)
441 timezone
= property(get_timezone
, set_timezone
)
445 return self
.data
.activated
449 return self
.data
.deleted
452 def registered(self
):
453 return self
.data
.registered
455 def gravatar_icon(self
, size
=128):
456 h
= hashlib
.new("md5")
458 h
.update("%s" % self
.email
)
461 gravatar_url
= "http://www.gravatar.com/avatar/%s?" % h
.hexdigest()
462 gravatar_url
+= urllib
.urlencode({'d': "mm", 's': str(size
)})
468 return self
.db
.get("SELECT * FROM users_permissions WHERE user_id = %s", self
.id)
470 def has_perm(self
, perm
):
472 Returns True if the user has the requested permission.
474 # Admins have the permission for everything.
478 # Exception for voting. All testers are allowed to vote.
479 if perm
== "vote" and self
.is_tester():
482 # All others must be checked individually.
483 return self
.perms
.get(perm
, False) == True
487 return self
.backend
.sessions
._get
_sessions
("SELECT * FROM sessions \
488 WHERE user_id = %s AND valid_until >= NOW() ORDER BY created_at")
491 class UserEmail(base
.DataObject
):
492 table
= "users_emails"
497 def __eq__(self
, other
):
498 if isinstance(other
, self
.__class
__):
499 return self
.id == other
.id
501 elif isinstance(other
, str):
502 return self
.email
== other
506 return self
.backend
.users
.get_by_id(self
.data
.user_id
)
510 return "%s <%s>" % (self
.user
.name
, self
.email
)
514 return self
.data
.email
516 def set_primary(self
, primary
):
517 self
._set
_attribute
("primary", primary
)
519 primary
= property(lambda s
: s
.data
.primary
, set_primary
)
523 return self
.data
.activated
526 self
._set
_attribute
("activated", True)
527 self
._set
_attribute
("activation_code", None)
530 def activation_code(self
):
531 return self
.data
.activation_code
533 def send_activation_mail(self
):
535 return self
.send_email_activation_email()
537 logging
.debug("Sending activation mail to %s" % self
.email
)
539 # Get the saved locale from the user.
540 _
= tornado
.locale
.get(self
.user
.locale
).translate
542 subject
= _("Account Activation")
544 message
= _("You, or somebody using your email address, has registered an account on the Pakfire Build Service.")
546 message
+= _("To activate your account, please click on the link below.")
548 message
+= " %(baseurl)s/user/%(name)s/activate?code=%(activation_code)s" \
549 % { "baseurl" : self
.settings
.get("baseurl"), "name" : self
.user
.name
,
550 "activation_code" : self
.activation_code
, }
552 message
+= "Sincerely,\n The Pakfire Build Service"
554 self
.backend
.messages
.add(self
.recipient
, subject
, message
)
556 def send_email_activation_mail(self
, email
):
557 logging
.debug("Sending email address activation mail to %s" % self
.email
)
559 # Get the saved locale from the user.
560 _
= self
.user
.locale
.translate
562 subject
= _("Email address Activation")
564 message
= _("You, or somebody using your email address, has add this email address to an account on the Pakfire Build Service.")
566 message
+= _("To activate your this email address account, please click on the link below.")
568 message
+= " %(baseurl)s/user/%(name)s/activate?code=%(activation_code)s" \
569 % { "baseurl" : self
.settings
.get("baseurl"), "name" : self
.user
.name
,
570 "activation_code" : self
.activation_code
, }
572 message
+= "Sincerely,\n The Pakfire Build Service"
574 self
.backend
.messages
.add(self
.recipient
, subject
, message
)
578 if __name__
== "__main__":
579 for password
in ("1234567890", "abcdefghij"):
580 digest
= generate_password_hash(password
)
582 print "%s %s" % (password
, digest
)
583 print " Matches? %s" % check_password_hash(password
, digest
)