]> git.ipfire.org Git - thirdparty/samba.git/commitdiff
s4 cldap server tests: request size limit tests
authorGary Lockyer <gary@catalyst.net.nz>
Tue, 28 Apr 2020 21:17:52 +0000 (09:17 +1200)
committerGary Lockyer <gary@samba.org>
Sun, 10 May 2020 21:45:38 +0000 (21:45 +0000)
Add tests for packet size limits.

Signed-off-by: Gary Lockyer <gary@catalyst.net.nz>
Reviewed-by: Andrew Bartlett <abartlet@samba.org>
python/samba/tests/ldap_raw.py

index 6110628d5bbe38b4ca91cb212050f991b813a106..f4ea97d120e4b06080b7914e1fd55b53906ee98f 100644 (file)
@@ -739,3 +739,201 @@ class RawLdapTest(TestCase):
         self.send(packet)
         data = self.recv()
         self.assertIsNone(data)
+
+
+class RawCldapTest(TestCase):
+    """
+    A raw cldap Test case.
+    The ldap connections are made over UDP port 389
+
+    Uses the following environment variables:
+        SERVER
+    """
+
+    def setUp(self):
+        super(RawCldapTest, self).setUp()
+
+        self.host = samba.tests.env_get_var_value('SERVER')
+        self.port = 389
+        self.socket = None
+        self.connect()
+
+    def tearDown(self):
+        self.disconnect()
+        super(RawCldapTest, self).tearDown()
+
+    def disconnect(self):
+        ''' Disconnect from and clean up the connection to the server '''
+        if self.socket is None:
+            return
+        self.socket.close()
+        self.socket = None
+
+    def connect(self):
+        ''' Establish an UDP connection to the test server '''
+
+        try:
+            self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+            self.socket.settimeout(10)
+            self.socket.connect((self.host, self.port))
+        except socket.error:
+            if self.socket is not None:
+                self.socket.close()
+            raise
+
+    def send(self, req):
+        ''' Send the request to the server '''
+        try:
+            self.socket.sendall(req)
+        except socket.error:
+            self.disconnect()
+            raise
+
+    def recv(self, num_recv=0xffff, timeout=None):
+        ''' receive an array of bytes from the server '''
+        data = None
+        try:
+            if timeout is not None:
+                self.socket.settimeout(timeout)
+            data = self.socket.recv(num_recv, 0)
+            self.socket.settimeout(10)
+            if len(data) == 0:
+                self.disconnect()
+                return None
+        except socket.timeout:
+            # We ignore timeout's as the ldap server will drop the connection
+            # on the errors we're testing. So returning None on a timeout is
+            # the desired behaviour.
+            self.socket.settimeout(10)
+        except socket.error:
+            self.disconnect()
+            raise
+        return data
+
+    def test_search_equals_maximum_permitted_size(self):
+        '''
+        Check that an CLDAP search request equal to the maximum size is
+        accepted
+        '''
+
+        # Lets build an ldap search packet to query the RootDSE
+        header = encode_string(None)        # Base DN, ""
+        header += encode_enumerated(0)      # Enumeration scope
+        header += encode_enumerated(0)      # Enumeration dereference
+        header += encode_integer(0)         # Integer size limit
+        header += encode_integer(0)         # Integer time limit
+        header += encode_boolean(False)     # Boolean attributes only
+
+        #
+        # build an equality search of the form x...x=y...y
+        # With the length of x...x and y...y chosen to generate an
+        # cldap request of 4096 bytes.
+        x = encode_string(b'x' * 2027)
+        y = encode_string(b'y' * 2027)
+        equals = encode_element(EQUALS, x + y)
+        trailer = encode_sequence(None)
+        search = encode_element(SEARCH, header + equals + trailer)
+
+        msg_no = encode_integer(2)
+        packet = encode_sequence(msg_no + search)
+        #
+        # The length of the packet should be equal to the
+        # Maximum length of a cldap packet
+        self.assertEqual(4096, len(packet))
+
+        self.send(packet)
+        data = self.recv()
+        self.assertIsNotNone(data)
+
+        #
+        # Decode and validate the response
+
+        # Should be a sequence
+        (ber_type, length, element, rest) = decode_element(data)
+        self.assertEqual(SEQUENCE.hex(), ber_type.hex())
+        self.assertTrue(length > 0)
+        self.assertGreater(len(rest), 0)
+        # rest should contain a Search request done element, but it's
+        # not validated in this test.
+
+        # message id should be 2
+        (ber_type, length, element, rest) = decode_element(element)
+        self.assertEqual(INTEGER.hex(), ber_type.hex())
+        msg_no = int.from_bytes(element, byteorder='big')
+        self.assertEqual(2, msg_no)
+        self.assertGreater(len(rest), 0)
+
+        # Should have a Search response element
+        (ber_type, length, element, rest) = decode_element(rest)
+        self.assertEqual(SEARCH_RES.hex(), ber_type.hex())
+        self.assertEqual(0, len(rest))
+
+        # Should have an empty matching DN
+        (ber_type, length, element, rest) = decode_element(element)
+        self.assertEqual(OCTET_STRING.hex(), ber_type.hex())
+        self.assertEqual(0, len(element))
+        self.assertGreater(len(rest), 0)
+
+        # Then a sequence of attribute sequences
+        (ber_type, length, element, rest) = decode_element(rest)
+        self.assertEqual(SEQUENCE.hex(), ber_type.hex())
+        self.assertEqual(0, len(rest))
+
+        # Check the first attribute sequence, it should  be
+        # "configurationNamingContext"
+        # The remaining attribute sequences will be ignored but
+        # check that they exist.
+        (ber_type, length, element, rest) = decode_element(element)
+        self.assertEqual(SEQUENCE.hex(), ber_type.hex())
+        # Check that there are remaining attribute sequences.
+        self.assertGreater(len(rest), 0)
+
+        # Check the name of the first attribute
+        (ber_type, length, element, rest) = decode_element(element)
+        self.assertEqual(OCTET_STRING.hex(), ber_type.hex())
+        self.assertGreater(len(rest), 0)
+        self.assertEqual(b'configurationNamingContext', element)
+
+        # And check that there is an attribute value set
+        (ber_type, length, element, rest) = decode_element(rest)
+        self.assertEqual(SET.hex(), ber_type.hex())
+        self.assertGreater(len(element), 0)
+        self.assertEqual(0, len(rest))
+
+    def test_search_exceeds_maximum_permitted_size(self):
+        '''
+        Test that a cldap request longer than the maximum permitted
+        size is rejected.
+        '''
+
+        # Lets build an ldap search packet to query the RootDSE
+        header = encode_string(None)        # Base DN, ""
+        header += encode_enumerated(0)      # Enumeration scope
+        header += encode_enumerated(0)      # Enumeration dereference
+        header += encode_integer(0)         # Integer size limit
+        header += encode_integer(0)         # Integer time limit
+        header += encode_boolean(False)     # Boolean attributes only
+
+        #
+        # build an equality search of the form x...x=y...y
+        # With the length of x...x and y...y chosen to generate an
+        # cldap request of 4097 bytes.
+        x = encode_string(b'x' * 2027)
+        y = encode_string(b'y' * 2028)
+        equals = encode_element(EQUALS, x + y)
+        trailer = encode_sequence(None)
+        search = encode_element(SEARCH, header + equals + trailer)
+
+        msg_no = encode_integer(2)
+        packet = encode_sequence(msg_no + search)
+        #
+        # The length of the sequence data should be one greater than the
+        # Maximum length of a cldap packet
+        self.assertEqual(4097, len(packet))
+
+        self.send(packet)
+        data = self.recv()
+        #
+        # The connection should be closed by the server and we should not
+        # see any data.
+        self.assertIsNone(data)