]> git.ipfire.org Git - thirdparty/samba.git/commitdiff
CVE-2021-3670 tests/krb5/test_ldap.py: Add test for LDAP timeouts
authorJoseph Sutton <josephsutton@catalyst.net.nz>
Thu, 26 Aug 2021 09:18:26 +0000 (21:18 +1200)
committerDouglas Bagnall <dbagnall@samba.org>
Thu, 25 Nov 2021 01:41:30 +0000 (01:41 +0000)
We allow a timeout of 2x over to avoid this being a flapping test.
Samba is not very accurate on the timeout, which is not otherwise an
issue but makes this test fail sometimes.

BUG: https://bugzilla.samba.org/show_bug.cgi?id=14694

Signed-off-by: Joseph Sutton <josephsutton@catalyst.net.nz>
Reviewed-by: Douglas Bagnall <douglas.bagnall@catalyst.net.nz>
selftest/knownfail.d/ldap-timeout [new file with mode: 0644]
source4/dsdb/tests/python/large_ldap.py

diff --git a/selftest/knownfail.d/ldap-timeout b/selftest/knownfail.d/ldap-timeout
new file mode 100644 (file)
index 0000000..378ca1f
--- /dev/null
@@ -0,0 +1 @@
+samba4.ldap.large_ldap\..*\.python\(.*\).__main__.LargeLDAPTest.test_timeout\(.*\)
\ No newline at end of file
index 0bf73f988d83e8445cf06143bdd79de41d0403a4..f1fc13939e5d806f6346df56eb1a88f45e7bb987 100644 (file)
@@ -23,6 +23,7 @@ import optparse
 import sys
 import os
 import random
+import time
 
 sys.path.insert(0, "bin/python")
 import samba
@@ -244,6 +245,68 @@ class LargeLDAPTest(samba.tests.TestCase):
         # Assert we don't get all the entries but still the error
         self.assertGreater(count, count_jpeg)
 
+    def test_timeout(self):
+        policy_dn = ldb.Dn(self.ldb,
+                           'CN=Default Query Policy,CN=Query-Policies,'
+                           'CN=Directory Service,CN=Windows NT,CN=Services,'
+                           f'{self.ldb.get_config_basedn().get_linearized()}')
+
+        # Get the current value of lDAPAdminLimits.
+        res = self.ldb.search(base=policy_dn,
+                              scope=ldb.SCOPE_BASE,
+                              attrs=['lDAPAdminLimits'])
+        msg = res[0]
+        admin_limits = msg['lDAPAdminLimits']
+
+        # Ensure we restore the previous value of the attribute.
+        admin_limits.set_flags(ldb.FLAG_MOD_REPLACE)
+        self.addCleanup(self.ldb.modify, msg)
+
+        # Temporarily lower the value of MaxQueryDuration so we can test
+        # timeout behaviour.
+        timeout = 5
+        query_duration = f'MaxQueryDuration={timeout}'.encode()
+
+        admin_limits = [limit for limit in admin_limits
+                        if not limit.lower().startswith(b'maxqueryduration=')]
+        admin_limits.append(query_duration)
+
+        # Set the new attribute value.
+        msg = ldb.Message(policy_dn)
+        msg['lDAPAdminLimits'] = ldb.MessageElement(admin_limits,
+                                                    ldb.FLAG_MOD_REPLACE,
+                                                    'lDAPAdminLimits')
+        self.ldb.modify(msg)
+
+        # Use a new connection so that the limits are reloaded.
+        samdb = SamDB(url, credentials=creds,
+                      session_info=system_session(lp),
+                      lp=lp)
+
+        # Create a large search expression that will take a long time to
+        # evaluate.
+        expression = '(anr=l)' * 10000
+        expression = f'(|{expression})'
+
+        # Perform the LDAP search.
+        prev = time.time()
+        with self.assertRaises(ldb.LdbError) as err:
+            samdb.search(base=self.ou_dn,
+                         scope=ldb.SCOPE_SUBTREE,
+                         expression=expression,
+                         attrs=['objectGUID'])
+        now = time.time()
+        duration = now - prev
+
+        # Ensure that we timed out.
+        enum, _ = err.exception.args
+        self.assertEqual(ldb.ERR_TIME_LIMIT_EXCEEDED, enum)
+
+        # Ensure that the time spent searching is within the limit we
+        # set.  We allow a margin of 100% over as the Samba timeout
+        # handling is not very accurate (and does not need to be)
+        self.assertLess(timeout - 1, duration)
+        self.assertLess(duration, timeout * 2)
 
 
 if "://" not in url: