self.send(packet)
data = self.recv()
self.assertIsNone(data)
+
+
+class RawCldapTest(TestCase):
+ """
+ A raw cldap Test case.
+ The ldap connections are made over UDP port 389
+
+ Uses the following environment variables:
+ SERVER
+ """
+
+ def setUp(self):
+ super(RawCldapTest, self).setUp()
+
+ self.host = samba.tests.env_get_var_value('SERVER')
+ self.port = 389
+ self.socket = None
+ self.connect()
+
+ def tearDown(self):
+ self.disconnect()
+ super(RawCldapTest, self).tearDown()
+
+ def disconnect(self):
+ ''' Disconnect from and clean up the connection to the server '''
+ if self.socket is None:
+ return
+ self.socket.close()
+ self.socket = None
+
+ def connect(self):
+ ''' Establish an UDP connection to the test server '''
+
+ try:
+ self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+ self.socket.settimeout(10)
+ self.socket.connect((self.host, self.port))
+ except socket.error:
+ if self.socket is not None:
+ self.socket.close()
+ raise
+
+ def send(self, req):
+ ''' Send the request to the server '''
+ try:
+ self.socket.sendall(req)
+ except socket.error:
+ self.disconnect()
+ raise
+
+ def recv(self, num_recv=0xffff, timeout=None):
+ ''' receive an array of bytes from the server '''
+ data = None
+ try:
+ if timeout is not None:
+ self.socket.settimeout(timeout)
+ data = self.socket.recv(num_recv, 0)
+ self.socket.settimeout(10)
+ if len(data) == 0:
+ self.disconnect()
+ return None
+ except socket.timeout:
+ # We ignore timeout's as the ldap server will drop the connection
+ # on the errors we're testing. So returning None on a timeout is
+ # the desired behaviour.
+ self.socket.settimeout(10)
+ except socket.error:
+ self.disconnect()
+ raise
+ return data
+
+ def test_search_equals_maximum_permitted_size(self):
+ '''
+ Check that an CLDAP search request equal to the maximum size is
+ accepted
+ '''
+
+ # Lets build an ldap search packet to query the RootDSE
+ header = encode_string(None) # Base DN, ""
+ header += encode_enumerated(0) # Enumeration scope
+ header += encode_enumerated(0) # Enumeration dereference
+ header += encode_integer(0) # Integer size limit
+ header += encode_integer(0) # Integer time limit
+ header += encode_boolean(False) # Boolean attributes only
+
+ #
+ # build an equality search of the form x...x=y...y
+ # With the length of x...x and y...y chosen to generate an
+ # cldap request of 4096 bytes.
+ x = encode_string(b'x' * 2027)
+ y = encode_string(b'y' * 2027)
+ equals = encode_element(EQUALS, x + y)
+ trailer = encode_sequence(None)
+ search = encode_element(SEARCH, header + equals + trailer)
+
+ msg_no = encode_integer(2)
+ packet = encode_sequence(msg_no + search)
+ #
+ # The length of the packet should be equal to the
+ # Maximum length of a cldap packet
+ self.assertEqual(4096, len(packet))
+
+ self.send(packet)
+ data = self.recv()
+ self.assertIsNotNone(data)
+
+ #
+ # Decode and validate the response
+
+ # Should be a sequence
+ (ber_type, length, element, rest) = decode_element(data)
+ self.assertEqual(SEQUENCE.hex(), ber_type.hex())
+ self.assertTrue(length > 0)
+ self.assertGreater(len(rest), 0)
+ # rest should contain a Search request done element, but it's
+ # not validated in this test.
+
+ # message id should be 2
+ (ber_type, length, element, rest) = decode_element(element)
+ self.assertEqual(INTEGER.hex(), ber_type.hex())
+ msg_no = int.from_bytes(element, byteorder='big')
+ self.assertEqual(2, msg_no)
+ self.assertGreater(len(rest), 0)
+
+ # Should have a Search response element
+ (ber_type, length, element, rest) = decode_element(rest)
+ self.assertEqual(SEARCH_RES.hex(), ber_type.hex())
+ self.assertEqual(0, len(rest))
+
+ # Should have an empty matching DN
+ (ber_type, length, element, rest) = decode_element(element)
+ self.assertEqual(OCTET_STRING.hex(), ber_type.hex())
+ self.assertEqual(0, len(element))
+ self.assertGreater(len(rest), 0)
+
+ # Then a sequence of attribute sequences
+ (ber_type, length, element, rest) = decode_element(rest)
+ self.assertEqual(SEQUENCE.hex(), ber_type.hex())
+ self.assertEqual(0, len(rest))
+
+ # Check the first attribute sequence, it should be
+ # "configurationNamingContext"
+ # The remaining attribute sequences will be ignored but
+ # check that they exist.
+ (ber_type, length, element, rest) = decode_element(element)
+ self.assertEqual(SEQUENCE.hex(), ber_type.hex())
+ # Check that there are remaining attribute sequences.
+ self.assertGreater(len(rest), 0)
+
+ # Check the name of the first attribute
+ (ber_type, length, element, rest) = decode_element(element)
+ self.assertEqual(OCTET_STRING.hex(), ber_type.hex())
+ self.assertGreater(len(rest), 0)
+ self.assertEqual(b'configurationNamingContext', element)
+
+ # And check that there is an attribute value set
+ (ber_type, length, element, rest) = decode_element(rest)
+ self.assertEqual(SET.hex(), ber_type.hex())
+ self.assertGreater(len(element), 0)
+ self.assertEqual(0, len(rest))
+
+ def test_search_exceeds_maximum_permitted_size(self):
+ '''
+ Test that a cldap request longer than the maximum permitted
+ size is rejected.
+ '''
+
+ # Lets build an ldap search packet to query the RootDSE
+ header = encode_string(None) # Base DN, ""
+ header += encode_enumerated(0) # Enumeration scope
+ header += encode_enumerated(0) # Enumeration dereference
+ header += encode_integer(0) # Integer size limit
+ header += encode_integer(0) # Integer time limit
+ header += encode_boolean(False) # Boolean attributes only
+
+ #
+ # build an equality search of the form x...x=y...y
+ # With the length of x...x and y...y chosen to generate an
+ # cldap request of 4097 bytes.
+ x = encode_string(b'x' * 2027)
+ y = encode_string(b'y' * 2028)
+ equals = encode_element(EQUALS, x + y)
+ trailer = encode_sequence(None)
+ search = encode_element(SEARCH, header + equals + trailer)
+
+ msg_no = encode_integer(2)
+ packet = encode_sequence(msg_no + search)
+ #
+ # The length of the sequence data should be one greater than the
+ # Maximum length of a cldap packet
+ self.assertEqual(4097, len(packet))
+
+ self.send(packet)
+ data = self.recv()
+ #
+ # The connection should be closed by the server and we should not
+ # see any data.
+ self.assertIsNone(data)