OpenLDAP fixtures for backends
"""
-import ldap0
+import ldap
import logging
import os
import pathlib
import secrets
import tempfile
-from ldap0.controls.readentry import PostReadControl
+from ldap.controls.readentry import PostReadControl
from .slapd import server
# We're just after the generated DN, no other attributes at the moment
control = PostReadControl(True, [])
- result = conn.add_s(
- f"olcDatabase={backend},cn=config", self._entry(),
- req_ctrls=[control])
- dn = result.ctrls[0].res.dn_s
+ _, _, _, ctrls = conn.add_ext_s(
+ f"olcDatabase={backend},cn=config",
+ list(self._entry().items()),
+ serverctrls=[control])
+ dn = ctrls[0].dn
self.dn = dn
server.suffixes[suffix] = self
import os
import pathlib
-from ldap0.controls.readentry import PostReadControl
+from ldap.controls.readentry import PostReadControl
SOURCEROOT = pathlib.Path(os.environ.get('TOP_SRCDIR', "..")).absolute()
# We're just after the generated DN, no other attributes at the moment
control = PostReadControl(True, [])
- result = conn.add_s(
- f"olcOverlay={overlay_name},{database.dn}", self._entry(),
- req_ctrls=[control])
- self.dn = result.ctrls[0].res.dn_s
+ _, _, _, ctrls = conn.add_ext_s(
+ f"olcOverlay={overlay_name},{database.dn}",
+ list(self._entry().items()),
+ serverctrls=[control])
+ self.dn = ctrls[0].dn
if order == -1:
database.overlays.append(self)
OpenLDAP server fixtures
"""
-import ldap0
+import ldap
import ldapurl
import logging
import os
import tempfile
import textwrap
-from ldap0.ldapobject import LDAPObject
+from ldap.ldapobject import LDAPObject
SOURCEROOT = pathlib.Path(os.environ.get('TOP_SRCDIR', "..")).absolute()
conn.simple_bind_s('cn=config', self.secret)
moduleload_object = None
- for entry in conn.search_s('cn=config', ldap0.SCOPE_SUBTREE,
+ for dn, attrs in conn.search_s('cn=config', ldap.SCOPE_SUBTREE,
'objectclass=olcModuleList',
['olcModuleLoad']):
if not moduleload_object:
- moduleload_object = entry.dn_s
- for value in entry.entry_s.get('olcModuleLoad', []):
+ moduleload_object = dn
+ for value in attrs.get('olcModuleLoad', []):
+ value = value.decode()
if value[0] == '{':
value = value[value.find('}')+1:]
if pathlib.Path(value).stem == module_name:
if moduleload_object:
conn.modify_s(
moduleload_object,
- [(ldap0.MOD_ADD, b'olcModuleLoad', [str(module).encode()])])
+ [(ldap.MOD_ADD, 'olcModuleLoad', [str(module).encode()])])
else:
- conn.add_s('cn=module,cn=config',
- {'objectClass': [b'olcModuleList'],
- 'olcModuleLoad': [str(module).encode()]})
+ entry = {'objectClass': [b'olcModuleList'],
+ 'olcModuleLoad': [str(module).encode()]}
+ conn.add_s('cn=module,cn=config', list(entry.items()))
@property
def uri(self):
def test_rootdse(server):
conn = server.connect()
- conn.search_s("", scope=ldap0.SCOPE_BASE)
+ conn.search_s("", scope=ldap.SCOPE_BASE)
OpenLDAP fixtures for overlays
"""
-import ldap0
+import ldap
import logging
import os
import pathlib
conn.simple_bind_s("cn=config", server.secret)
conn.modify_s("cn=config", [
- (ldap0.MOD_REPLACE, b"olcServerId", [str(serverid).encode()])])
+ (ldap.MOD_REPLACE, "olcServerId", [str(serverid).encode()])])
server.serverid = serverid
servers[serverid] = server
f'binddn="{db.suffix}" credentials="{db.secret}"').encode())
connections[serverid].modify_s(db.dn, [
- (ldap0.MOD_REPLACE, b"olcSyncrepl", syncrepl),
- (ldap0.MOD_REPLACE, b"olcMultiprovider", [b"TRUE"])])
+ (ldap.MOD_REPLACE, "olcSyncrepl", syncrepl),
+ (ldap.MOD_REPLACE, "olcMultiprovider", [b"TRUE"])])
yield servers
conn.simple_bind_s(db.rootdn, db.secret)
if not entries_added:
- conn.add_s(suffix, {
+ entry = {
"objectClass": [b"organization",
b"domainRelatedObject",
b"dcobject"],
"o": [b"Example, Inc."],
- "associatedDomain": [b"example.com"]})
+ "associatedDomain": [b"example.com"]}
+ conn.add_s(suffix, list(entry.items()))
entries_added.add(suffix)
# Make sure all hosts have the suffix entry
wait_for_resync(suffix, mmr.values())
dn = f"cn=entry{serverid},{suffix}"
- conn.add_s(dn, {"objectClass": [b"device"],
- "description": [(f"Entry created on serverid "
- f"{serverid}").encode()]})
+ entry = {"objectClass": [b"device"],
+ "description": [(f"Entry created on serverid "
+ f"{serverid}").encode()]}
+ conn.add_s(dn, list(entry.items()))
entries_added.add(dn)
connections.append(conn)
wait_for_resync(suffix, mmr.values())
for conn in connections:
- result = conn.search_s(suffix, ldap0.SCOPE_SUBTREE, attrlist=['1.1'])
- dns = {entry.dn_s for entry in result}
+ result = conn.search_s(suffix, ldap.SCOPE_SUBTREE, attrlist=['1.1'])
+ dns = {dn for dn, entry in result}
assert dns == entries_added, \
f"Server {serverid} contents do not match expectations"