]> git.ipfire.org Git - pbs.git/blob - src/buildservice/ldap.py
44f7c7ca3140951f1b929236c5d1ea5c78e51f26
[pbs.git] / src / buildservice / ldap.py
1 #!/usr/bin/python
2
3 from __future__ import absolute_import
4
5 import ldap
6 import logging
7
# Module-level logger for LDAP activity, registered under the fixed name "ldap".
log = logging.getLogger("ldap")
# Explicitly enable propagation so records are also handed to the handlers
# of ancestor loggers (this is also the logging module's default).
log.propagate = 1
10
11 from . import base
12 from .decorators import *
13
class LDAP(base.Object):
    """Helper around a python-ldap connection for user lookup and authentication.

    The connection URI and the search base are read from ``self.settings``
    (keys ``ldap_uri`` and ``ldap_search_base``).
    """

    # Filter template shared by all user lookups; the value is always escaped
    # before it is substituted (see _user_filter).
    _USER_FILTER = "(&(objectClass=posixAccount)(%s=%s))"

    @lazy_property
    def ldap(self):
        """Create the LDAP connection object from the ``ldap_uri`` setting.

        NOTE(review): ``lazy_property`` is defined in ``.decorators`` and is
        presumably evaluated once and then cached per instance -- confirm.
        """
        ldap_uri = self.settings.get("ldap_uri")

        log.debug("Connecting to %s...", ldap_uri)

        # Establish LDAP connection
        return ldap.initialize(ldap_uri)

    @staticmethod
    def _escape(value):
        """Escape *value* so it can be embedded safely in an LDAP search filter."""
        # Imported locally so this class does not depend on the ldap.filter
        # submodule having been loaded somewhere else.
        from ldap.filter import escape_filter_chars

        # "%s" formatting keeps the original stringification of non-string values.
        return escape_filter_chars("%s" % (value,))

    def _user_filter(self, attribute, value):
        """Build the posixAccount filter matching ``attribute=value``."""
        return self._USER_FILTER % (attribute, self._escape(value))

    def _find_user(self, attribute, value, **kwargs):
        """Return the first ``(dn, attrs)`` entry matching the filter, or None."""
        result = self.search(self._user_filter(attribute, value), limit=1, **kwargs)

        for dn, attrs in result:
            return (dn, attrs)

        return None

    def search(self, query, attrlist=None, limit=0):
        """Run a subtree search below the configured search base.

        :param query: LDAP filter string. Callers are responsible for escaping
            any untrusted values they embed in it.
        :param attrlist: attributes to fetch, or None to pass no restriction.
        :param limit: passed to the server as ``sizelimit`` (0 is the default).
        :returns: whatever ``search_ext_s`` returns (iterated as ``(dn, attrs)``
            pairs by the callers in this class).

        NOTE(review): python-ldap is expected to raise SIZELIMIT_EXCEEDED when
        more entries match than ``limit`` allows -- confirm whether callers
        using ``limit=1`` can hit this.
        """
        log.debug("Performing LDAP query: %s", query)

        search_base = self.settings.get("ldap_search_base")

        return self.ldap.search_ext_s(
            search_base, ldap.SCOPE_SUBTREE, query,
            attrlist=attrlist, sizelimit=limit,
        )

    def auth(self, username, password):
        """Check *password* for the account identified by uid or mail *username*.

        :returns: True if the bind succeeded, False if the user is unknown or
            the credentials were rejected.
        """
        log.debug("Checking credentials for %s", username)

        dn = self.get_dn(username)
        if not dn:
            log.debug("Could not resolve %s to dn", username)
            return False

        return self.bind(dn, password)

    def bind(self, dn, password):
        """Try a simple bind as *dn*; return True on success, False otherwise."""
        # A simple bind with a DN but an empty password is an "unauthenticated
        # bind" (RFC 4513, section 5.1.2). A server may accept it without
        # verifying anything, so it must never count as a successful login.
        if not password:
            log.debug("Refusing to bind as %s with an empty password", dn)
            return False

        try:
            self.ldap.simple_bind_s(dn, password)
        except ldap.INVALID_CREDENTIALS:
            log.debug("Account credentials for %s are invalid", dn)
            return False

        log.debug("Successfully authenticated %s", dn)

        return True

    def get_dn_by_uid(self, uid):
        """Return the DN of the user matching *uid*, or None if there is none.

        The lookup goes through get_user(), so a mail address is matched too.
        """
        user = self.get_user(uid, attrlist=["uid"])

        # get_user() returns None for unknown users; do not try to unpack it.
        if not user:
            return None

        dn, _attrs = user
        if not dn:
            return None

        log.debug("DN for uid %s is: %s", uid, dn)
        return dn

    def get_dn_by_mail(self, mail):
        """Return the DN of the user whose mail attribute is *mail*, or None."""
        user = self.get_user_by_mail(mail, attrlist=["uid"])
        if not user:
            return None

        dn = user[0]

        log.debug("DN for mail %s is: %s", mail, dn)
        return dn

    def get_dn(self, name):
        """Resolve *name* (uid or mail address) to a DN, or return None."""
        return self.get_dn_by_uid(name) or self.get_dn_by_mail(name)

    def get_user_by_mail(self, mail, **kwargs):
        """Return ``(dn, attrs)`` for the user with mail *mail*, or None.

        Extra keyword arguments are forwarded to search().
        """
        return self._find_user("mail", mail, **kwargs)

    def get_user_by_dn(self, uid, **kwargs):
        """Return ``(dn, attrs)`` for the user with uid *uid*, or None.

        Despite its name, this method filters on the ``uid`` attribute, not on
        the DN. Extra keyword arguments are forwarded to search().
        """
        return self._find_user("uid", uid, **kwargs)

    def get_user(self, name, **kwargs):
        """Return ``(dn, attrs)`` for *name*, trying uid first and then mail."""
        return self.get_user_by_dn(name, **kwargs) or self.get_user_by_mail(name, **kwargs)