From: Andrew Bartlett Date: Tue, 10 May 2016 04:40:51 +0000 (+1200) Subject: selftest: Add more tests to cover attribute changes vs DN renames X-Git-Tag: tdb-1.3.10~954 X-Git-Url: http://git.ipfire.org/gitweb.cgi?a=commitdiff_plain;h=9dcc62eb781fdc05070bc12c43081e086963e4ad;p=thirdparty%2Fsamba.git selftest: Add more tests to cover attribute changes vs DN renames This covers a bug where unrelated attribute changes would reverse a rename Signed-off-by: Andrew Bartlett Reviewed-by: Garming Sam ' % self._GUID_string(user_orig["objectGUID"][0])) + # Modify description on DC2. This triggers a replication, but + # not of 'name' and so a bug in Samba regarding the DN. + msg = ldb.Message() + msg.dn = new_dn + msg["description"] = ldb.MessageElement("User Description", ldb.FLAG_MOD_REPLACE, "description") + self.ldb_dc2.modify(msg) # trigger replication from DC1 to DC2, for cleanup self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) + # check user info on DC2 - should be delted user + user_cur = self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=user_moved_orig, is_deleted=True) + self.assertFalse("description" in user_cur) + + # trigger replication from DC2 to DC1, for cleanup + self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True) + + # check user info on DC1 - should be delted user + user_cur = self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=user_moved_orig, is_deleted=True) + self.assertFalse("description" in user_cur) + def test_ReplicateMoveObject2(self): """Verifies how a moved container with a user inside is not @@ -199,6 +234,570 @@ class DrsMoveObjectTestCase(drs_base.DrsBaseTestCase): # trigger replication from DC1 to DC2, for cleanup self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) + # check user info on DC2 - should be delted user + user_cur = self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=user_orig, is_deleted=True) + + # trigger replication from DC2 to DC1, for cleanup + self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True) + + # check user info on DC1 - should be delted user + user_cur = self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=user_orig, is_deleted=True) + + + def test_ReplicateMoveObject3(self): + """Verifies how a moved container with a user inside is replicated between two DCs. + This test should verify that: + - the OU is created on DC1 + - the OU is renamed on DC1 + - We verify that after replication, + that the user has the correct DN (under OU2). + + """ + # work-out unique username to test with + username = self._make_username() + + # create user on DC1 + self.ldb_dc1.newuser(username=username, + userou="ou=%s" % self.ou1_dn.get_component_value(0), + password=None, setpassword=False) + ldb_res = self.ldb_dc1.search(base=self.ou1_dn, + scope=SCOPE_SUBTREE, + expression="(samAccountName=%s)" % username) + self.assertEquals(len(ldb_res), 1) + user_orig = ldb_res[0] + user_dn = ldb_res[0]["dn"] + + # check user info on DC1 + print "Testing for %s with GUID %s" % (username, self._GUID_string(user_orig["objectGUID"][0])) + self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=user_orig, is_deleted=False) + + new_dn = ldb.Dn(self.ldb_dc1, "CN=%s" % username) + new_dn.add_base(self.ou2_dn) + self.ldb_dc1.rename(user_dn, new_dn) + ldb_res = self.ldb_dc1.search(base=self.ou2_dn, + scope=SCOPE_SUBTREE, + expression="(samAccountName=%s)" % username) + self.assertEquals(len(ldb_res), 1) + + user_moved_orig = ldb_res[0] + user_moved_dn = ldb_res[0]["dn"] + + # trigger replication from DC2 (Which has never seen the object) to DC1 + self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) + # check user info on DC1 - should be valid user + user_cur = self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=user_moved_orig, is_deleted=False) + + # delete user on DC1 + self.ldb_dc1.delete('' % self._GUID_string(user_orig["objectGUID"][0])) + # trigger replication from DC2 to DC1 + self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True) + # check user info on DC2 - should be delted user + user_cur = self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=user_moved_orig, is_deleted=True) + + # trigger replication from DC1 to DC2, for cleanup + self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) + + # check user info on DC2 - should be delted user + user_cur = self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=user_moved_orig, is_deleted=True) + + + def test_ReplicateMoveObject4(self): + """Verifies how a moved container with a user inside is replicated between two DCs. + This test should verify that: + - the OU is replicated properly + - the user is modified on DC2 + - the OU is renamed on DC1 + - We verify that after replication DC1 -> DC2, + that the user has the correct DN (under OU2), and the description + + """ + # work-out unique username to test with + username = self._make_username() + + # create user on DC1 + self.ldb_dc1.newuser(username=username, + userou="ou=%s" % self.ou1_dn.get_component_value(0), + password=None, setpassword=False) + ldb_res = self.ldb_dc1.search(base=self.ou1_dn, + scope=SCOPE_SUBTREE, + expression="(samAccountName=%s)" % username) + self.assertEquals(len(ldb_res), 1) + user_orig = ldb_res[0] + user_dn = ldb_res[0]["dn"] + + # check user info on DC1 + print "Testing for %s with GUID %s" % (username, self._GUID_string(user_orig["objectGUID"][0])) + self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=user_orig, is_deleted=False) + + # trigger replication from DC1 to DC2 + self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) + # check user info on DC2 - should still be valid user + user_cur = self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=user_orig, is_deleted=False) + + new_dn = ldb.Dn(self.ldb_dc1, "CN=%s" % username) + new_dn.add_base(self.ou2_dn) + self.ldb_dc1.rename(user_dn, new_dn) + ldb_res = self.ldb_dc1.search(base=self.ou2_dn, + scope=SCOPE_SUBTREE, + expression="(samAccountName=%s)" % username) + self.assertEquals(len(ldb_res), 1) + + user_moved_orig = ldb_res[0] + user_moved_dn = ldb_res[0]["dn"] + + # Modify description on DC2. This triggers a replication, but + # not of 'name' and so a bug in Samba regarding the DN. + msg = ldb.Message() + msg.dn = user_dn + msg["description"] = ldb.MessageElement("User Description", ldb.FLAG_MOD_REPLACE, "description") + self.ldb_dc2.modify(msg) + + # trigger replication from DC1 to DC2 + self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) + # check user info on DC2 - should still be valid user + user_cur = self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=user_moved_orig, is_deleted=False) + self.assertTrue("description" in user_cur) + + # delete user on DC1 + self.ldb_dc1.delete('' % self._GUID_string(user_orig["objectGUID"][0])) + # trigger replication from DC2 to DC1 + self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True) + # check user info on DC2 - should be deleted user + user_cur = self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=user_moved_orig, is_deleted=True) + self.assertFalse("description" in user_cur) + + # trigger replication from DC1 to DC2, for cleanup + self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) + + # check user info on DC2 - should be deleted user + user_cur = self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=user_moved_orig, is_deleted=True) + self.assertFalse("description" in user_cur) + + def test_ReplicateMoveObject5(self): + """Verifies how a moved container with a user inside is replicated between two DCs. + This test should verify that: + - the OU is replicated properly + - the user is modified on DC2 + - the OU is renamed on DC1 + - We verify that after replication DC2 -> DC1, + that the user has the correct DN (under OU2), and the description + + """ + # work-out unique username to test with + username = self._make_username() + + # create user on DC1 + self.ldb_dc1.newuser(username=username, + userou="ou=%s" % self.ou1_dn.get_component_value(0), + password=None, setpassword=False) + ldb_res = self.ldb_dc1.search(base=self.ou1_dn, + scope=SCOPE_SUBTREE, + expression="(samAccountName=%s)" % username) + self.assertEquals(len(ldb_res), 1) + user_orig = ldb_res[0] + user_dn = ldb_res[0]["dn"] + + # trigger replication from DC1 to DC2 + self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) + # check user info on DC2 - should still be valid user + user_cur = self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=user_orig, is_deleted=False) + + # check user info on DC1 + print "Testing for %s with GUID %s" % (username, self._GUID_string(user_orig["objectGUID"][0])) + self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=user_orig, is_deleted=False) + + new_dn = ldb.Dn(self.ldb_dc1, "CN=%s" % username) + new_dn.add_base(self.ou2_dn) + self.ldb_dc1.rename(user_dn, new_dn) + ldb_res = self.ldb_dc1.search(base=self.ou2_dn, + scope=SCOPE_SUBTREE, + expression="(samAccountName=%s)" % username) + self.assertEquals(len(ldb_res), 1) + + user_moved_orig = ldb_res[0] + user_moved_dn = ldb_res[0]["dn"] + + # Modify description on DC2. This triggers a replication, but + # not of 'name' and so a bug in Samba regarding the DN. + msg = ldb.Message() + msg.dn = user_dn + msg["description"] = ldb.MessageElement("User Description", ldb.FLAG_MOD_REPLACE, "description") + self.ldb_dc2.modify(msg) + + # trigger replication from DC2 to DC1 + self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True) + # check user info on DC1 - should still be valid user + user_cur = self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=user_moved_orig, is_deleted=False) + self.assertTrue("description" in user_cur) + + # trigger replication from DC1 to DC2 + self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) + user_cur = self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=user_moved_orig, is_deleted=False) + self.assertTrue("description" in user_cur) + + # delete user on DC2 + self.ldb_dc2.delete('' % self._GUID_string(user_orig["objectGUID"][0])) + # trigger replication from DC2 to DC1 for cleanup + self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True) + + # check user info on DC1 - should be deleted user + user_cur = self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=user_moved_orig, is_deleted=True) + self.assertFalse("description" in user_cur) + + + def test_ReplicateMoveObject6(self): + """Verifies how a moved container is replicated between two DCs. + This test should verify that: + - the OU1 is replicated properly + - the OU1 is modified on DC2 + - the OU1 is renamed on DC1 + - We verify that after replication DC1 -> DC2, + that the OU1 has the correct DN (under OU2), and the description + + """ + ldb_res = self.ldb_dc1.search(base=self.ou1_dn, + scope=SCOPE_BASE) + self.assertEquals(len(ldb_res), 1) + ou_orig = ldb_res[0] + ou_dn = ldb_res[0]["dn"] + + # check user info on DC1 + print "Testing for %s with GUID %s" % (self.ou1_dn, self._GUID_string(ou_orig["objectGUID"][0])) + self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=ou_orig, is_deleted=False) + + # trigger replication from DC1 to DC2 + self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) + # check user info on DC2 - should still be valid user + ou_cur = self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=ou_orig, is_deleted=False) + + new_dn = ldb.Dn(self.ldb_dc1, "OU=%s" % self.ou1_dn.get_component_value(0)) + new_dn.add_base(self.ou2_dn) + self.ldb_dc1.rename(ou_dn, new_dn) + ldb_res = self.ldb_dc1.search(base=new_dn, + scope=SCOPE_BASE) + self.assertEquals(len(ldb_res), 1) + + ou_moved_orig = ldb_res[0] + ou_moved_dn = ldb_res[0]["dn"] + + # Modify description on DC2. This triggers a replication, but + # not of 'name' and so a bug in Samba regarding the DN. + msg = ldb.Message() + msg.dn = ou_dn + msg["description"] = ldb.MessageElement("OU Description", ldb.FLAG_MOD_REPLACE, "description") + self.ldb_dc2.modify(msg) + + # trigger replication from DC1 to DC2 + self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) + # check user info on DC2 - should still be valid user + ou_cur = self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=ou_moved_orig, is_deleted=False) + self.assertTrue("description" in ou_cur) + + # delete OU on DC1 + self.ldb_dc1.delete('' % self._GUID_string(ou_orig["objectGUID"][0])) + # trigger replication from DC2 to DC1 + self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True) + # check user info on DC2 - should be deleted user + ou_cur = self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=ou_moved_orig, is_deleted=True) + self.assertFalse("description" in ou_cur) + + # trigger replication from DC1 to DC2, for cleanup + self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) + + # check user info on DC2 - should be deleted user + ou_cur = self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=ou_moved_orig, is_deleted=True) + self.assertFalse("description" in ou_cur) + + + def test_ReplicateMoveObject7(self): + """Verifies how a moved container is replicated between two DCs. + This test should verify that: + - the OU1 is replicated properly + - the OU1 is modified on DC2 + - the OU1 is renamed on DC1 to be under OU2 + - We verify that after replication DC2 -> DC1, + that the OU1 has the correct DN (under OU2), and the description + + """ + ldb_res = self.ldb_dc1.search(base=self.ou1_dn, + scope=SCOPE_BASE) + self.assertEquals(len(ldb_res), 1) + ou_orig = ldb_res[0] + ou_dn = ldb_res[0]["dn"] + + # check user info on DC1 + print "Testing for %s with GUID %s" % (self.ou1_dn, self._GUID_string(ou_orig["objectGUID"][0])) + self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=ou_orig, is_deleted=False) + + # trigger replication from DC1 to DC2 + self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) + # check user info on DC2 - should still be valid user + ou_cur = self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=ou_orig, is_deleted=False) + + new_dn = ldb.Dn(self.ldb_dc1, "OU=%s" % self.ou1_dn.get_component_value(0)) + new_dn.add_base(self.ou2_dn) + self.ldb_dc1.rename(ou_dn, new_dn) + ldb_res = self.ldb_dc1.search(base=new_dn, + scope=SCOPE_BASE) + self.assertEquals(len(ldb_res), 1) + + ou_moved_orig = ldb_res[0] + ou_moved_dn = ldb_res[0]["dn"] + + # Modify description on DC2. This triggers a replication, but + # not of 'name' and so a bug in Samba regarding the DN. + msg = ldb.Message() + msg.dn = ou_dn + msg["description"] = ldb.MessageElement("OU Description", ldb.FLAG_MOD_REPLACE, "description") + self.ldb_dc2.modify(msg) + + # trigger replication from DC2 to DC1 + self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True) + # check user info on DC1 - should still be valid user + ou_cur = self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=ou_moved_orig, is_deleted=False) + self.assertTrue("description" in ou_cur) + + # delete OU on DC1 + self.ldb_dc1.delete('' % self._GUID_string(ou_orig["objectGUID"][0])) + # trigger replication from DC2 to DC1 + self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True) + # check user info on DC2 - should be deleted user + ou_cur = self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=ou_moved_orig, is_deleted=True) + self.assertFalse("description" in ou_cur) + + # trigger replication from DC1 to DC2, for cleanup + self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) + + # check user info on DC2 - should be deleted user + ou_cur = self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=ou_moved_orig, is_deleted=True) + self.assertFalse("description" in ou_cur) + + + def test_ReplicateMoveObject8(self): + """Verifies how a moved container is replicated between two DCs. + This test should verify that: + - the OU1 is replicated properly + - the OU1 is modified on DC2 + - the OU1 is renamed on DC1 to OU1-renamed + - We verify that after replication DC1 -> DC2, + that the OU1 has the correct DN (OU1-renamed), and the description + + """ + ldb_res = self.ldb_dc1.search(base=self.ou1_dn, + scope=SCOPE_BASE) + self.assertEquals(len(ldb_res), 1) + ou_orig = ldb_res[0] + ou_dn = ldb_res[0]["dn"] + + # check user info on DC1 + print "Testing for %s with GUID %s" % (self.ou1_dn, self._GUID_string(ou_orig["objectGUID"][0])) + self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=ou_orig, is_deleted=False) + + # trigger replication from DC1 to DC2 + self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) + # check user info on DC2 - should still be valid user + ou_cur = self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=ou_orig, is_deleted=False) + + new_dn = ldb.Dn(self.ldb_dc1, "OU=%s-renamed" % self.ou1_dn.get_component_value(0)) + new_dn.add_base(self.ou1_dn.parent()) + self.ldb_dc1.rename(ou_dn, new_dn) + ldb_res = self.ldb_dc1.search(base=new_dn, + scope=SCOPE_BASE) + self.assertEquals(len(ldb_res), 1) + + ou_moved_orig = ldb_res[0] + ou_moved_dn = ldb_res[0]["dn"] + + # Modify description on DC2. This triggers a replication, but + # not of 'name' and so a bug in Samba regarding the DN. + msg = ldb.Message() + msg.dn = ou_dn + msg["description"] = ldb.MessageElement("OU Description", ldb.FLAG_MOD_REPLACE, "description") + self.ldb_dc2.modify(msg) + + # trigger replication from DC1 to DC2 + self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) + # check user info on DC2 - should still be valid user + ou_cur = self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=ou_moved_orig, is_deleted=False) + self.assertTrue("description" in ou_cur) + + # delete OU on DC1 + self.ldb_dc1.delete('' % self._GUID_string(ou_orig["objectGUID"][0])) + # trigger replication from DC2 to DC1 + self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True) + # check user info on DC2 - should be deleted user + ou_cur = self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=ou_moved_orig, is_deleted=True) + self.assertFalse("description" in ou_cur) + + # trigger replication from DC1 to DC2, for cleanup + self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) + + # check user info on DC2 - should be deleted user + ou_cur = self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=ou_moved_orig, is_deleted=True) + self.assertFalse("description" in ou_cur) + + + def test_ReplicateMoveObject9(self): + """Verifies how a moved container is replicated between two DCs. + This test should verify that: + - the OU1 is replicated properly + - the OU1 is modified on DC2 + - the OU1 is renamed on DC1 to be under OU2 + - the OU1 is renamed on DC1 to OU1-renamed + - We verify that after replication DC1 -> DC2, + that the OU1 has the correct DN (OU1-renamed), and the description + + """ + ldb_res = self.ldb_dc1.search(base=self.ou1_dn, + scope=SCOPE_BASE) + self.assertEquals(len(ldb_res), 1) + ou_orig = ldb_res[0] + ou_dn = ldb_res[0]["dn"] + + # check user info on DC1 + print "Testing for %s with GUID %s" % (self.ou1_dn, self._GUID_string(ou_orig["objectGUID"][0])) + self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=ou_orig, is_deleted=False) + + # trigger replication from DC1 to DC2 + self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) + # check user info on DC2 - should still be valid user + ou_cur = self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=ou_orig, is_deleted=False) + + new_dn = ldb.Dn(self.ldb_dc1, "OU=%s-renamed" % self.ou1_dn.get_component_value(0)) + new_dn.add_base(self.ou1_dn.parent()) + self.ldb_dc1.rename(ou_dn, new_dn) + ldb_res = self.ldb_dc1.search(base=new_dn, + scope=SCOPE_BASE) + self.assertEquals(len(ldb_res), 1) + + ou_moved_orig = ldb_res[0] + ou_moved_dn = ldb_res[0]["dn"] + + # Modify description on DC2. This triggers a replication, but + # not of 'name' and so a bug in Samba regarding the DN. + msg = ldb.Message() + msg.dn = ou_dn + msg["description"] = ldb.MessageElement("OU Description", ldb.FLAG_MOD_REPLACE, "description") + self.ldb_dc2.modify(msg) + + # trigger replication from DC2 to DC1 + self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True) + # check user info on DC1 - should still be valid user + ou_cur = self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=ou_moved_orig, is_deleted=False) + self.assertTrue("description" in ou_cur) + + # delete OU on DC1 + self.ldb_dc1.delete('' % self._GUID_string(ou_orig["objectGUID"][0])) + # trigger replication from DC2 to DC1 + self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True) + # check user info on DC2 - should be deleted user + ou_cur = self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=ou_moved_orig, is_deleted=True) + self.assertFalse("description" in ou_cur) + + # trigger replication from DC1 to DC2, for cleanup + self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) + + # check user info on DC2 - should be deleted user + ou_cur = self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=ou_moved_orig, is_deleted=True) + self.assertFalse("description" in ou_cur) + + + def test_ReplicateMoveObject10(self): + """Verifies how a moved container is replicated between two DCs. + This test should verify that: + - the OU1 is replicated properly + - the OU1 is modified on DC2 + - the OU1 is deleted on DC1 + - We verify that after replication DC1 -> DC2, + that the OU1 is deleted, and the description has gone away + + """ + ldb_res = self.ldb_dc1.search(base=self.ou1_dn, + scope=SCOPE_BASE) + self.assertEquals(len(ldb_res), 1) + ou_orig = ldb_res[0] + ou_dn = ldb_res[0]["dn"] + + # check user info on DC1 + print "Testing for %s with GUID %s" % (self.ou1_dn, self._GUID_string(ou_orig["objectGUID"][0])) + self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=ou_orig, is_deleted=False) + + # trigger replication from DC1 to DC2 + self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) + # check user info on DC2 - should still be valid user + ou_cur = self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=ou_orig, is_deleted=False) + + # Modify description on DC2. This triggers a replication, but + # not of 'name' and so a bug in Samba regarding the DN. + msg = ldb.Message() + msg.dn = ou_dn + msg["description"] = ldb.MessageElement("OU Description", ldb.FLAG_MOD_REPLACE, "description") + self.ldb_dc2.modify(msg) + + # delete OU on DC1 + self.ldb_dc1.delete('' % self._GUID_string(ou_orig["objectGUID"][0])) + # trigger replication from DC1 to DC2 + self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) + # check user info on DC2 - should be deleted OU + ou_cur = self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=ou_orig, is_deleted=True) + self.assertFalse("description" in ou_cur) + + # trigger replication from DC1 to DC2, for cleanup + self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True) + + # check user info on DC2 - should be deleted OU + ou_cur = self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=ou_orig, is_deleted=True) + self.assertFalse("description" in ou_cur) + + + def test_ReplicateMoveObject11(self): + """Verifies how a moved container is replicated between two DCs. + This test should verify that: + - the OU1 is replicated properly + - the OU1 is modified on DC2 + - the OU1 is deleted on DC1 + - We verify that after replication DC2 -> DC1, + that the OU1 is deleted, and the description has gone away + + """ + ldb_res = self.ldb_dc1.search(base=self.ou1_dn, + scope=SCOPE_BASE) + self.assertEquals(len(ldb_res), 1) + ou_orig = ldb_res[0] + ou_dn = ldb_res[0]["dn"] + + # check user info on DC1 + print "Testing for %s with GUID %s" % (self.ou1_dn, self._GUID_string(ou_orig["objectGUID"][0])) + self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=ou_orig, is_deleted=False) + + # trigger replication from DC1 to DC2 + self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) + # check user info on DC2 - should still be valid user + ou_cur = self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=ou_orig, is_deleted=False) + + # Modify description on DC2. This triggers a replication, but + # not of 'name' and so a bug in Samba regarding the DN. + msg = ldb.Message() + msg.dn = ou_dn + msg["description"] = ldb.MessageElement("OU Description", ldb.FLAG_MOD_REPLACE, "description") + self.ldb_dc2.modify(msg) + + # delete OU on DC1 + self.ldb_dc1.delete('' % self._GUID_string(ou_orig["objectGUID"][0])) + # trigger replication from DC2 to DC1 + self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True) + # check user info on DC2 - should be deleted OU + ou_cur = self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=ou_orig, is_deleted=True) + self.assertFalse("description" in ou_cur) + + # trigger replication from DC1 to DC2, for cleanup + self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) + + # check user info on DC2 - should be deleted OU + ou_cur = self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=ou_orig, is_deleted=True) + self.assertFalse("description" in ou_cur) + + class DrsMoveBetweenTreeOfObjectTestCase(drs_base.DrsBaseTestCase): @@ -282,14 +881,28 @@ class DrsMoveBetweenTreeOfObjectTestCase(drs_base.DrsBaseTestCase): attrs=["*", "parentGUID"]) self.assertEquals(len(res), 1) user_cur = res[0] - # now check properties of the user - name_orig = obj_orig["cn"][0] - name_cur = user_cur["cn"][0] + cn_orig = obj_orig["cn"][0] + cn_cur = user_cur["cn"][0] + name_orig = obj_orig["name"][0] + name_cur = user_cur["name"][0] dn_orig = obj_orig["dn"] dn_cur = user_cur["dn"] - self.assertFalse("isDeleted" in user_cur) - self.assertEquals(name_cur, name_orig) - self.assertEquals(dn_cur, dn_orig) + # now check properties of the user + if is_deleted: + self.assertTrue("isDeleted" in user_cur) + self.assertEquals(cn_cur.split('\n')[0], cn_orig) + self.assertEquals(name_cur.split('\n')[0], name_orig) + self.assertEquals(dn_cur.get_rdn_value().split('\n')[0], + dn_orig.get_rdn_value()) + self.assertEqual(name_cur, cn_cur) + else: + self.assertFalse("isDeleted" in user_cur) + self.assertEquals(cn_cur, cn_orig) + self.assertEquals(name_cur, name_orig) + self.assertEquals(dn_cur, dn_orig) + self.assertEqual(name_cur, cn_cur) + self.assertEqual(name_cur, user_cur.dn.get_rdn_value()) + return user_cur @@ -401,10 +1014,17 @@ class DrsMoveBetweenTreeOfObjectTestCase(drs_base.DrsBaseTestCase): # check user info on DC2 - should be valid user user_cur = self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=user_moved_orig, is_deleted=False) + # Rename on DC1 new_dn = ldb.Dn(self.ldb_dc1, "CN=%s" % username) new_dn.add_base(self.ou1_dn) self.ldb_dc1.rename(user_moved_dn, new_dn) + # Modify description on DC2 + msg = ldb.Message() + msg.dn = user_moved_dn + msg["description"] = ldb.MessageElement("User Description", ldb.FLAG_MOD_REPLACE, "description") + self.ldb_dc2.modify(msg) + ldb_res = self.ldb_dc1.search(base=self.ou1_dn, scope=SCOPE_SUBTREE, expression="(samAccountName=%s)" % username) @@ -417,12 +1037,22 @@ class DrsMoveBetweenTreeOfObjectTestCase(drs_base.DrsBaseTestCase): self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) # check user info on DC2 - should be valid user user_cur = self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=user_moved_orig, is_deleted=False) + self.assertTrue("description" in user_cur) # delete user on DC1 self.ldb_dc1.delete('' % self._GUID_string(user_orig["objectGUID"][0])) + # trigger replication from DC2 to DC1 + self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True) + # check user info on DC1 - should be deleted user + user_cur = self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=user_moved_orig, is_deleted=True) + self.assertFalse("description" in user_cur) + # trigger replication from DC1 to DC2, for cleanup self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) + # check user info on DC2 - should be deleted user + user_cur = self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=user_moved_orig, is_deleted=True) + self.assertFalse("description" in user_cur) def test_ReplicateMoveInTree3(self):