From: Garming Sam Date: Thu, 9 Jun 2016 02:55:57 +0000 (+1200) Subject: tests/drs: assert sorted identifier GUIDs across getncchanges X-Git-Tag: tdb-1.3.10~826 X-Git-Url: http://git.ipfire.org/gitweb.cgi?a=commitdiff_plain;h=4de5e7da9c2e9cac1b3fe33f4ba161e2b79a9bdc;p=thirdparty%2Fsamba.git tests/drs: assert sorted identifier GUIDs across getncchanges Signed-off-by: Garming Sam Reviewed-by: Andrew Bartlett BUG: https://bugzilla.samba.org/show_bug.cgi?id=11960 --- diff --git a/source4/torture/drs/python/getnc_exop.py b/source4/torture/drs/python/getnc_exop.py index 78fc14ecc58..0ddac8b9244 100644 --- a/source4/torture/drs/python/getnc_exop.py +++ b/source4/torture/drs/python/getnc_exop.py @@ -79,7 +79,7 @@ class AbstractLink: class ExopBaseTest: def _exop_req8(self, dest_dsa, invocation_id, nc_dn_str, exop, - replica_flags=0): + replica_flags=0, max_objects=0): req8 = drsuapi.DsGetNCChangesRequest8() req8.destination_dsa_guid = misc.GUID(dest_dsa) if dest_dsa else misc.GUID() @@ -92,7 +92,7 @@ class ExopBaseTest: req8.highwatermark.highest_usn = 0 req8.uptodateness_vector = None req8.replica_flags = replica_flags - req8.max_object_count = 0 + req8.max_object_count = max_objects req8.max_ndr_size = 402116 req8.extended_op = exop req8.fsmo_info = 0 @@ -303,8 +303,21 @@ class DrsReplicaSyncSortTestCase(drs_base.DrsBaseTestCase, ExopBaseTest): if enum == ldb.ERR_NO_SUCH_OBJECT: pass + def add_linked_attribute(self, src, dest, attr='member'): + m = ldb.Message() + m.dn = ldb.Dn(self.ldb_dc1, src) + m[attr] = ldb.MessageElement(dest, ldb.FLAG_MOD_ADD, attr) + self.ldb_dc1.modify(m) + + def remove_linked_attribute(self, src, dest, attr='member'): + m = ldb.Message() + m.dn = ldb.Dn(self.ldb_dc1, src) + m[attr] = ldb.MessageElement(dest, ldb.FLAG_MOD_DELETE, attr) + self.ldb_dc1.modify(m) + def test_sort_behaviour_single_object(self): """Testing sorting behaviour on single objects""" + user1_dn = "cn=test_user1,%s" % self.ou user2_dn = "cn=test_user2,%s" % self.ou user3_dn = "cn=test_user3,%s" % self.ou @@ -414,14 +427,49 @@ class DrsReplicaSyncSortTestCase(drs_base.DrsBaseTestCase, ExopBaseTest): self.assertEqual(len(expected_links), ctr.linked_attributes_count) self.assertEqual([x[0] for x in has_inactive], ctr.linked_attributes) - def add_linked_attribute(self, src, dest, attr='member'): - m = ldb.Message() - m.dn = ldb.Dn(self.ldb_dc1, src) - m[attr] = ldb.MessageElement(dest, ldb.FLAG_MOD_ADD, attr) - self.ldb_dc1.modify(m) + def test_sort_behaviour_ncchanges(self): + """Testing sorting behaviour on a group of objects.""" + user1_dn = "cn=test_user1,%s" % self.ou + group_dn = "cn=test_group,%s" % self.ou + self.ldb_dc1.add({"dn": user1_dn, "objectclass": "user"}) + self.ldb_dc1.add({"dn": group_dn, "objectclass": "group"}) - def remove_linked_attribute(self, src, dest, attr='member'): - m = ldb.Message() - m.dn = ldb.Dn(self.ldb_dc1, src) - m[attr] = ldb.MessageElement(dest, ldb.FLAG_MOD_DELETE, attr) - self.ldb_dc1.modify(m) + self.add_linked_attribute(group_dn, user1_dn, + attr='member') + + dc_guid_1 = self.ldb_dc1.get_invocation_id() + + drs, drs_handle = self._ds_bind(self.dnsname_dc1) + + # Make sure the max objects count is high enough + req8 = self._exop_req8(dest_dsa=None, + invocation_id=dc_guid_1, + nc_dn_str=self.base_dn, + replica_flags=0, + max_objects=100, + exop=drsuapi.DRSUAPI_EXOP_NONE) + + # Loop until we get linked attributes, or we get to the end. + # Samba sends linked attributes at the end, unlike Windows. + while True: + (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8) + if ctr.more_data == 0 or ctr.linked_attributes_count != 0: + break + req8.highwatermark = ctr.new_highwatermark + + self.assertTrue(ctr.linked_attributes_count != 0) + + no_inactive = [] + for link in ctr.linked_attributes: + try: + target_guid = ndr_unpack(drsuapi.DsReplicaObjectIdentifier3, + link.value.blob).guid + except: + target_guid = ndr_unpack(drsuapi.DsReplicaObjectIdentifier3Binary, + link.value.blob).guid + no_inactive.append((link, target_guid)) + + no_inactive.sort(cmp=_linked_attribute_compare) + + # assert the two arrays are the same + self.assertEqual([x[0] for x in no_inactive], ctr.linked_attributes)