]> git.ipfire.org Git - thirdparty/samba.git/commitdiff
ldb:tests: make api_simple module
authorDouglas Bagnall <douglas.bagnall@catalyst.net.nz>
Sat, 17 Aug 2024 02:46:05 +0000 (14:46 +1200)
committerAndreas Schneider <asn@cryptomilk.org>
Tue, 24 Sep 2024 09:14:38 +0000 (09:14 +0000)
Signed-off-by: Douglas Bagnall <douglas.bagnall@catalyst.net.nz>
Reviewed-by: Andreas Schneider <asn@samba.org>
lib/ldb/tests/python/api.py
lib/ldb/tests/python/api_simple.py [new file with mode: 0644]
selftest/tests.py

index 5499694e86c7604603c025158ecae36cfe53bcb9..5b36342ed52faa4cf6cee947b58bdb80146070b8 100755 (executable)
@@ -7,7 +7,6 @@ from unittest import TestCase
 import sys
 sys.path.insert(0, "bin/python")
 import ldb
-import shutil
 import errno
 
 from api_base import (
@@ -59,737 +58,6 @@ class NoContextTests(TestCase):
         self.assertEqual(encoded2, encoded)
 
 
-class SimpleLdb(LdbBaseTest):
-
-    def setUp(self):
-        super().setUp()
-        self.testdir = tempdir()
-        self.filename = os.path.join(self.testdir, "test.ldb")
-        self.ldb = ldb.Ldb(self.url(), flags=self.flags())
-        try:
-            self.ldb.add(self.index)
-        except AttributeError:
-            pass
-
-    def tearDown(self):
-        self.ldb.disconnect()
-        shutil.rmtree(self.testdir)
-        super().tearDown()
-        # Ensure the LDB is closed now, so we close the FD
-
-    def test_connect(self):
-        ldb.Ldb(self.url(), flags=self.flags())
-
-    def test_connect_none(self):
-        ldb.Ldb()
-
-    def test_connect_later(self):
-        x = ldb.Ldb()
-        x.connect(self.url(), flags=self.flags())
-
-    def test_connect_twice(self):
-        url = self.url()
-        x = ldb.Ldb(url)
-        with self.assertRaises(ldb.LdbError):
-            x.connect(url, flags=self.flags())
-
-    def test_connect_twice_later(self):
-        url = self.url()
-        flags = self.flags()
-        x = ldb.Ldb()
-        x.connect(url, flags)
-        with self.assertRaises(ldb.LdbError):
-            x.connect(url, flags)
-
-    def test_connect_and_disconnect(self):
-        url = self.url()
-        flags = self.flags()
-        x = ldb.Ldb()
-        x.connect(url, flags)
-        x.disconnect()
-        x.connect(url, flags)
-        x.disconnect()
-
-    def test_repr(self):
-        x = ldb.Ldb()
-        self.assertTrue(repr(x).startswith("<ldb connection"))
-
-    def test_set_create_perms(self):
-        x = ldb.Ldb()
-        x.set_create_perms(0o600)
-
-    def test_search(self):
-        l = ldb.Ldb(self.url(), flags=self.flags())
-        self.assertEqual(len(l.search()), 0)
-
-    def test_search_controls(self):
-        l = ldb.Ldb(self.url(), flags=self.flags())
-        self.assertEqual(len(l.search(controls=["paged_results:0:5"])), 0)
-
-    def test_utf8_ldb_Dn(self):
-        l = ldb.Ldb(self.url(), flags=self.flags())
-        dn = ldb.Dn(l, (b'a=' + b'\xc4\x85\xc4\x87\xc4\x99\xc5\x82\xc5\x84\xc3\xb3\xc5\x9b\xc5\xba\xc5\xbc').decode('utf8'))
-
-    def test_utf8_encoded_ldb_Dn(self):
-        l = ldb.Ldb(self.url(), flags=self.flags())
-        dn_encoded_utf8 = b'a=' + b'\xc4\x85\xc4\x87\xc4\x99\xc5\x82\xc5\x84\xc3\xb3\xc5\x9b\xc5\xba\xc5\xbc'
-        try:
-            dn = ldb.Dn(l, dn_encoded_utf8)
-        except UnicodeDecodeError as e:
-                raise
-        except TypeError as te:
-           p3errors = ["argument 2 must be str, not bytes",
-                       "Can't convert 'bytes' object to str implicitly"]
-           self.assertIn(str(te), p3errors)
-
-    def test_search_attrs(self):
-        l = ldb.Ldb(self.url(), flags=self.flags())
-        self.assertEqual(len(l.search(ldb.Dn(l, ""), ldb.SCOPE_SUBTREE, "(dc=*)", ["dc"])), 0)
-
-    def test_search_string_dn(self):
-        l = ldb.Ldb(self.url(), flags=self.flags())
-        self.assertEqual(len(l.search("", ldb.SCOPE_SUBTREE, "(dc=*)", ["dc"])), 0)
-
-    def test_search_attr_string(self):
-        l = ldb.Ldb(self.url(), flags=self.flags())
-        self.assertRaises(TypeError, l.search, attrs="dc")
-        self.assertRaises(TypeError, l.search, attrs=b"dc")
-
-    def test_opaque(self):
-        l = ldb.Ldb(self.url(), flags=self.flags())
-        l.set_opaque("my_opaque", True)
-        self.assertTrue(l.get_opaque("my_opaque") is not None)
-        self.assertEqual(None, l.get_opaque("unknown"))
-
-    def test_opaque_bool(self):
-        """Test that we can set boolean opaque values."""
-
-        db = ldb.Ldb(self.url(), flags=self.flags())
-        name = "my_opaque"
-
-        db.set_opaque(name, False)
-        self.assertEqual(False, db.get_opaque(name))
-
-        db.set_opaque(name, True)
-        self.assertEqual(True, db.get_opaque(name))
-
-    def test_opaque_int(self):
-        """Test that we can set (positive) integer opaque values."""
-
-        db = ldb.Ldb(self.url(), flags=self.flags())
-        name = "my_opaque"
-
-        db.set_opaque(name, 0)
-        self.assertEqual(0, db.get_opaque(name))
-
-        db.set_opaque(name, 12345678)
-        self.assertEqual(12345678, db.get_opaque(name))
-
-        # Negative values can’t be set.
-        self.assertRaises(OverflowError, db.set_opaque, name, -99999)
-
-    def test_opaque_string(self):
-        """Test that we can set string opaque values."""
-
-        db = ldb.Ldb(self.url(), flags=self.flags())
-        name = "my_opaque"
-
-        db.set_opaque(name, "")
-        self.assertEqual("", db.get_opaque(name))
-
-        db.set_opaque(name, "foo bar")
-        self.assertEqual("foo bar", db.get_opaque(name))
-
-    def test_opaque_none(self):
-        """Test that we can set an opaque to None to effectively unset it."""
-
-        db = ldb.Ldb(self.url(), flags=self.flags())
-        name = "my_opaque"
-
-        # An opaque that has not been set is the same as None.
-        self.assertIsNone(db.get_opaque(name))
-
-        # Give the opaque a value.
-        db.set_opaque(name, 3)
-        self.assertEqual(3, db.get_opaque(name))
-
-        # Test that we can set the opaque to None to unset it.
-        db.set_opaque(name, None)
-        self.assertIsNone(db.get_opaque(name))
-
-    def test_opaque_unsupported(self):
-        """Test that trying to set unsupported values raises an error."""
-
-        db = ldb.Ldb(self.url(), flags=self.flags())
-        name = "my_opaque"
-
-        self.assertRaises(ValueError, db.set_opaque, name, [])
-        self.assertRaises(ValueError, db.set_opaque, name, ())
-        self.assertRaises(ValueError, db.set_opaque, name, 3.14)
-        self.assertRaises(ValueError, db.set_opaque, name, 3+2j)
-        self.assertRaises(ValueError, db.set_opaque, name, b'foo')
-
-    def test_search_scope_base_empty_db(self):
-        l = ldb.Ldb(self.url(), flags=self.flags())
-        self.assertEqual(len(l.search(ldb.Dn(l, "dc=foo1"),
-                                      ldb.SCOPE_BASE)), 0)
-
-    def test_search_scope_onelevel_empty_db(self):
-        l = ldb.Ldb(self.url(), flags=self.flags())
-        self.assertEqual(len(l.search(ldb.Dn(l, "dc=foo1"),
-                                      ldb.SCOPE_ONELEVEL)), 0)
-
-    def test_delete(self):
-        l = ldb.Ldb(self.url(), flags=self.flags())
-        self.assertRaises(ldb.LdbError, lambda: l.delete(ldb.Dn(l, "dc=foo2")))
-
-    def test_delete_w_unhandled_ctrl(self):
-        l = ldb.Ldb(self.url(), flags=self.flags())
-        m = ldb.Message()
-        m.dn = ldb.Dn(l, "dc=foo1")
-        m["b"] = [b"a"]
-        m["objectUUID"] = b"0123456789abcdef"
-        l.add(m)
-        self.assertRaises(ldb.LdbError, lambda: l.delete(m.dn, ["search_options:1:2"]))
-        l.delete(m.dn)
-
-    def test_contains(self):
-        name = self.url()
-        l = ldb.Ldb(name, flags=self.flags())
-        self.assertFalse(ldb.Dn(l, "dc=foo3") in l)
-        l = ldb.Ldb(name, flags=self.flags())
-        m = ldb.Message()
-        m.dn = ldb.Dn(l, "dc=foo3")
-        m["b"] = ["a"]
-        m["objectUUID"] = b"0123456789abcdef"
-        l.add(m)
-        try:
-            self.assertTrue(ldb.Dn(l, "dc=foo3") in l)
-            self.assertFalse(ldb.Dn(l, "dc=foo4") in l)
-        finally:
-            l.delete(m.dn)
-
-    def test_get_config_basedn(self):
-        l = ldb.Ldb(self.url(), flags=self.flags())
-        self.assertEqual(None, l.get_config_basedn())
-
-    def test_get_root_basedn(self):
-        l = ldb.Ldb(self.url(), flags=self.flags())
-        self.assertEqual(None, l.get_root_basedn())
-
-    def test_get_schema_basedn(self):
-        l = ldb.Ldb(self.url(), flags=self.flags())
-        self.assertEqual(None, l.get_schema_basedn())
-
-    def test_get_default_basedn(self):
-        l = ldb.Ldb(self.url(), flags=self.flags())
-        self.assertEqual(None, l.get_default_basedn())
-
-    def test_add(self):
-        l = ldb.Ldb(self.url(), flags=self.flags())
-        m = ldb.Message()
-        m.dn = ldb.Dn(l, "dc=foo4")
-        m["bla"] = b"bla"
-        m["objectUUID"] = b"0123456789abcdef"
-        self.assertEqual(len(l.search()), 0)
-        l.add(m)
-        try:
-            self.assertEqual(len(l.search()), 1)
-        finally:
-            l.delete(ldb.Dn(l, "dc=foo4"))
-
-    def test_search_iterator(self):
-        l = ldb.Ldb(self.url(), flags=self.flags())
-        s = l.search_iterator()
-        s.abandon()
-        try:
-            for me in s:
-                self.fail()
-            self.fail()
-        except RuntimeError as re:
-            pass
-        try:
-            s.abandon()
-            self.fail()
-        except RuntimeError as re:
-            pass
-        try:
-            s.result()
-            self.fail()
-        except RuntimeError as re:
-            pass
-
-        s = l.search_iterator()
-        count = 0
-        for me in s:
-            self.assertTrue(isinstance(me, ldb.Message))
-            count += 1
-        r = s.result()
-        self.assertEqual(len(r), 0)
-        self.assertEqual(count, 0)
-
-        m1 = ldb.Message()
-        m1.dn = ldb.Dn(l, "dc=foo4")
-        m1["bla"] = b"bla"
-        m1["objectUUID"] = b"0123456789abcdef"
-        l.add(m1)
-        try:
-            s = l.search_iterator()
-            msgs = []
-            for me in s:
-                self.assertTrue(isinstance(me, ldb.Message))
-                count += 1
-                msgs.append(me)
-            r = s.result()
-            self.assertEqual(len(r), 0)
-            self.assertEqual(len(msgs), 1)
-            self.assertEqual(msgs[0].dn, m1.dn)
-
-            m2 = ldb.Message()
-            m2.dn = ldb.Dn(l, "dc=foo5")
-            m2["bla"] = b"bla"
-            m2["objectUUID"] = b"0123456789abcdee"
-            l.add(m2)
-
-            s = l.search_iterator()
-            msgs = []
-            for me in s:
-                self.assertTrue(isinstance(me, ldb.Message))
-                count += 1
-                msgs.append(me)
-            r = s.result()
-            self.assertEqual(len(r), 0)
-            self.assertEqual(len(msgs), 2)
-            if msgs[0].dn == m1.dn:
-                self.assertEqual(msgs[0].dn, m1.dn)
-                self.assertEqual(msgs[1].dn, m2.dn)
-            else:
-                self.assertEqual(msgs[0].dn, m2.dn)
-                self.assertEqual(msgs[1].dn, m1.dn)
-
-            s = l.search_iterator()
-            msgs = []
-            for me in s:
-                self.assertTrue(isinstance(me, ldb.Message))
-                count += 1
-                msgs.append(me)
-                break
-            try:
-                s.result()
-                self.fail()
-            except RuntimeError as re:
-                pass
-            for me in s:
-                self.assertTrue(isinstance(me, ldb.Message))
-                count += 1
-                msgs.append(me)
-                break
-            for me in s:
-                self.fail()
-
-            r = s.result()
-            self.assertEqual(len(r), 0)
-            self.assertEqual(len(msgs), 2)
-            if msgs[0].dn == m1.dn:
-                self.assertEqual(msgs[0].dn, m1.dn)
-                self.assertEqual(msgs[1].dn, m2.dn)
-            else:
-                self.assertEqual(msgs[0].dn, m2.dn)
-                self.assertEqual(msgs[1].dn, m1.dn)
-        finally:
-            l.delete(ldb.Dn(l, "dc=foo4"))
-            l.delete(ldb.Dn(l, "dc=foo5"))
-
-    def test_add_text(self):
-        l = ldb.Ldb(self.url(), flags=self.flags())
-        m = ldb.Message()
-        m.dn = ldb.Dn(l, "dc=foo4")
-        m["bla"] = "bla"
-        m["objectUUID"] = b"0123456789abcdef"
-        self.assertEqual(len(l.search()), 0)
-        l.add(m)
-        try:
-            self.assertEqual(len(l.search()), 1)
-        finally:
-            l.delete(ldb.Dn(l, "dc=foo4"))
-
-    def test_add_w_unhandled_ctrl(self):
-        l = ldb.Ldb(self.url(), flags=self.flags())
-        m = ldb.Message()
-        m.dn = ldb.Dn(l, "dc=foo4")
-        m["bla"] = b"bla"
-        self.assertEqual(len(l.search()), 0)
-        self.assertRaises(ldb.LdbError, lambda: l.add(m, ["search_options:1:2"]))
-
-    def test_add_dict(self):
-        l = ldb.Ldb(self.url(), flags=self.flags())
-        m = {"dn": ldb.Dn(l, "dc=foo5"),
-             "bla": b"bla",
-             "objectUUID": b"0123456789abcdef"}
-        self.assertEqual(len(l.search()), 0)
-        l.add(m)
-        try:
-            self.assertEqual(len(l.search()), 1)
-        finally:
-            l.delete(ldb.Dn(l, "dc=foo5"))
-
-    def test_add_dict_text(self):
-        l = ldb.Ldb(self.url(), flags=self.flags())
-        m = {"dn": ldb.Dn(l, "dc=foo5"),
-             "bla": "bla",
-             "objectUUID": b"0123456789abcdef"}
-        self.assertEqual(len(l.search()), 0)
-        l.add(m)
-        try:
-            self.assertEqual(len(l.search()), 1)
-        finally:
-            l.delete(ldb.Dn(l, "dc=foo5"))
-
-    def test_add_dict_string_dn(self):
-        l = ldb.Ldb(self.url(), flags=self.flags())
-        m = {"dn": "dc=foo6", "bla": b"bla",
-             "objectUUID": b"0123456789abcdef"}
-        self.assertEqual(len(l.search()), 0)
-        l.add(m)
-        try:
-            self.assertEqual(len(l.search()), 1)
-        finally:
-            l.delete(ldb.Dn(l, "dc=foo6"))
-
-    def test_add_dict_bytes_dn(self):
-        l = ldb.Ldb(self.url(), flags=self.flags())
-        m = {"dn": b"dc=foo6", "bla": b"bla",
-             "objectUUID": b"0123456789abcdef"}
-        self.assertEqual(len(l.search()), 0)
-        l.add(m)
-        try:
-            self.assertEqual(len(l.search()), 1)
-        finally:
-            l.delete(ldb.Dn(l, "dc=foo6"))
-
-    def test_rename(self):
-        l = ldb.Ldb(self.url(), flags=self.flags())
-        m = ldb.Message()
-        m.dn = ldb.Dn(l, "dc=foo7")
-        m["bla"] = b"bla"
-        m["objectUUID"] = b"0123456789abcdef"
-        self.assertEqual(len(l.search()), 0)
-        l.add(m)
-        try:
-            l.rename(ldb.Dn(l, "dc=foo7"), ldb.Dn(l, "dc=bar"))
-            self.assertEqual(len(l.search()), 1)
-        finally:
-            l.delete(ldb.Dn(l, "dc=bar"))
-
-    def test_rename_string_dns(self):
-        l = ldb.Ldb(self.url(), flags=self.flags())
-        m = ldb.Message()
-        m.dn = ldb.Dn(l, "dc=foo8")
-        m["bla"] = b"bla"
-        m["objectUUID"] = b"0123456789abcdef"
-        self.assertEqual(len(l.search()), 0)
-        l.add(m)
-        self.assertEqual(len(l.search()), 1)
-        try:
-            l.rename("dc=foo8", "dc=bar")
-            self.assertEqual(len(l.search()), 1)
-        finally:
-            l.delete(ldb.Dn(l, "dc=bar"))
-
-    def test_rename_bad_string_dns(self):
-        l = ldb.Ldb(self.url(), flags=self.flags())
-        m = ldb.Message()
-        m.dn = ldb.Dn(l, "dc=foo8")
-        m["bla"] = b"bla"
-        m["objectUUID"] = b"0123456789abcdef"
-        self.assertEqual(len(l.search()), 0)
-        l.add(m)
-        self.assertEqual(len(l.search()), 1)
-        self.assertRaises(ldb.LdbError,lambda: l.rename("dcXfoo8", "dc=bar"))
-        self.assertRaises(ldb.LdbError,lambda: l.rename("dc=foo8", "dcXbar"))
-        l.delete(ldb.Dn(l, "dc=foo8"))
-
-    def test_empty_dn(self):
-        l = ldb.Ldb(self.url(), flags=self.flags())
-        self.assertEqual(0, len(l.search()))
-        m = ldb.Message()
-        m.dn = ldb.Dn(l, "dc=empty")
-        m["objectUUID"] = b"0123456789abcdef"
-        l.add(m)
-        rm = l.search()
-        self.assertEqual(1, len(rm))
-        self.assertEqual(set(["dn", "distinguishedName", "objectUUID"]),
-                         set(rm[0].keys()))
-
-        rm = l.search(m.dn)
-        self.assertEqual(1, len(rm))
-        self.assertEqual(set(["dn", "distinguishedName", "objectUUID"]),
-                         set(rm[0].keys()))
-        rm = l.search(m.dn, attrs=["blah"])
-        self.assertEqual(1, len(rm))
-        self.assertEqual(0, len(rm[0]))
-
-    def test_modify_delete(self):
-        l = ldb.Ldb(self.url(), flags=self.flags())
-        m = ldb.Message()
-        m.dn = ldb.Dn(l, "dc=modifydelete")
-        m["bla"] = [b"1234"]
-        m["objectUUID"] = b"0123456789abcdef"
-        l.add(m)
-        rm = l.search(m.dn)[0]
-        self.assertEqual([b"1234"], list(rm["bla"]))
-        try:
-            m = ldb.Message()
-            m.dn = ldb.Dn(l, "dc=modifydelete")
-            m["bla"] = ldb.MessageElement([], ldb.FLAG_MOD_DELETE, "bla")
-            self.assertEqual(ldb.FLAG_MOD_DELETE, m["bla"].flags())
-            l.modify(m)
-            rm = l.search(m.dn)
-            self.assertEqual(1, len(rm))
-            self.assertEqual(set(["dn", "distinguishedName", "objectUUID"]),
-                             set(rm[0].keys()))
-            rm = l.search(m.dn, attrs=["bla"])
-            self.assertEqual(1, len(rm))
-            self.assertEqual(0, len(rm[0]))
-        finally:
-            l.delete(ldb.Dn(l, "dc=modifydelete"))
-
-    def test_modify_delete_text(self):
-        l = ldb.Ldb(self.url(), flags=self.flags())
-        m = ldb.Message()
-        m.dn = ldb.Dn(l, "dc=modifydelete")
-        m.text["bla"] = ["1234"]
-        m["objectUUID"] = b"0123456789abcdef"
-        l.add(m)
-        rm = l.search(m.dn)[0]
-        self.assertEqual(["1234"], list(rm.text["bla"]))
-        try:
-            m = ldb.Message()
-            m.dn = ldb.Dn(l, "dc=modifydelete")
-            m["bla"] = ldb.MessageElement([], ldb.FLAG_MOD_DELETE, "bla")
-            self.assertEqual(ldb.FLAG_MOD_DELETE, m["bla"].flags())
-            l.modify(m)
-            rm = l.search(m.dn)
-            self.assertEqual(1, len(rm))
-            self.assertEqual(set(["dn", "distinguishedName", "objectUUID"]),
-                             set(rm[0].keys()))
-            rm = l.search(m.dn, attrs=["bla"])
-            self.assertEqual(1, len(rm))
-            self.assertEqual(0, len(rm[0]))
-        finally:
-            l.delete(ldb.Dn(l, "dc=modifydelete"))
-
-    def test_modify_add(self):
-        l = ldb.Ldb(self.url(), flags=self.flags())
-        m = ldb.Message()
-        m.dn = ldb.Dn(l, "dc=add")
-        m["bla"] = [b"1234"]
-        m["objectUUID"] = b"0123456789abcdef"
-        l.add(m)
-        try:
-            m = ldb.Message()
-            m.dn = ldb.Dn(l, "dc=add")
-            m["bla"] = ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla")
-            self.assertEqual(ldb.FLAG_MOD_ADD, m["bla"].flags())
-            l.modify(m)
-            rm = l.search(m.dn)[0]
-            self.assertEqual(3, len(rm))
-            self.assertEqual([b"1234", b"456"], list(rm["bla"]))
-        finally:
-            l.delete(ldb.Dn(l, "dc=add"))
-
-    def test_modify_add_text(self):
-        l = ldb.Ldb(self.url(), flags=self.flags())
-        m = ldb.Message()
-        m.dn = ldb.Dn(l, "dc=add")
-        m.text["bla"] = ["1234"]
-        m["objectUUID"] = b"0123456789abcdef"
-        l.add(m)
-        try:
-            m = ldb.Message()
-            m.dn = ldb.Dn(l, "dc=add")
-            m["bla"] = ldb.MessageElement(["456"], ldb.FLAG_MOD_ADD, "bla")
-            self.assertEqual(ldb.FLAG_MOD_ADD, m["bla"].flags())
-            l.modify(m)
-            rm = l.search(m.dn)[0]
-            self.assertEqual(3, len(rm))
-            self.assertEqual(["1234", "456"], list(rm.text["bla"]))
-        finally:
-            l.delete(ldb.Dn(l, "dc=add"))
-
-    def test_modify_replace(self):
-        l = ldb.Ldb(self.url(), flags=self.flags())
-        m = ldb.Message()
-        m.dn = ldb.Dn(l, "dc=modify2")
-        m["bla"] = [b"1234", b"456"]
-        m["objectUUID"] = b"0123456789abcdef"
-        l.add(m)
-        try:
-            m = ldb.Message()
-            m.dn = ldb.Dn(l, "dc=modify2")
-            m["bla"] = ldb.MessageElement([b"789"], ldb.FLAG_MOD_REPLACE, "bla")
-            self.assertEqual(ldb.FLAG_MOD_REPLACE, m["bla"].flags())
-            l.modify(m)
-            rm = l.search(m.dn)[0]
-            self.assertEqual(3, len(rm))
-            self.assertEqual([b"789"], list(rm["bla"]))
-            rm = l.search(m.dn, attrs=["bla"])[0]
-            self.assertEqual(1, len(rm))
-        finally:
-            l.delete(ldb.Dn(l, "dc=modify2"))
-
-    def test_modify_replace_text(self):
-        l = ldb.Ldb(self.url(), flags=self.flags())
-        m = ldb.Message()
-        m.dn = ldb.Dn(l, "dc=modify2")
-        m.text["bla"] = ["1234", "456"]
-        m["objectUUID"] = b"0123456789abcdef"
-        l.add(m)
-        try:
-            m = ldb.Message()
-            m.dn = ldb.Dn(l, "dc=modify2")
-            m["bla"] = ldb.MessageElement(["789"], ldb.FLAG_MOD_REPLACE, "bla")
-            self.assertEqual(ldb.FLAG_MOD_REPLACE, m["bla"].flags())
-            l.modify(m)
-            rm = l.search(m.dn)[0]
-            self.assertEqual(3, len(rm))
-            self.assertEqual(["789"], list(rm.text["bla"]))
-            rm = l.search(m.dn, attrs=["bla"])[0]
-            self.assertEqual(1, len(rm))
-        finally:
-            l.delete(ldb.Dn(l, "dc=modify2"))
-
-    def test_modify_flags_change(self):
-        l = ldb.Ldb(self.url(), flags=self.flags())
-        m = ldb.Message()
-        m.dn = ldb.Dn(l, "dc=add")
-        m["bla"] = [b"1234"]
-        m["objectUUID"] = b"0123456789abcdef"
-        l.add(m)
-        try:
-            m = ldb.Message()
-            m.dn = ldb.Dn(l, "dc=add")
-            m["bla"] = ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla")
-            self.assertEqual(ldb.FLAG_MOD_ADD, m["bla"].flags())
-            l.modify(m)
-            rm = l.search(m.dn)[0]
-            self.assertEqual(3, len(rm))
-            self.assertEqual([b"1234", b"456"], list(rm["bla"]))
-
-            # Now create another modify, but switch the flags before we do it
-            m["bla"] = ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla")
-            m["bla"].set_flags(ldb.FLAG_MOD_DELETE)
-            l.modify(m)
-            rm = l.search(m.dn, attrs=["bla"])[0]
-            self.assertEqual(1, len(rm))
-            self.assertEqual([b"1234"], list(rm["bla"]))
-        finally:
-            l.delete(ldb.Dn(l, "dc=add"))
-
-    def test_modify_flags_change_text(self):
-        l = ldb.Ldb(self.url(), flags=self.flags())
-        m = ldb.Message()
-        m.dn = ldb.Dn(l, "dc=add")
-        m.text["bla"] = ["1234"]
-        m["objectUUID"] = b"0123456789abcdef"
-        l.add(m)
-        try:
-            m = ldb.Message()
-            m.dn = ldb.Dn(l, "dc=add")
-            m["bla"] = ldb.MessageElement(["456"], ldb.FLAG_MOD_ADD, "bla")
-            self.assertEqual(ldb.FLAG_MOD_ADD, m["bla"].flags())
-            l.modify(m)
-            rm = l.search(m.dn)[0]
-            self.assertEqual(3, len(rm))
-            self.assertEqual(["1234", "456"], list(rm.text["bla"]))
-
-            # Now create another modify, but switch the flags before we do it
-            m["bla"] = ldb.MessageElement(["456"], ldb.FLAG_MOD_ADD, "bla")
-            m["bla"].set_flags(ldb.FLAG_MOD_DELETE)
-            l.modify(m)
-            rm = l.search(m.dn, attrs=["bla"])[0]
-            self.assertEqual(1, len(rm))
-            self.assertEqual(["1234"], list(rm.text["bla"]))
-        finally:
-            l.delete(ldb.Dn(l, "dc=add"))
-
-    def test_transaction_commit(self):
-        l = ldb.Ldb(self.url(), flags=self.flags())
-        l.transaction_start()
-        m = ldb.Message(ldb.Dn(l, "dc=foo9"))
-        m["foo"] = [b"bar"]
-        m["objectUUID"] = b"0123456789abcdef"
-        l.add(m)
-        l.transaction_commit()
-        l.delete(m.dn)
-
-    def test_transaction_cancel(self):
-        l = ldb.Ldb(self.url(), flags=self.flags())
-        l.transaction_start()
-        m = ldb.Message(ldb.Dn(l, "dc=foo10"))
-        m["foo"] = [b"bar"]
-        m["objectUUID"] = b"0123456789abcdee"
-        l.add(m)
-        l.transaction_cancel()
-        self.assertEqual(0, len(l.search(ldb.Dn(l, "dc=foo10"))))
-
-    def test_set_debug(self):
-        def my_report_fn(level, text):
-            pass
-        l = ldb.Ldb(self.url(), flags=self.flags())
-        l.set_debug(my_report_fn)
-
-    def test_zero_byte_string(self):
-        """Testing we do not get trapped in the \0 byte in a property string."""
-        l = ldb.Ldb(self.url(), flags=self.flags())
-        l.add({
-            "dn": b"dc=somedn",
-            "objectclass": b"user",
-            "cN": b"LDAPtestUSER",
-            "givenname": b"ldap",
-            "displayname": b"foo\0bar",
-            "objectUUID": b"0123456789abcdef"
-        })
-        res = l.search(expression="(dn=dc=somedn)")
-        self.assertEqual(b"foo\0bar", res[0]["displayname"][0])
-
-    def test_no_crash_broken_expr(self):
-        l = ldb.Ldb(self.url(), flags=self.flags())
-        self.assertRaises(ldb.LdbError, lambda: l.search("", ldb.SCOPE_SUBTREE, "&(dc=*)(dn=*)", ["dc"]))
-
-# Run the SimpleLdb tests against an lmdb backend
-
-
-class SimpleLdbLmdb(SimpleLdb):
-
-    def setUp(self):
-        if os.environ.get('HAVE_LMDB', '1') == '0':
-            self.skipTest("No lmdb backend")
-        self.prefix = MDB_PREFIX
-        self.index = MDB_INDEX_OBJ
-        super().setUp()
-
-
-class SimpleLdbNoLmdb(LdbBaseTest):
-
-    def setUp(self):
-        if os.environ.get('HAVE_LMDB', '1') != '0':
-            self.skipTest("lmdb backend enabled")
-        self.prefix = MDB_PREFIX
-        self.index = MDB_INDEX_OBJ
-        super().setUp()
-
-    def test_lmdb_disabled(self):
-        self.testdir = tempdir()
-        self.filename = os.path.join(self.testdir, "test.ldb")
-        try:
-            self.ldb = ldb.Ldb(self.url(), flags=self.flags())
-            self.fail("Should have failed on missing LMDB")
-        except ldb.LdbError as err:
-            enum = err.args[0]
-            self.assertEqual(enum, ldb.ERR_OTHER)
-
-
 class BadIndexTests(LdbBaseTest):
     def setUp(self):
         super().setUp()
diff --git a/lib/ldb/tests/python/api_simple.py b/lib/ldb/tests/python/api_simple.py
new file mode 100644 (file)
index 0000000..4d74e2f
--- /dev/null
@@ -0,0 +1,752 @@
+#!/usr/bin/env python3
+# Simple tests for the ldb python bindings.
+# Copyright (C) 2007 Jelmer Vernooij <jelmer@samba.org>
+
+import os
+import sys
+sys.path.insert(0, "bin/python")
+from api_base import (
+    MDB_PREFIX,
+    MDB_INDEX_OBJ,
+    tempdir,
+    LdbBaseTest
+)
+
+import ldb
+import shutil
+
+
+class SimpleLdb(LdbBaseTest):
+
+    def setUp(self):
+        super().setUp()
+        self.testdir = tempdir()
+        self.filename = os.path.join(self.testdir, "test.ldb")
+        self.ldb = ldb.Ldb(self.url(), flags=self.flags())
+        try:
+            self.ldb.add(self.index)
+        except AttributeError:
+            pass
+
+    def tearDown(self):
+        self.ldb.disconnect()
+        shutil.rmtree(self.testdir)
+        super().tearDown()
+        # Ensure the LDB is closed now, so we close the FD
+
+    def test_connect(self):
+        ldb.Ldb(self.url(), flags=self.flags())
+
+    def test_connect_none(self):
+        ldb.Ldb()
+
+    def test_connect_later(self):
+        x = ldb.Ldb()
+        x.connect(self.url(), flags=self.flags())
+
+    def test_connect_twice(self):
+        url = self.url()
+        x = ldb.Ldb(url)
+        with self.assertRaises(ldb.LdbError):
+            x.connect(url, flags=self.flags())
+
+    def test_connect_twice_later(self):
+        url = self.url()
+        flags = self.flags()
+        x = ldb.Ldb()
+        x.connect(url, flags)
+        with self.assertRaises(ldb.LdbError):
+            x.connect(url, flags)
+
+    def test_connect_and_disconnect(self):
+        url = self.url()
+        flags = self.flags()
+        x = ldb.Ldb()
+        x.connect(url, flags)
+        x.disconnect()
+        x.connect(url, flags)
+        x.disconnect()
+
+    def test_repr(self):
+        x = ldb.Ldb()
+        self.assertTrue(repr(x).startswith("<ldb connection"))
+
+    def test_set_create_perms(self):
+        x = ldb.Ldb()
+        x.set_create_perms(0o600)
+
+    def test_search(self):
+        l = ldb.Ldb(self.url(), flags=self.flags())
+        self.assertEqual(len(l.search()), 0)
+
+    def test_search_controls(self):
+        l = ldb.Ldb(self.url(), flags=self.flags())
+        self.assertEqual(len(l.search(controls=["paged_results:0:5"])), 0)
+
+    def test_utf8_ldb_Dn(self):
+        l = ldb.Ldb(self.url(), flags=self.flags())
+        dn = ldb.Dn(l, (b'a=' + b'\xc4\x85\xc4\x87\xc4\x99\xc5\x82\xc5\x84\xc3\xb3\xc5\x9b\xc5\xba\xc5\xbc').decode('utf8'))
+
+    def test_utf8_encoded_ldb_Dn(self):
+        l = ldb.Ldb(self.url(), flags=self.flags())
+        dn_encoded_utf8 = b'a=' + b'\xc4\x85\xc4\x87\xc4\x99\xc5\x82\xc5\x84\xc3\xb3\xc5\x9b\xc5\xba\xc5\xbc'
+        try:
+            dn = ldb.Dn(l, dn_encoded_utf8)
+        except UnicodeDecodeError as e:
+                raise
+        except TypeError as te:
+           p3errors = ["argument 2 must be str, not bytes",
+                       "Can't convert 'bytes' object to str implicitly"]
+           self.assertIn(str(te), p3errors)
+
+    def test_search_attrs(self):
+        l = ldb.Ldb(self.url(), flags=self.flags())
+        self.assertEqual(len(l.search(ldb.Dn(l, ""), ldb.SCOPE_SUBTREE, "(dc=*)", ["dc"])), 0)
+
+    def test_search_string_dn(self):
+        l = ldb.Ldb(self.url(), flags=self.flags())
+        self.assertEqual(len(l.search("", ldb.SCOPE_SUBTREE, "(dc=*)", ["dc"])), 0)
+
+    def test_search_attr_string(self):
+        l = ldb.Ldb(self.url(), flags=self.flags())
+        self.assertRaises(TypeError, l.search, attrs="dc")
+        self.assertRaises(TypeError, l.search, attrs=b"dc")
+
+    def test_opaque(self):
+        l = ldb.Ldb(self.url(), flags=self.flags())
+        l.set_opaque("my_opaque", True)
+        self.assertTrue(l.get_opaque("my_opaque") is not None)
+        self.assertEqual(None, l.get_opaque("unknown"))
+
+    def test_opaque_bool(self):
+        """Test that we can set boolean opaque values."""
+
+        db = ldb.Ldb(self.url(), flags=self.flags())
+        name = "my_opaque"
+
+        db.set_opaque(name, False)
+        self.assertEqual(False, db.get_opaque(name))
+
+        db.set_opaque(name, True)
+        self.assertEqual(True, db.get_opaque(name))
+
+    def test_opaque_int(self):
+        """Test that we can set (positive) integer opaque values."""
+
+        db = ldb.Ldb(self.url(), flags=self.flags())
+        name = "my_opaque"
+
+        db.set_opaque(name, 0)
+        self.assertEqual(0, db.get_opaque(name))
+
+        db.set_opaque(name, 12345678)
+        self.assertEqual(12345678, db.get_opaque(name))
+
+        # Negative values can’t be set.
+        self.assertRaises(OverflowError, db.set_opaque, name, -99999)
+
+    def test_opaque_string(self):
+        """Test that we can set string opaque values."""
+
+        db = ldb.Ldb(self.url(), flags=self.flags())
+        name = "my_opaque"
+
+        db.set_opaque(name, "")
+        self.assertEqual("", db.get_opaque(name))
+
+        db.set_opaque(name, "foo bar")
+        self.assertEqual("foo bar", db.get_opaque(name))
+
+    def test_opaque_none(self):
+        """Test that we can set an opaque to None to effectively unset it."""
+
+        db = ldb.Ldb(self.url(), flags=self.flags())
+        name = "my_opaque"
+
+        # An opaque that has not been set is the same as None.
+        self.assertIsNone(db.get_opaque(name))
+
+        # Give the opaque a value.
+        db.set_opaque(name, 3)
+        self.assertEqual(3, db.get_opaque(name))
+
+        # Test that we can set the opaque to None to unset it.
+        db.set_opaque(name, None)
+        self.assertIsNone(db.get_opaque(name))
+
+    def test_opaque_unsupported(self):
+        """Test that trying to set unsupported values raises an error."""
+
+        db = ldb.Ldb(self.url(), flags=self.flags())
+        name = "my_opaque"
+
+        self.assertRaises(ValueError, db.set_opaque, name, [])
+        self.assertRaises(ValueError, db.set_opaque, name, ())
+        self.assertRaises(ValueError, db.set_opaque, name, 3.14)
+        self.assertRaises(ValueError, db.set_opaque, name, 3+2j)
+        self.assertRaises(ValueError, db.set_opaque, name, b'foo')
+
+    def test_search_scope_base_empty_db(self):
+        l = ldb.Ldb(self.url(), flags=self.flags())
+        self.assertEqual(len(l.search(ldb.Dn(l, "dc=foo1"),
+                                      ldb.SCOPE_BASE)), 0)
+
+    def test_search_scope_onelevel_empty_db(self):
+        l = ldb.Ldb(self.url(), flags=self.flags())
+        self.assertEqual(len(l.search(ldb.Dn(l, "dc=foo1"),
+                                      ldb.SCOPE_ONELEVEL)), 0)
+
+    def test_delete(self):
+        l = ldb.Ldb(self.url(), flags=self.flags())
+        self.assertRaises(ldb.LdbError, lambda: l.delete(ldb.Dn(l, "dc=foo2")))
+
+    def test_delete_w_unhandled_ctrl(self):
+        l = ldb.Ldb(self.url(), flags=self.flags())
+        m = ldb.Message()
+        m.dn = ldb.Dn(l, "dc=foo1")
+        m["b"] = [b"a"]
+        m["objectUUID"] = b"0123456789abcdef"
+        l.add(m)
+        self.assertRaises(ldb.LdbError, lambda: l.delete(m.dn, ["search_options:1:2"]))
+        l.delete(m.dn)
+
+    def test_contains(self):
+        name = self.url()
+        l = ldb.Ldb(name, flags=self.flags())
+        self.assertFalse(ldb.Dn(l, "dc=foo3") in l)
+        l = ldb.Ldb(name, flags=self.flags())
+        m = ldb.Message()
+        m.dn = ldb.Dn(l, "dc=foo3")
+        m["b"] = ["a"]
+        m["objectUUID"] = b"0123456789abcdef"
+        l.add(m)
+        try:
+            self.assertTrue(ldb.Dn(l, "dc=foo3") in l)
+            self.assertFalse(ldb.Dn(l, "dc=foo4") in l)
+        finally:
+            l.delete(m.dn)
+
+    def test_get_config_basedn(self):
+        l = ldb.Ldb(self.url(), flags=self.flags())
+        self.assertEqual(None, l.get_config_basedn())
+
+    def test_get_root_basedn(self):
+        l = ldb.Ldb(self.url(), flags=self.flags())
+        self.assertEqual(None, l.get_root_basedn())
+
+    def test_get_schema_basedn(self):
+        l = ldb.Ldb(self.url(), flags=self.flags())
+        self.assertEqual(None, l.get_schema_basedn())
+
+    def test_get_default_basedn(self):
+        l = ldb.Ldb(self.url(), flags=self.flags())
+        self.assertEqual(None, l.get_default_basedn())
+
+    def test_add(self):
+        l = ldb.Ldb(self.url(), flags=self.flags())
+        m = ldb.Message()
+        m.dn = ldb.Dn(l, "dc=foo4")
+        m["bla"] = b"bla"
+        m["objectUUID"] = b"0123456789abcdef"
+        self.assertEqual(len(l.search()), 0)
+        l.add(m)
+        try:
+            self.assertEqual(len(l.search()), 1)
+        finally:
+            l.delete(ldb.Dn(l, "dc=foo4"))
+
+    def test_search_iterator(self):
+        l = ldb.Ldb(self.url(), flags=self.flags())
+        s = l.search_iterator()
+        s.abandon()
+        try:
+            for me in s:
+                self.fail()
+            self.fail()
+        except RuntimeError as re:
+            pass
+        try:
+            s.abandon()
+            self.fail()
+        except RuntimeError as re:
+            pass
+        try:
+            s.result()
+            self.fail()
+        except RuntimeError as re:
+            pass
+
+        s = l.search_iterator()
+        count = 0
+        for me in s:
+            self.assertTrue(isinstance(me, ldb.Message))
+            count += 1
+        r = s.result()
+        self.assertEqual(len(r), 0)
+        self.assertEqual(count, 0)
+
+        m1 = ldb.Message()
+        m1.dn = ldb.Dn(l, "dc=foo4")
+        m1["bla"] = b"bla"
+        m1["objectUUID"] = b"0123456789abcdef"
+        l.add(m1)
+        try:
+            s = l.search_iterator()
+            msgs = []
+            for me in s:
+                self.assertTrue(isinstance(me, ldb.Message))
+                count += 1
+                msgs.append(me)
+            r = s.result()
+            self.assertEqual(len(r), 0)
+            self.assertEqual(len(msgs), 1)
+            self.assertEqual(msgs[0].dn, m1.dn)
+
+            m2 = ldb.Message()
+            m2.dn = ldb.Dn(l, "dc=foo5")
+            m2["bla"] = b"bla"
+            m2["objectUUID"] = b"0123456789abcdee"
+            l.add(m2)
+
+            s = l.search_iterator()
+            msgs = []
+            for me in s:
+                self.assertTrue(isinstance(me, ldb.Message))
+                count += 1
+                msgs.append(me)
+            r = s.result()
+            self.assertEqual(len(r), 0)
+            self.assertEqual(len(msgs), 2)
+            if msgs[0].dn == m1.dn:
+                self.assertEqual(msgs[0].dn, m1.dn)
+                self.assertEqual(msgs[1].dn, m2.dn)
+            else:
+                self.assertEqual(msgs[0].dn, m2.dn)
+                self.assertEqual(msgs[1].dn, m1.dn)
+
+            s = l.search_iterator()
+            msgs = []
+            for me in s:
+                self.assertTrue(isinstance(me, ldb.Message))
+                count += 1
+                msgs.append(me)
+                break
+            try:
+                s.result()
+                self.fail()
+            except RuntimeError as re:
+                pass
+            for me in s:
+                self.assertTrue(isinstance(me, ldb.Message))
+                count += 1
+                msgs.append(me)
+                break
+            for me in s:
+                self.fail()
+
+            r = s.result()
+            self.assertEqual(len(r), 0)
+            self.assertEqual(len(msgs), 2)
+            if msgs[0].dn == m1.dn:
+                self.assertEqual(msgs[0].dn, m1.dn)
+                self.assertEqual(msgs[1].dn, m2.dn)
+            else:
+                self.assertEqual(msgs[0].dn, m2.dn)
+                self.assertEqual(msgs[1].dn, m1.dn)
+        finally:
+            l.delete(ldb.Dn(l, "dc=foo4"))
+            l.delete(ldb.Dn(l, "dc=foo5"))
+
+    def test_add_text(self):
+        l = ldb.Ldb(self.url(), flags=self.flags())
+        m = ldb.Message()
+        m.dn = ldb.Dn(l, "dc=foo4")
+        m["bla"] = "bla"
+        m["objectUUID"] = b"0123456789abcdef"
+        self.assertEqual(len(l.search()), 0)
+        l.add(m)
+        try:
+            self.assertEqual(len(l.search()), 1)
+        finally:
+            l.delete(ldb.Dn(l, "dc=foo4"))
+
+    def test_add_w_unhandled_ctrl(self):
+        l = ldb.Ldb(self.url(), flags=self.flags())
+        m = ldb.Message()
+        m.dn = ldb.Dn(l, "dc=foo4")
+        m["bla"] = b"bla"
+        self.assertEqual(len(l.search()), 0)
+        self.assertRaises(ldb.LdbError, lambda: l.add(m, ["search_options:1:2"]))
+
+    def test_add_dict(self):
+        l = ldb.Ldb(self.url(), flags=self.flags())
+        m = {"dn": ldb.Dn(l, "dc=foo5"),
+             "bla": b"bla",
+             "objectUUID": b"0123456789abcdef"}
+        self.assertEqual(len(l.search()), 0)
+        l.add(m)
+        try:
+            self.assertEqual(len(l.search()), 1)
+        finally:
+            l.delete(ldb.Dn(l, "dc=foo5"))
+
+    def test_add_dict_text(self):
+        l = ldb.Ldb(self.url(), flags=self.flags())
+        m = {"dn": ldb.Dn(l, "dc=foo5"),
+             "bla": "bla",
+             "objectUUID": b"0123456789abcdef"}
+        self.assertEqual(len(l.search()), 0)
+        l.add(m)
+        try:
+            self.assertEqual(len(l.search()), 1)
+        finally:
+            l.delete(ldb.Dn(l, "dc=foo5"))
+
+    def test_add_dict_string_dn(self):
+        l = ldb.Ldb(self.url(), flags=self.flags())
+        m = {"dn": "dc=foo6", "bla": b"bla",
+             "objectUUID": b"0123456789abcdef"}
+        self.assertEqual(len(l.search()), 0)
+        l.add(m)
+        try:
+            self.assertEqual(len(l.search()), 1)
+        finally:
+            l.delete(ldb.Dn(l, "dc=foo6"))
+
+    def test_add_dict_bytes_dn(self):
+        l = ldb.Ldb(self.url(), flags=self.flags())
+        m = {"dn": b"dc=foo6", "bla": b"bla",
+             "objectUUID": b"0123456789abcdef"}
+        self.assertEqual(len(l.search()), 0)
+        l.add(m)
+        try:
+            self.assertEqual(len(l.search()), 1)
+        finally:
+            l.delete(ldb.Dn(l, "dc=foo6"))
+
+    def test_rename(self):
+        l = ldb.Ldb(self.url(), flags=self.flags())
+        m = ldb.Message()
+        m.dn = ldb.Dn(l, "dc=foo7")
+        m["bla"] = b"bla"
+        m["objectUUID"] = b"0123456789abcdef"
+        self.assertEqual(len(l.search()), 0)
+        l.add(m)
+        try:
+            l.rename(ldb.Dn(l, "dc=foo7"), ldb.Dn(l, "dc=bar"))
+            self.assertEqual(len(l.search()), 1)
+        finally:
+            l.delete(ldb.Dn(l, "dc=bar"))
+
+    def test_rename_string_dns(self):
+        l = ldb.Ldb(self.url(), flags=self.flags())
+        m = ldb.Message()
+        m.dn = ldb.Dn(l, "dc=foo8")
+        m["bla"] = b"bla"
+        m["objectUUID"] = b"0123456789abcdef"
+        self.assertEqual(len(l.search()), 0)
+        l.add(m)
+        self.assertEqual(len(l.search()), 1)
+        try:
+            l.rename("dc=foo8", "dc=bar")
+            self.assertEqual(len(l.search()), 1)
+        finally:
+            l.delete(ldb.Dn(l, "dc=bar"))
+
+    def test_rename_bad_string_dns(self):
+        l = ldb.Ldb(self.url(), flags=self.flags())
+        m = ldb.Message()
+        m.dn = ldb.Dn(l, "dc=foo8")
+        m["bla"] = b"bla"
+        m["objectUUID"] = b"0123456789abcdef"
+        self.assertEqual(len(l.search()), 0)
+        l.add(m)
+        self.assertEqual(len(l.search()), 1)
+        self.assertRaises(ldb.LdbError,lambda: l.rename("dcXfoo8", "dc=bar"))
+        self.assertRaises(ldb.LdbError,lambda: l.rename("dc=foo8", "dcXbar"))
+        l.delete(ldb.Dn(l, "dc=foo8"))
+
+    def test_empty_dn(self):
+        l = ldb.Ldb(self.url(), flags=self.flags())
+        self.assertEqual(0, len(l.search()))
+        m = ldb.Message()
+        m.dn = ldb.Dn(l, "dc=empty")
+        m["objectUUID"] = b"0123456789abcdef"
+        l.add(m)
+        rm = l.search()
+        self.assertEqual(1, len(rm))
+        self.assertEqual(set(["dn", "distinguishedName", "objectUUID"]),
+                         set(rm[0].keys()))
+
+        rm = l.search(m.dn)
+        self.assertEqual(1, len(rm))
+        self.assertEqual(set(["dn", "distinguishedName", "objectUUID"]),
+                         set(rm[0].keys()))
+        rm = l.search(m.dn, attrs=["blah"])
+        self.assertEqual(1, len(rm))
+        self.assertEqual(0, len(rm[0]))
+
+    def test_modify_delete(self):
+        l = ldb.Ldb(self.url(), flags=self.flags())
+        m = ldb.Message()
+        m.dn = ldb.Dn(l, "dc=modifydelete")
+        m["bla"] = [b"1234"]
+        m["objectUUID"] = b"0123456789abcdef"
+        l.add(m)
+        rm = l.search(m.dn)[0]
+        self.assertEqual([b"1234"], list(rm["bla"]))
+        try:
+            m = ldb.Message()
+            m.dn = ldb.Dn(l, "dc=modifydelete")
+            m["bla"] = ldb.MessageElement([], ldb.FLAG_MOD_DELETE, "bla")
+            self.assertEqual(ldb.FLAG_MOD_DELETE, m["bla"].flags())
+            l.modify(m)
+            rm = l.search(m.dn)
+            self.assertEqual(1, len(rm))
+            self.assertEqual(set(["dn", "distinguishedName", "objectUUID"]),
+                             set(rm[0].keys()))
+            rm = l.search(m.dn, attrs=["bla"])
+            self.assertEqual(1, len(rm))
+            self.assertEqual(0, len(rm[0]))
+        finally:
+            l.delete(ldb.Dn(l, "dc=modifydelete"))
+
+    def test_modify_delete_text(self):
+        l = ldb.Ldb(self.url(), flags=self.flags())
+        m = ldb.Message()
+        m.dn = ldb.Dn(l, "dc=modifydelete")
+        m.text["bla"] = ["1234"]
+        m["objectUUID"] = b"0123456789abcdef"
+        l.add(m)
+        rm = l.search(m.dn)[0]
+        self.assertEqual(["1234"], list(rm.text["bla"]))
+        try:
+            m = ldb.Message()
+            m.dn = ldb.Dn(l, "dc=modifydelete")
+            m["bla"] = ldb.MessageElement([], ldb.FLAG_MOD_DELETE, "bla")
+            self.assertEqual(ldb.FLAG_MOD_DELETE, m["bla"].flags())
+            l.modify(m)
+            rm = l.search(m.dn)
+            self.assertEqual(1, len(rm))
+            self.assertEqual(set(["dn", "distinguishedName", "objectUUID"]),
+                             set(rm[0].keys()))
+            rm = l.search(m.dn, attrs=["bla"])
+            self.assertEqual(1, len(rm))
+            self.assertEqual(0, len(rm[0]))
+        finally:
+            l.delete(ldb.Dn(l, "dc=modifydelete"))
+
+    def test_modify_add(self):
+        l = ldb.Ldb(self.url(), flags=self.flags())
+        m = ldb.Message()
+        m.dn = ldb.Dn(l, "dc=add")
+        m["bla"] = [b"1234"]
+        m["objectUUID"] = b"0123456789abcdef"
+        l.add(m)
+        try:
+            m = ldb.Message()
+            m.dn = ldb.Dn(l, "dc=add")
+            m["bla"] = ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla")
+            self.assertEqual(ldb.FLAG_MOD_ADD, m["bla"].flags())
+            l.modify(m)
+            rm = l.search(m.dn)[0]
+            self.assertEqual(3, len(rm))
+            self.assertEqual([b"1234", b"456"], list(rm["bla"]))
+        finally:
+            l.delete(ldb.Dn(l, "dc=add"))
+
+    def test_modify_add_text(self):
+        l = ldb.Ldb(self.url(), flags=self.flags())
+        m = ldb.Message()
+        m.dn = ldb.Dn(l, "dc=add")
+        m.text["bla"] = ["1234"]
+        m["objectUUID"] = b"0123456789abcdef"
+        l.add(m)
+        try:
+            m = ldb.Message()
+            m.dn = ldb.Dn(l, "dc=add")
+            m["bla"] = ldb.MessageElement(["456"], ldb.FLAG_MOD_ADD, "bla")
+            self.assertEqual(ldb.FLAG_MOD_ADD, m["bla"].flags())
+            l.modify(m)
+            rm = l.search(m.dn)[0]
+            self.assertEqual(3, len(rm))
+            self.assertEqual(["1234", "456"], list(rm.text["bla"]))
+        finally:
+            l.delete(ldb.Dn(l, "dc=add"))
+
+    def test_modify_replace(self):
+        l = ldb.Ldb(self.url(), flags=self.flags())
+        m = ldb.Message()
+        m.dn = ldb.Dn(l, "dc=modify2")
+        m["bla"] = [b"1234", b"456"]
+        m["objectUUID"] = b"0123456789abcdef"
+        l.add(m)
+        try:
+            m = ldb.Message()
+            m.dn = ldb.Dn(l, "dc=modify2")
+            m["bla"] = ldb.MessageElement([b"789"], ldb.FLAG_MOD_REPLACE, "bla")
+            self.assertEqual(ldb.FLAG_MOD_REPLACE, m["bla"].flags())
+            l.modify(m)
+            rm = l.search(m.dn)[0]
+            self.assertEqual(3, len(rm))
+            self.assertEqual([b"789"], list(rm["bla"]))
+            rm = l.search(m.dn, attrs=["bla"])[0]
+            self.assertEqual(1, len(rm))
+        finally:
+            l.delete(ldb.Dn(l, "dc=modify2"))
+
+    def test_modify_replace_text(self):
+        l = ldb.Ldb(self.url(), flags=self.flags())
+        m = ldb.Message()
+        m.dn = ldb.Dn(l, "dc=modify2")
+        m.text["bla"] = ["1234", "456"]
+        m["objectUUID"] = b"0123456789abcdef"
+        l.add(m)
+        try:
+            m = ldb.Message()
+            m.dn = ldb.Dn(l, "dc=modify2")
+            m["bla"] = ldb.MessageElement(["789"], ldb.FLAG_MOD_REPLACE, "bla")
+            self.assertEqual(ldb.FLAG_MOD_REPLACE, m["bla"].flags())
+            l.modify(m)
+            rm = l.search(m.dn)[0]
+            self.assertEqual(3, len(rm))
+            self.assertEqual(["789"], list(rm.text["bla"]))
+            rm = l.search(m.dn, attrs=["bla"])[0]
+            self.assertEqual(1, len(rm))
+        finally:
+            l.delete(ldb.Dn(l, "dc=modify2"))
+
+    def test_modify_flags_change(self):
+        l = ldb.Ldb(self.url(), flags=self.flags())
+        m = ldb.Message()
+        m.dn = ldb.Dn(l, "dc=add")
+        m["bla"] = [b"1234"]
+        m["objectUUID"] = b"0123456789abcdef"
+        l.add(m)
+        try:
+            m = ldb.Message()
+            m.dn = ldb.Dn(l, "dc=add")
+            m["bla"] = ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla")
+            self.assertEqual(ldb.FLAG_MOD_ADD, m["bla"].flags())
+            l.modify(m)
+            rm = l.search(m.dn)[0]
+            self.assertEqual(3, len(rm))
+            self.assertEqual([b"1234", b"456"], list(rm["bla"]))
+
+            # Now create another modify, but switch the flags before we do it
+            m["bla"] = ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla")
+            m["bla"].set_flags(ldb.FLAG_MOD_DELETE)
+            l.modify(m)
+            rm = l.search(m.dn, attrs=["bla"])[0]
+            self.assertEqual(1, len(rm))
+            self.assertEqual([b"1234"], list(rm["bla"]))
+        finally:
+            l.delete(ldb.Dn(l, "dc=add"))
+
+    def test_modify_flags_change_text(self):
+        l = ldb.Ldb(self.url(), flags=self.flags())
+        m = ldb.Message()
+        m.dn = ldb.Dn(l, "dc=add")
+        m.text["bla"] = ["1234"]
+        m["objectUUID"] = b"0123456789abcdef"
+        l.add(m)
+        try:
+            m = ldb.Message()
+            m.dn = ldb.Dn(l, "dc=add")
+            m["bla"] = ldb.MessageElement(["456"], ldb.FLAG_MOD_ADD, "bla")
+            self.assertEqual(ldb.FLAG_MOD_ADD, m["bla"].flags())
+            l.modify(m)
+            rm = l.search(m.dn)[0]
+            self.assertEqual(3, len(rm))
+            self.assertEqual(["1234", "456"], list(rm.text["bla"]))
+
+            # Now create another modify, but switch the flags before we do it
+            m["bla"] = ldb.MessageElement(["456"], ldb.FLAG_MOD_ADD, "bla")
+            m["bla"].set_flags(ldb.FLAG_MOD_DELETE)
+            l.modify(m)
+            rm = l.search(m.dn, attrs=["bla"])[0]
+            self.assertEqual(1, len(rm))
+            self.assertEqual(["1234"], list(rm.text["bla"]))
+        finally:
+            l.delete(ldb.Dn(l, "dc=add"))
+
+    def test_transaction_commit(self):
+        l = ldb.Ldb(self.url(), flags=self.flags())
+        l.transaction_start()
+        m = ldb.Message(ldb.Dn(l, "dc=foo9"))
+        m["foo"] = [b"bar"]
+        m["objectUUID"] = b"0123456789abcdef"
+        l.add(m)
+        l.transaction_commit()
+        l.delete(m.dn)
+
+    def test_transaction_cancel(self):
+        l = ldb.Ldb(self.url(), flags=self.flags())
+        l.transaction_start()
+        m = ldb.Message(ldb.Dn(l, "dc=foo10"))
+        m["foo"] = [b"bar"]
+        m["objectUUID"] = b"0123456789abcdee"
+        l.add(m)
+        l.transaction_cancel()
+        self.assertEqual(0, len(l.search(ldb.Dn(l, "dc=foo10"))))
+
+    def test_set_debug(self):
+        def my_report_fn(level, text):
+            pass
+        l = ldb.Ldb(self.url(), flags=self.flags())
+        l.set_debug(my_report_fn)
+
+    def test_zero_byte_string(self):
+        """Testing we do not get trapped in the \0 byte in a property string."""
+        l = ldb.Ldb(self.url(), flags=self.flags())
+        l.add({
+            "dn": b"dc=somedn",
+            "objectclass": b"user",
+            "cN": b"LDAPtestUSER",
+            "givenname": b"ldap",
+            "displayname": b"foo\0bar",
+            "objectUUID": b"0123456789abcdef"
+        })
+        res = l.search(expression="(dn=dc=somedn)")
+        self.assertEqual(b"foo\0bar", res[0]["displayname"][0])
+
+    def test_no_crash_broken_expr(self):
+        l = ldb.Ldb(self.url(), flags=self.flags())
+        self.assertRaises(ldb.LdbError, lambda: l.search("", ldb.SCOPE_SUBTREE, "&(dc=*)(dn=*)", ["dc"]))
+
+# Run the SimpleLdb tests against an lmdb backend
+
+
+class SimpleLdbLmdb(SimpleLdb):
+
+    def setUp(self):
+        if os.environ.get('HAVE_LMDB', '1') == '0':
+            self.skipTest("No lmdb backend")
+        self.prefix = MDB_PREFIX
+        self.index = MDB_INDEX_OBJ
+        super().setUp()
+
+
+class SimpleLdbNoLmdb(LdbBaseTest):
+
+    def setUp(self):
+        if os.environ.get('HAVE_LMDB', '1') != '0':
+            self.skipTest("lmdb backend enabled")
+        self.prefix = MDB_PREFIX
+        self.index = MDB_INDEX_OBJ
+        super().setUp()
+
+    def test_lmdb_disabled(self):
+        self.testdir = tempdir()
+        self.filename = os.path.join(self.testdir, "test.ldb")
+        try:
+            self.ldb = ldb.Ldb(self.url(), flags=self.flags())
+            self.fail("Should have failed on missing LMDB")
+        except ldb.LdbError as err:
+            enum = err.args[0]
+            self.assertEqual(enum, ldb.ERR_OTHER)
+
+
+if __name__ == '__main__':
+    import unittest
+    unittest.TestProgram()
index a0fc55cb03696bfb5b6c8c3144c5e88705b9076b..19842f30234140b87eca2b6ee421e07c746ddff6 100644 (file)
@@ -81,6 +81,10 @@ planpythontestsuite("none", "api_add_modify",
                     name="ldb.python.api_add_modify",
                     extra_path=['lib/ldb/tests/python'],
                     environ={'HAVE_LMDB': str(int(have_lmdb))})
+planpythontestsuite("none", "api_simple",
+                    name="ldb.python.api_simple",
+                    extra_path=['lib/ldb/tests/python'],
+                    environ={'HAVE_LMDB': str(int(have_lmdb))})
 planpythontestsuite("none", "crash",
                     name="ldb.python.crash",
                     extra_path=['lib/ldb/tests/python'],
@@ -112,6 +116,11 @@ planpythontestsuite("none", "api_add_modify",
                     extra_path=['lib/ldb/tests/python'],
                     environ={'LC_ALL': 'tr_TR.UTF-8',
                              'HAVE_LMDB': str(int(have_lmdb))})
+planpythontestsuite("none", "api_simple",
+                    name="ldb.python.api_simple.tr",
+                    extra_path=['lib/ldb/tests/python'],
+                    environ={'LC_ALL': 'tr_TR.UTF-8',
+                             'HAVE_LMDB': str(int(have_lmdb))})
 planpythontestsuite("none", "index",
                     name="ldb.python.index.tr",
                     extra_path=['lib/ldb/tests/python'],