]> git.ipfire.org Git - thirdparty/samba.git/commitdiff
CVE-2020-10704: ldapserver tests: Limit search request sizes
authorGary Lockyer <gary@catalyst.net.nz>
Tue, 14 Apr 2020 01:32:32 +0000 (13:32 +1200)
committerKarolin Seeger <kseeger@samba.org>
Tue, 21 Apr 2020 08:21:09 +0000 (10:21 +0200)
Add tests to ensure that overly long (> 256000 bytes) LDAP search
requests are rejected.

Credit to OSS-Fuzz

REF: https://bugs.chromium.org/p/oss-fuzz/issues/detail?id=20454
BUG: https://bugzilla.samba.org/show_bug.cgi?id=14334

Signed-off-by: Gary Lockyer <gary@catalyst.net.nz>
Reviewed-by: Andrew Bartlett <abartlet@samba.org>
python/samba/tests/ldap_raw.py [new file with mode: 0644]
selftest/knownfail.d/ldap_raw [new file with mode: 0644]
source4/selftest/tests.py

diff --git a/python/samba/tests/ldap_raw.py b/python/samba/tests/ldap_raw.py
new file mode 100644 (file)
index 0000000..334fabc
--- /dev/null
@@ -0,0 +1,234 @@
+# Integration tests for the ldap server, using raw socket IO
+#
+# Tests for handling of malformed or large packets.
+#
+# Copyright (C) Catalyst.Net Ltd 2020
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+#
+
+import socket
+
+import samba.tests
+from samba.tests import TestCase
+
+
+#
+# LDAP Operations
+#
+SEARCH = b'\x63'
+
+EQUALS = b'\xa3'
+
+
+#
+# ASN.1 Element types
+#
+BOOLEAN = b'\x01'
+INTEGER = b'\x02'
+OCTET_STRING = b'\x04'
+NULL = b'\x05'
+ENUMERATED = b'\x0a'
+SEQUENCE = b'\x30'
+SET = b'\x31'
+
+
+#
+# ASN.1 Helper functions.
+#
+def encode_element(ber_type, data):
+    ''' Encode an ASN.1 BER element. '''
+    if data is None:
+        return ber_type + encode_length(0)
+    return ber_type + encode_length(len(data)) + data
+
+
+def encode_length(length):
+    ''' Encode the length of an ASN.1 BER element.  '''
+
+    if length > 0xFFFFFF:
+        return b'\x84' + length.to_bytes(4, "big")
+    if length > 0xFFFF:
+        return b'\x83' + length.to_bytes(3, "big")
+    if length > 0xFF:
+        return b'\x82' + length.to_bytes(2, "big")
+    if length > 0x7F:
+        return b'\x81' + length.to_bytes(1, "big")
+    return length.to_bytes(1, "big")
+
+
+def encode_string(string):
+    ''' Encode an octet string '''
+    return encode_element(OCTET_STRING, string)
+
+
+def encode_boolean(boolean):
+    ''' Encode a boolean value '''
+    if boolean:
+        return encode_element(BOOLEAN, b'\xFF')
+    return encode_element(BOOLEAN, b'\x00')
+
+
+def encode_integer(integer):
+    ''' Encode an integer value '''
+    bit_len = integer.bit_length()
+    byte_len = (bit_len // 8) + 1
+    return encode_element(INTEGER, integer.to_bytes(byte_len, "big"))
+
+
+def encode_enumerated(enum):
+    ''' Encode an enumerated value '''
+    return encode_element(ENUMERATED, enum.to_bytes(1, "big"))
+
+
+def encode_sequence(sequence):
+    ''' Encode a sequence '''
+    return encode_element(SEQUENCE, sequence)
+
+
+class RawLdapTest(TestCase):
+    """A raw Ldap Test case."""
+
+    def setUp(self):
+        super(RawLdapTest, self).setUp()
+
+        self.host = samba.tests.env_get_var_value('SERVER')
+        self.port = 389
+        self.socket = None
+        self.connect()
+
+    def tearDown(self):
+        self.disconnect()
+        super(RawLdapTest, self).tearDown()
+
+    def disconnect(self):
+        ''' Disconnect from and clean up the connection to the server '''
+        if self.socket is None:
+            return
+        self.socket.close()
+        self.socket = None
+
+    def connect(self):
+        ''' Open a socket stream connection to the server '''
+        try:
+            self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+            self.socket.settimeout(10)
+            self.socket.connect((self.host, self.port))
+        except socket.error:
+            self.socket.close()
+            raise
+
+    def send(self, req):
+        ''' Send the request to the server '''
+        try:
+            self.socket.sendall(req)
+        except socket.error:
+            self.disconnect()
+            raise
+
+    def recv(self, num_recv=0xffff, timeout=None):
+        ''' recv an array of bytes from the server '''
+        data = None
+        try:
+            if timeout is not None:
+                self.socket.settimeout(timeout)
+            data = self.socket.recv(num_recv, 0)
+            self.socket.settimeout(10)
+            if len(data) == 0:
+                self.disconnect()
+                return None
+        except socket.timeout:
+            # We ignore timeout's as the ldap server will drop the connection
+            # on the errors we're testing. So returning None on a timeout is
+            # the desired behaviour.
+            self.socket.settimeout(10)
+        except socket.error:
+            self.disconnect()
+            raise
+        return data
+
+    def test_search_equals_maximum_permitted_size(self):
+        '''
+        Check that an LDAP search request equal to the maximum size is accepted
+        '''
+
+        # Lets build an ldap search packet to query the RootDSE
+        header = encode_string(None)        # Base DN, ""
+        header += encode_enumerated(0)      # Enumeration scope
+        header += encode_enumerated(0)      # Enumeration dereference
+        header += encode_integer(0)         # Integer size limit
+        header += encode_integer(0)         # Integer time limit
+        header += encode_boolean(False)     # Boolean attributes only
+
+        #
+        # build an equality search of the form x...x=y...y
+        # With the length of x...x and y...y chosen to generate an
+        # ldap request of 256000 bytes.
+        x = encode_string(b'x' * 127974)
+        y = encode_string(b'y' * 127979)
+        equals = encode_element(EQUALS, x + y)
+        trailer = encode_sequence(None)
+        search = encode_element(SEARCH, header + equals + trailer)
+
+        msg_no = encode_integer(1)
+        packet = encode_sequence(msg_no + search)
+        #
+        # The length of the packet should be equal to the
+        # Maximum length of a search query
+        self.assertEqual(256000, len(packet))
+
+        self.send(packet)
+        data = self.recv()
+        self.assertIsNotNone(data)
+
+        # Should be a sequence
+        self.assertEqual(SEQUENCE, data[0:1])
+
+    def test_search_exceeds_maximum_permitted_size(self):
+        '''
+        Test that a search query longer than the maximum permitted
+        size is rejected.
+        '''
+
+        # Lets build an ldap search packet to query the RootDSE
+        header = encode_string(None)        # Base DN, ""
+        header += encode_enumerated(0)      # Enumeration scope
+        header += encode_enumerated(0)      # Enumeration dereference
+        header += encode_integer(0)         # Integer size limit
+        header += encode_integer(0)         # Integer time limit
+        header += encode_boolean(False)     # Boolean attributes only
+
+        #
+        # build an equality search of the form x...x=y...y
+        # With the length of x...x and y...y chosen to generate an
+        # ldap request of 256001 bytes.
+        x = encode_string(b'x' * 127979)
+        y = encode_string(b'y' * 127975)
+        equals = encode_element(EQUALS, x + y)
+        trailer = encode_sequence(None)
+        search = encode_element(SEARCH, header + equals + trailer)
+
+        msg_no = encode_integer(1)
+        packet = encode_sequence(msg_no + search)
+        #
+        # The length of the sequence data should be one greater than the
+        # Maximum length of a search query
+        self.assertEqual(256001, len(packet))
+
+        self.send(packet)
+        data = self.recv()
+        #
+        # The connection should be closed by the server and we should not
+        # see any data.
+        self.assertIsNone(data)
diff --git a/selftest/knownfail.d/ldap_raw b/selftest/knownfail.d/ldap_raw
new file mode 100644 (file)
index 0000000..8bd2ee5
--- /dev/null
@@ -0,0 +1 @@
+^samba.tests.ldap_raw.samba.tests.ldap_raw.RawLdapTest.test_search_exceeds_maximum_permitted_size\(ad_dc\)
index 46c1eb9a18bdf50f05da029b2d9f86b1288a5fd3..19c81b46ea4451bf3343c4f87f5e6a87dbfec7fb 100755 (executable)
@@ -864,6 +864,12 @@ for env in ["ad_dc_ntvfs:local", "ad_dc:local",
             "promoted_dc:local"]:
     planoldpythontestsuite(env, "samba.tests.blackbox.smbcontrol", py3_compatible=True)
 
+planoldpythontestsuite("ad_dc",
+                       "samba.tests.ldap_raw",
+                       py3_compatible= True,
+                       extra_args=['-U"$USERNAME%$PASSWORD"'],
+                       environ={'TEST_ENV': 'ad_dc'})
+
 planoldpythontestsuite("none", "samba.tests.blackbox.undoguididx")
 
 plantestsuite_loadlist("samba4.ldap.python(ad_dc_ntvfs)", "ad_dc_ntvfs", [python, os.path.join(samba4srcdir, "dsdb/tests/python/ldap.py"), '$SERVER', '-U"$USERNAME%$PASSWORD"', '--workgroup=$DOMAIN', '$LOADLIST', '$LISTOPT'])