From: Gary Lockyer Date: Tue, 28 Apr 2020 21:17:52 +0000 (+1200) Subject: s4 cldap server tests: request size limit tests X-Git-Tag: ldb-2.2.0~560 X-Git-Url: http://git.ipfire.org/gitweb.cgi?a=commitdiff_plain;h=80d9ea1386707c33783b76e5bc4ec37523424439;p=thirdparty%2Fsamba.git s4 cldap server tests: request size limit tests Add tests for packet size limits. Signed-off-by: Gary Lockyer Reviewed-by: Andrew Bartlett --- diff --git a/python/samba/tests/ldap_raw.py b/python/samba/tests/ldap_raw.py index 6110628d5bb..f4ea97d120e 100644 --- a/python/samba/tests/ldap_raw.py +++ b/python/samba/tests/ldap_raw.py @@ -739,3 +739,201 @@ class RawLdapTest(TestCase): self.send(packet) data = self.recv() self.assertIsNone(data) + + +class RawCldapTest(TestCase): + """ + A raw cldap Test case. + The ldap connections are made over UDP port 389 + + Uses the following environment variables: + SERVER + """ + + def setUp(self): + super(RawCldapTest, self).setUp() + + self.host = samba.tests.env_get_var_value('SERVER') + self.port = 389 + self.socket = None + self.connect() + + def tearDown(self): + self.disconnect() + super(RawCldapTest, self).tearDown() + + def disconnect(self): + ''' Disconnect from and clean up the connection to the server ''' + if self.socket is None: + return + self.socket.close() + self.socket = None + + def connect(self): + ''' Establish an UDP connection to the test server ''' + + try: + self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + self.socket.settimeout(10) + self.socket.connect((self.host, self.port)) + except socket.error: + if self.socket is not None: + self.socket.close() + raise + + def send(self, req): + ''' Send the request to the server ''' + try: + self.socket.sendall(req) + except socket.error: + self.disconnect() + raise + + def recv(self, num_recv=0xffff, timeout=None): + ''' receive an array of bytes from the server ''' + data = None + try: + if timeout is not None: + self.socket.settimeout(timeout) + data = self.socket.recv(num_recv, 0) + self.socket.settimeout(10) + if len(data) == 0: + self.disconnect() + return None + except socket.timeout: + # We ignore timeout's as the ldap server will drop the connection + # on the errors we're testing. So returning None on a timeout is + # the desired behaviour. + self.socket.settimeout(10) + except socket.error: + self.disconnect() + raise + return data + + def test_search_equals_maximum_permitted_size(self): + ''' + Check that an CLDAP search request equal to the maximum size is + accepted + ''' + + # Lets build an ldap search packet to query the RootDSE + header = encode_string(None) # Base DN, "" + header += encode_enumerated(0) # Enumeration scope + header += encode_enumerated(0) # Enumeration dereference + header += encode_integer(0) # Integer size limit + header += encode_integer(0) # Integer time limit + header += encode_boolean(False) # Boolean attributes only + + # + # build an equality search of the form x...x=y...y + # With the length of x...x and y...y chosen to generate an + # cldap request of 4096 bytes. + x = encode_string(b'x' * 2027) + y = encode_string(b'y' * 2027) + equals = encode_element(EQUALS, x + y) + trailer = encode_sequence(None) + search = encode_element(SEARCH, header + equals + trailer) + + msg_no = encode_integer(2) + packet = encode_sequence(msg_no + search) + # + # The length of the packet should be equal to the + # Maximum length of a cldap packet + self.assertEqual(4096, len(packet)) + + self.send(packet) + data = self.recv() + self.assertIsNotNone(data) + + # + # Decode and validate the response + + # Should be a sequence + (ber_type, length, element, rest) = decode_element(data) + self.assertEqual(SEQUENCE.hex(), ber_type.hex()) + self.assertTrue(length > 0) + self.assertGreater(len(rest), 0) + # rest should contain a Search request done element, but it's + # not validated in this test. + + # message id should be 2 + (ber_type, length, element, rest) = decode_element(element) + self.assertEqual(INTEGER.hex(), ber_type.hex()) + msg_no = int.from_bytes(element, byteorder='big') + self.assertEqual(2, msg_no) + self.assertGreater(len(rest), 0) + + # Should have a Search response element + (ber_type, length, element, rest) = decode_element(rest) + self.assertEqual(SEARCH_RES.hex(), ber_type.hex()) + self.assertEqual(0, len(rest)) + + # Should have an empty matching DN + (ber_type, length, element, rest) = decode_element(element) + self.assertEqual(OCTET_STRING.hex(), ber_type.hex()) + self.assertEqual(0, len(element)) + self.assertGreater(len(rest), 0) + + # Then a sequence of attribute sequences + (ber_type, length, element, rest) = decode_element(rest) + self.assertEqual(SEQUENCE.hex(), ber_type.hex()) + self.assertEqual(0, len(rest)) + + # Check the first attribute sequence, it should be + # "configurationNamingContext" + # The remaining attribute sequences will be ignored but + # check that they exist. + (ber_type, length, element, rest) = decode_element(element) + self.assertEqual(SEQUENCE.hex(), ber_type.hex()) + # Check that there are remaining attribute sequences. + self.assertGreater(len(rest), 0) + + # Check the name of the first attribute + (ber_type, length, element, rest) = decode_element(element) + self.assertEqual(OCTET_STRING.hex(), ber_type.hex()) + self.assertGreater(len(rest), 0) + self.assertEqual(b'configurationNamingContext', element) + + # And check that there is an attribute value set + (ber_type, length, element, rest) = decode_element(rest) + self.assertEqual(SET.hex(), ber_type.hex()) + self.assertGreater(len(element), 0) + self.assertEqual(0, len(rest)) + + def test_search_exceeds_maximum_permitted_size(self): + ''' + Test that a cldap request longer than the maximum permitted + size is rejected. + ''' + + # Lets build an ldap search packet to query the RootDSE + header = encode_string(None) # Base DN, "" + header += encode_enumerated(0) # Enumeration scope + header += encode_enumerated(0) # Enumeration dereference + header += encode_integer(0) # Integer size limit + header += encode_integer(0) # Integer time limit + header += encode_boolean(False) # Boolean attributes only + + # + # build an equality search of the form x...x=y...y + # With the length of x...x and y...y chosen to generate an + # cldap request of 4097 bytes. + x = encode_string(b'x' * 2027) + y = encode_string(b'y' * 2028) + equals = encode_element(EQUALS, x + y) + trailer = encode_sequence(None) + search = encode_element(SEARCH, header + equals + trailer) + + msg_no = encode_integer(2) + packet = encode_sequence(msg_no + search) + # + # The length of the sequence data should be one greater than the + # Maximum length of a cldap packet + self.assertEqual(4097, len(packet)) + + self.send(packet) + data = self.recv() + # + # The connection should be closed by the server and we should not + # see any data. + self.assertIsNone(data)