From: Gary Lockyer Date: Thu, 11 Oct 2018 22:21:10 +0000 (+1300) Subject: test samr: Extra tests for samr_EnumDomainGroups X-Git-Tag: tdb-1.3.17~719 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=7dd7800a88e6ece0606d6d94718318e2dc067d58;p=thirdparty%2Fsamba.git test samr: Extra tests for samr_EnumDomainGroups Add extra tests to test the content returned by samr_EnumDomainGroups, and tests for the result caching added in the following commit. Signed-off-by: Gary Lockyer Reviewed-by: Andrew Bartlett --- diff --git a/python/samba/tests/dcerpc/sam.py b/python/samba/tests/dcerpc/sam.py index f709ba48950..11ed9b9287f 100644 --- a/python/samba/tests/dcerpc/sam.py +++ b/python/samba/tests/dcerpc/sam.py @@ -32,6 +32,7 @@ from samba.dsdb import ( GTYPE_SECURITY_UNIVERSAL_GROUP, GTYPE_SECURITY_GLOBAL_GROUP) from samba import generate_random_password +from samba.ndr import ndr_unpack import os @@ -40,6 +41,24 @@ def toArray(handle, array, num_entries): return [(entry.idx, entry.name) for entry in array.entries[:num_entries]] +# Extract the rid from an ldb message, assumes that the message has a +# objectSID attribute +# +def rid(msg): + sid = ndr_unpack(security.dom_sid, msg["objectSID"][0]) + (_, rid) = sid.split() + return rid + + +# Calculate the request size for EnumDomainUsers and EnumDomainGroups calls +# to hold the specified number of entries. +# We use the w2k3 element size value of 54, code under test +# rounds this up i.e. (1+(max_size/SAMR_ENUM_USERS_MULTIPLIER)) +# +def calc_max_size(num_entries): + return (num_entries - 1) * 54 + + class SamrTests(RpcInterfaceTestCase): def setUp(self): @@ -72,6 +91,18 @@ class SamrTests(RpcInterfaceTestCase): self.domain_handle = self.conn.OpenDomain( self.handle, security.SEC_FLAG_MAXIMUM_ALLOWED, self.domain_sid) + # Filter a list of records, removing those that are not part of the + # current domain. + # + def filter_domain(self, unfiltered): + def sid(msg): + sid = ndr_unpack(security.dom_sid, msg["objectSID"][0]) + (x, _) = sid.split() + return x + + dom_sid = security.dom_sid(self.samdb.get_domain_sid()) + return [x for x in unfiltered if sid(x) == dom_sid] + def test_connect5(self): (level, info, handle) =\ self.conn.Connect5(None, 0, 1, samr.ConnectInfo1()) @@ -435,3 +466,141 @@ class SamrTests(RpcInterfaceTestCase): 5, check_results, select, attributes, self.create_groups) self.delete_dns(dns) + + def test_EnumDomainGroups(self): + def check_results(expected, actual): + for (e, a) in zip(expected, actual): + self.assertTrue(isinstance(a, samr.SamEntry)) + self.assertEquals( + str(e["sAMAccountName"]), str(a.name.string)) + + # Create four groups + # to ensure that we have the minimum needed for the tests. + dns = self.create_groups([1, 2, 3, 4]) + + # + # Get the expected results by querying the samdb database directly. + # We do this rather than use a list of expected results as this runs + # with other tests so we do not have a known fixed list of elements + select = "(&(|(groupType=%d)(groupType=%d))(objectClass=group))" % ( + GTYPE_SECURITY_UNIVERSAL_GROUP, + GTYPE_SECURITY_GLOBAL_GROUP) + attributes = ["sAMAccountName", "objectSID"] + unfiltered = self.samdb.search(expression=select, attrs=attributes) + filtered = self.filter_domain(unfiltered) + self.assertTrue(len(filtered) > 4) + + # Sort the expected results by rid + expected = sorted(list(filtered), key=rid) + + # + # Perform EnumDomainGroups with max size greater than the expected + # number of results. Allow for an extra 10 entries + # + max_size = calc_max_size(len(expected) + 10) + (resume_handle, actual, num_entries) = self.conn.EnumDomainGroups( + self.domain_handle, 0, max_size) + self.assertEquals(len(expected), num_entries) + check_results(expected, actual.entries) + + # + # Perform EnumDomainGroups with size set to so that it contains + # 4 entries. + # + max_size = calc_max_size(4) + (resume_handle, actual, num_entries) = self.conn.EnumDomainGroups( + self.domain_handle, 0, max_size) + self.assertEquals(4, num_entries) + check_results(expected[:4], actual.entries) + + # + # Try calling with resume_handle greater than number of entries + # Should return no results and a resume handle of 0 + max_size = calc_max_size(1) + rh = len(expected) + self.conn.Close(self.handle) + (resume_handle, a, num_entries) = self.conn.EnumDomainGroups( + self.domain_handle, rh, max_size) + + self.assertEquals(0, num_entries) + self.assertEquals(0, resume_handle) + + # + # Enumerate through the domain groups one element at a time. + # + max_size = calc_max_size(1) + actual = [] + (resume_handle, a, num_entries) = self.conn.EnumDomainGroups( + self.domain_handle, 0, max_size) + while resume_handle: + self.assertEquals(1, num_entries) + actual.append(a.entries[0]) + (resume_handle, a, num_entries) = self.conn.EnumDomainGroups( + self.domain_handle, resume_handle, max_size) + if num_entries: + actual.append(a.entries[0]) + + # + # Check that the cached results are being returned. + # Obtain a new resume_handle and insert new entries into the + # into the DB + # + actual = [] + max_size = calc_max_size(1) + (resume_handle, a, num_entries) = self.conn.EnumDomainGroups( + self.domain_handle, 0, max_size) + extra_dns = self.create_groups([1000, 1002, 1003, 1004]) + while resume_handle: + self.assertEquals(1, num_entries) + actual.append(a.entries[0]) + (resume_handle, a, num_entries) = self.conn.EnumDomainGroups( + self.domain_handle, resume_handle, max_size) + if num_entries: + actual.append(a.entries[0]) + + self.assertEquals(len(expected), len(actual)) + check_results(expected, actual) + + # + # Perform EnumDomainGroups, we should read the newly added domains + # + max_size = calc_max_size(len(expected) + len(extra_dns) + 10) + (resume_handle, actual, num_entries) = self.conn.EnumDomainGroups( + self.domain_handle, 0, max_size) + self.assertEquals(len(expected) + len(extra_dns), num_entries) + + # + # Get a new expected result set by querying the database directly + unfiltered01 = self.samdb.search(expression=select, attrs=attributes) + filtered01 = self.filter_domain(unfiltered01) + self.assertTrue(len(filtered01) > len(expected)) + + # Sort the expected results by rid + expected01 = sorted(list(filtered01), key=rid) + + # + # Now check that we read the new entries. + # + check_results(expected01, actual.entries) + + # + # Check that deleted results are handled correctly. + # Obtain a new resume_handle and delete entries from the DB. + # + actual = [] + max_size = calc_max_size(1) + (resume_handle, a, num_entries) = self.conn.EnumDomainGroups( + self.domain_handle, 0, max_size) + self.delete_dns(extra_dns) + while resume_handle and num_entries: + self.assertEquals(1, num_entries) + actual.append(a.entries[0]) + (resume_handle, a, num_entries) = self.conn.EnumDomainGroups( + self.domain_handle, resume_handle, max_size) + if num_entries: + actual.append(a.entries[0]) + + self.assertEquals(len(expected), len(actual)) + check_results(expected, actual) + + self.delete_dns(dns) diff --git a/selftest/knownfail.d/samr b/selftest/knownfail.d/samr new file mode 100644 index 00000000000..23e5e1572cf --- /dev/null +++ b/selftest/knownfail.d/samr @@ -0,0 +1,2 @@ +^samba.tests.dcerpc.sam.samba.tests.dcerpc.sam.SamrTests.test_EnumDomainGroups\(ad_dc_ntvfs:local\) +^samba.tests.dcerpc.sam.python3.samba.tests.dcerpc.sam.SamrTests.test_EnumDomainGroups\(ad_dc_ntvfs:local\)