From: Andrew Bartlett Date: Fri, 4 Mar 2016 00:12:42 +0000 (+1300) Subject: selftest/drs: Show we return the correct 3 objects for DRSUAPI_EXOP_FSMO_RID_ALLOC X-Git-Tag: tdb-1.3.10~969 X-Git-Url: http://git.ipfire.org/gitweb.cgi?a=commitdiff_plain;h=ca37c7146c35cb4e672dd2b859b31bf9b10cf332;p=thirdparty%2Fsamba.git selftest/drs: Show we return the correct 3 objects for DRSUAPI_EXOP_FSMO_RID_ALLOC This does not depend on DRSUAPI_DRS_GET_ANC. This test is not new, but it was not previously being run. Signed-off-by: Andrew Bartlett Reviewed-by: Garming Sam --- diff --git a/source4/selftest/tests.py b/source4/selftest/tests.py index f7bbd04bd9e..4866cd72ee7 100755 --- a/source4/selftest/tests.py +++ b/source4/selftest/tests.py @@ -652,6 +652,11 @@ for env in ['vampire_dc', 'promoted_dc']: name="samba4.drs.repl_move.python(%s)" % env, environ={'DC1': "$DC_SERVER", 'DC2': '$%s_SERVER' % env.upper()}, extra_args=['-U$DOMAIN/$DC_USERNAME%$DC_PASSWORD']) + planoldpythontestsuite(env, "getnc_exop", + extra_path=[os.path.join(samba4srcdir, 'torture/drs/python')], + name="samba4.drs.getnc_exop.python(%s)" % env, + environ={'DC1': "$DC_SERVER", 'DC2': '$%s_SERVER' % env.upper()}, + extra_args=['-U$DOMAIN/$DC_USERNAME%$DC_PASSWORD']) for env in ["ad_dc_ntvfs", "s4member", "rodc", "promoted_dc", "ad_dc", "ad_member"]: diff --git a/source4/torture/drs/python/getnc_exop.py b/source4/torture/drs/python/getnc_exop.py index 904c0133334..6b796bab923 100644 --- a/source4/torture/drs/python/getnc_exop.py +++ b/source4/torture/drs/python/getnc_exop.py @@ -4,6 +4,7 @@ # Tests various schema replication scenarios # # Copyright (C) Kamen Mazdrashki 2011 +# Copyright (C) Andrew Bartlett 2016 # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -30,6 +31,7 @@ import drs_base import samba.tests +import ldb from ldb import SCOPE_BASE from samba.dcerpc import drsuapi, misc, drsblobs @@ -49,7 +51,8 @@ class DrsReplicaSyncTestCase(drs_base.DrsBaseTestCase): def tearDown(self): super(DrsReplicaSyncTestCase, self).tearDown() - def _exop_req8(self, dest_dsa, invocation_id, nc_dn_str, exop): + def _exop_req8(self, dest_dsa, invocation_id, nc_dn_str, exop, + replica_flags=0): req8 = drsuapi.DsGetNCChangesRequest8() req8.destination_dsa_guid = misc.GUID(dest_dsa) @@ -61,7 +64,7 @@ class DrsReplicaSyncTestCase(drs_base.DrsBaseTestCase): req8.highwatermark.reserved_usn = 0 req8.highwatermark.highest_usn = 0 req8.uptodateness_vector = None - req8.replica_flags = 0 + req8.replica_flags = replica_flags req8.max_object_count = 0 req8.max_ndr_size = 402116 req8.extended_op = exop @@ -87,10 +90,21 @@ class DrsReplicaSyncTestCase(drs_base.DrsBaseTestCase): # collect info to return later fsmo_info_1 = {"dns_name": self.dnsname_dc1, "invocation_id": self.ldb_dc1.get_invocation_id(), - "ntds_guid": self.ldb_dc1.get_ntds_GUID()} + "ntds_guid": self.ldb_dc1.get_ntds_GUID(), + "server_dn": self.ldb_dc1.get_serverName()} fsmo_info_2 = {"dns_name": self.dnsname_dc2, "invocation_id": self.ldb_dc2.get_invocation_id(), - "ntds_guid": self.ldb_dc2.get_ntds_GUID()} + "ntds_guid": self.ldb_dc2.get_ntds_GUID(), + "server_dn": self.ldb_dc2.get_serverName()} + + msgs = self.ldb_dc1.search(scope=ldb.SCOPE_BASE, base=fsmo_info_1["server_dn"], attrs=["serverReference"]) + fsmo_info_1["server_acct_dn"] = ldb.Dn(self.ldb_dc1, msgs[0]["serverReference"][0]) + fsmo_info_1["rid_set_dn"] = ldb.Dn(self.ldb_dc1, "CN=RID Set") + fsmo_info_1["server_acct_dn"] + + msgs = self.ldb_dc2.search(scope=ldb.SCOPE_BASE, base=fsmo_info_2["server_dn"], attrs=["serverReference"]) + fsmo_info_2["server_acct_dn"] = ldb.Dn(self.ldb_dc2, msgs[0]["serverReference"][0]) + fsmo_info_2["rid_set_dn"] = ldb.Dn(self.ldb_dc2, "CN=RID Set") + fsmo_info_2["server_acct_dn"] + # determine the owner dc res = self.ldb_dc1.search(fsmo_obj_dn, scope=SCOPE_BASE, attrs=["fSMORoleOwner"]) @@ -108,7 +122,7 @@ class DrsReplicaSyncTestCase(drs_base.DrsBaseTestCase): self.assertEqual(ctr6.nc_object_count, 0) self.assertEqual(ctr6.nc_linked_attributes_count, 0) self.assertEqual(ctr6.linked_attributes_count, 0) - self.assertEqual(ctr6.linked_attributes, None) + self.assertEqual(ctr6.linked_attributes, []) self.assertEqual(ctr6.drs_error[0], 0) def test_FSMONotOwner(self): @@ -144,3 +158,88 @@ class DrsReplicaSyncTestCase(drs_base.DrsBaseTestCase): self._check_exop_failed(ctr, drsuapi.DRSUAPI_EXOP_ERR_UNKNOWN_CALLER) self.assertEqual(ctr.source_dsa_guid, misc.GUID(fsmo_owner["ntds_guid"])) self.assertEqual(ctr.source_dsa_invocation_id, misc.GUID(fsmo_owner["invocation_id"])) + + def test_InvalidDestDSA_ridalloc(self): + """Test RID allocation with invalid destination DSA guid""" + fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn()) + (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn) + + req8 = self._exop_req8(dest_dsa="9c637462-5b8c-4467-aef2-bdb1f57bc4ef", + invocation_id=fsmo_owner["invocation_id"], + nc_dn_str=fsmo_dn, + exop=drsuapi.DRSUAPI_EXOP_FSMO_RID_ALLOC) + + (drs, drs_handle) = self._ds_bind(fsmo_owner["dns_name"]) + (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8) + self.assertEqual(level, 6, "Expected level 6 response!") + self._check_exop_failed(ctr, drsuapi.DRSUAPI_EXOP_ERR_UNKNOWN_CALLER) + self.assertEqual(ctr.source_dsa_guid, misc.GUID(fsmo_owner["ntds_guid"])) + self.assertEqual(ctr.source_dsa_invocation_id, misc.GUID(fsmo_owner["invocation_id"])) + + def test_do_ridalloc(self): + """Test doing a RID allocation with a valid destination DSA guid""" + fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn()) + (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn) + + req8 = self._exop_req8(dest_dsa=fsmo_not_owner["ntds_guid"], + invocation_id=fsmo_owner["invocation_id"], + nc_dn_str=fsmo_dn, + exop=drsuapi.DRSUAPI_EXOP_FSMO_RID_ALLOC) + + (drs, drs_handle) = self._ds_bind(fsmo_owner["dns_name"]) + (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8) + self.assertEqual(level, 6, "Expected level 6 response!") + self.assertEqual(ctr.source_dsa_guid, misc.GUID(fsmo_owner["ntds_guid"])) + self.assertEqual(ctr.source_dsa_invocation_id, misc.GUID(fsmo_owner["invocation_id"])) + ctr6 = ctr + self.assertEqual(ctr6.extended_ret, drsuapi.DRSUAPI_EXOP_ERR_SUCCESS) + self.assertEqual(ctr6.object_count, 3) + self.assertNotEqual(ctr6.first_object, None) + self.assertEqual(ldb.Dn(self.ldb_dc1, ctr6.first_object.object.identifier.dn), fsmo_dn) + self.assertNotEqual(ctr6.first_object.next_object, None) + self.assertNotEqual(ctr6.first_object.next_object.next_object, None) + second_object = ctr6.first_object.next_object.object + self.assertEqual(ldb.Dn(self.ldb_dc1, second_object.identifier.dn), fsmo_not_owner["rid_set_dn"]) + third_object = ctr6.first_object.next_object.next_object.object + self.assertEqual(ldb.Dn(self.ldb_dc1, third_object.identifier.dn), fsmo_not_owner["server_acct_dn"]) + + self.assertEqual(ctr6.more_data, False) + self.assertEqual(ctr6.nc_object_count, 0) + self.assertEqual(ctr6.nc_linked_attributes_count, 0) + self.assertEqual(ctr6.drs_error[0], 0) + # We don't check the linked_attributes_count as if the domain + # has an RODC, it can gain links on the server account object + + def test_do_ridalloc_get_anc(self): + """Test doing a RID allocation with a valid destination DSA guid and """ + fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn()) + (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn) + + req8 = self._exop_req8(dest_dsa=fsmo_not_owner["ntds_guid"], + invocation_id=fsmo_owner["invocation_id"], + nc_dn_str=fsmo_dn, + exop=drsuapi.DRSUAPI_EXOP_FSMO_RID_ALLOC, + replica_flags=drsuapi.DRSUAPI_DRS_GET_ANC) + + (drs, drs_handle) = self._ds_bind(fsmo_owner["dns_name"]) + (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8) + self.assertEqual(level, 6, "Expected level 6 response!") + self.assertEqual(ctr.source_dsa_guid, misc.GUID(fsmo_owner["ntds_guid"])) + self.assertEqual(ctr.source_dsa_invocation_id, misc.GUID(fsmo_owner["invocation_id"])) + ctr6 = ctr + self.assertEqual(ctr6.extended_ret, drsuapi.DRSUAPI_EXOP_ERR_SUCCESS) + self.assertEqual(ctr6.object_count, 3) + self.assertNotEqual(ctr6.first_object, None) + self.assertEqual(ldb.Dn(self.ldb_dc1, ctr6.first_object.object.identifier.dn), fsmo_dn) + self.assertNotEqual(ctr6.first_object.next_object, None) + self.assertNotEqual(ctr6.first_object.next_object.next_object, None) + second_object = ctr6.first_object.next_object.object + self.assertEqual(ldb.Dn(self.ldb_dc1, second_object.identifier.dn), fsmo_not_owner["rid_set_dn"]) + third_object = ctr6.first_object.next_object.next_object.object + self.assertEqual(ldb.Dn(self.ldb_dc1, third_object.identifier.dn), fsmo_not_owner["server_acct_dn"]) + self.assertEqual(ctr6.more_data, False) + self.assertEqual(ctr6.nc_object_count, 0) + self.assertEqual(ctr6.nc_linked_attributes_count, 0) + self.assertEqual(ctr6.drs_error[0], 0) + # We don't check the linked_attributes_count as if the domain + # has an RODC, it can gain links on the server account object