]> git.ipfire.org Git - thirdparty/samba.git/commitdiff
drs tests: querying linked attribute over DRS
authorDouglas Bagnall <douglas.bagnall@catalyst.net.nz>
Thu, 14 Jul 2016 06:03:33 +0000 (18:03 +1200)
committerGarming Sam <garming@samba.org>
Fri, 15 Jul 2016 08:01:28 +0000 (10:01 +0200)
Without the deactivated links control, we assert certain conditions over DRS
instead.

Signed-off-by: Douglas Bagnall <douglas.bagnall@catalyst.net.nz>
Reviewed-by: Garming Sam <garming@catalyst.net.nz>
Reviewed-by: Andrew Bartlett <abartlet@samba.org>
source4/selftest/tests.py
source4/torture/drs/python/linked_attributes_drs.py [new file with mode: 0644]

index 01cb87b1db3ec328157012ee82f5969b9444e571..8b2512fb9878ff590633c43bb614e25d7e813aba 100755 (executable)
@@ -674,6 +674,11 @@ for env in ['vampire_dc', 'promoted_dc']:
                            name="samba4.drs.getnc_exop.python(%s)" % env,
                            environ={'DC1': "$DC_SERVER", 'DC2': '$%s_SERVER' % env.upper()},
                            extra_args=['-U$DOMAIN/$DC_USERNAME%$DC_PASSWORD'])
+    planoldpythontestsuite(env, "linked_attributes_drs",
+                           extra_path=[os.path.join(samba4srcdir, 'torture/drs/python')],
+                           name="samba4.drs.linked_attributes_drs.python(%s)" % env,
+                           environ={'DC1': "$DC_SERVER", 'DC2': '$%s_SERVER' % env.upper()},
+                           extra_args=['-U$DOMAIN/$DC_USERNAME%$DC_PASSWORD'])
 
 
 planoldpythontestsuite("chgdcpass:local", "samba.tests.blackbox.samba_dnsupdate",
diff --git a/source4/torture/drs/python/linked_attributes_drs.py b/source4/torture/drs/python/linked_attributes_drs.py
new file mode 100644 (file)
index 0000000..04d31c2
--- /dev/null
@@ -0,0 +1,212 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+# Originally based on ./sam.py
+import sys
+import os
+import base64
+import random
+import re
+
+sys.path.insert(0, "bin/python")
+import samba
+from samba.tests.subunitrun import SubunitOptions, TestProgram
+
+import samba.getopt as options
+
+from samba.auth import system_session
+import ldb
+from samba.samdb import SamDB
+from samba.dcerpc import misc
+
+from samba.dcerpc import drsuapi, misc, drsblobs
+from samba.drs_utils import drs_DsBind
+from samba.ndr import ndr_unpack, ndr_pack
+
+import drs_base
+
+import time
+
+
+class LATestException(Exception):
+    pass
+
+class ExopBaseTest:
+    def _exop_req8(self, dest_dsa, invocation_id, nc_dn_str, exop,
+                   replica_flags=0, max_objects=0):
+        req8 = drsuapi.DsGetNCChangesRequest8()
+
+        req8.destination_dsa_guid = misc.GUID(dest_dsa) if dest_dsa else misc.GUID()
+        req8.source_dsa_invocation_id = misc.GUID(invocation_id)
+        req8.naming_context = drsuapi.DsReplicaObjectIdentifier()
+        req8.naming_context.dn = unicode(nc_dn_str)
+        req8.highwatermark = drsuapi.DsReplicaHighWaterMark()
+        req8.highwatermark.tmp_highest_usn = 0
+        req8.highwatermark.reserved_usn = 0
+        req8.highwatermark.highest_usn = 0
+        req8.uptodateness_vector = None
+        req8.replica_flags = replica_flags
+        req8.max_object_count = max_objects
+        req8.max_ndr_size = 402116
+        req8.extended_op = exop
+        req8.fsmo_info = 0
+        req8.partial_attribute_set = None
+        req8.partial_attribute_set_ex = None
+        req8.mapping_ctr.num_mappings = 0
+        req8.mapping_ctr.mappings = None
+
+        return req8
+
+    def _ds_bind(self, server_name):
+        binding_str = "ncacn_ip_tcp:%s[seal]" % server_name
+
+        drs = drsuapi.drsuapi(binding_str, self.get_loadparm(), self.get_credentials())
+        (drs_handle, supported_extensions) = drs_DsBind(drs)
+        return (drs, drs_handle)
+
+    
+class LATests(drs_base.DrsBaseTestCase, ExopBaseTest):
+
+    def setUp(self):
+        super(LATests, self).setUp()
+        # DrsBaseTestCase sets up self.ldb_dc1, self.ldb_dc2
+        # we're only using one
+        self.samdb = self.ldb_dc1
+        
+        self.base_dn = self.samdb.domain_dn()
+        self.ou = "OU=la,%s" % self.base_dn
+        if True:
+            try:
+                self.samdb.delete(self.ou, ['tree_delete:1'])
+            except ldb.LdbError, e:
+                pass
+        self.samdb.add({'objectclass': 'organizationalUnit',
+                        'dn': self.ou})
+
+        self.dc_guid = self.samdb.get_invocation_id()
+        self.drs, self.drs_handle = self._ds_bind(self.dnsname_dc1)
+
+    def tearDown(self):
+        super(LATests, self).tearDown()
+        try:
+            self.samdb.delete(self.ou, ['tree_delete:1'])
+        except ldb.LdbError, e:
+            pass
+
+    def delete_user(self, user):
+        self.samdb.delete(user['dn'])
+        del self.users[self.users.index(user)]
+
+    def add_object(self, cn, objectclass):
+        dn = "CN=%s,%s" % (cn, self.ou)
+        self.samdb.add({'cn': cn,
+                      'objectclass': objectclass,
+                      'dn': dn})
+
+        return dn
+
+    def add_objects(self, n, objectclass, prefix=None):
+        if prefix is None:
+            prefix = objectclass
+        dns = []
+        for i in range(n):
+            dns.append(self.add_object("%s%d" % (prefix, i + 1),
+                                       objectclass))
+        return dns
+
+    def add_linked_attribute(self, src, dest, attr='member'):
+        m = ldb.Message()
+        m.dn = ldb.Dn(self.samdb, src)
+        m[attr] = ldb.MessageElement(dest, ldb.FLAG_MOD_ADD, attr)
+        self.samdb.modify(m)
+
+    def remove_linked_attribute(self, src, dest, attr='member'):
+        m = ldb.Message()
+        m.dn = ldb.Dn(self.samdb, src)
+        m[attr] = ldb.MessageElement(dest, ldb.FLAG_MOD_DELETE, attr)
+        self.samdb.modify(m)
+
+    def attr_search(self, obj, expected, attr, scope=ldb.SCOPE_BASE):
+
+        req8 = self._exop_req8(dest_dsa=None,
+                               invocation_id=self.dc_guid,
+                               nc_dn_str=obj,
+                               exop=drsuapi.DRSUAPI_EXOP_REPL_OBJ)
+
+        level, ctr = self.drs.DsGetNCChanges(self.drs_handle, 8, req8)
+        expected_attid = getattr(drsuapi, 'DRSUAPI_ATTID_' + attr)
+
+        links = []
+        for link in ctr.linked_attributes:
+            if link.attid == expected_attid:
+                unpacked = ndr_unpack(drsuapi.DsReplicaObjectIdentifier3,
+                                      link.value.blob)
+                active = link.flags &  drsuapi.DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE
+                links.append((str(unpacked.dn), bool(active)))
+
+        return links
+
+
+    def assert_forward_links(self, obj, expected, attr='member'):
+        results = self.attr_search(obj, expected, attr)
+        self.assertEqual(len(results), len(expected))
+
+        for k, v in results:
+            self.assertTrue(k in expected) 
+            self.assertEqual(expected[k], v, "%s active flag should be %d, not %d" %
+                             (k, expected[k], v))
+
+    def get_object_guid(self, dn):
+        res = self.samdb.search(dn,
+                                scope=ldb.SCOPE_BASE,
+                                attrs=['objectGUID'])
+        return str(misc.GUID(res[0]['objectGUID'][0]))
+
+    def test_links_all_delete_group(self):
+        u1, u2 = self.add_objects(2, 'user', 'u_all_del_group')
+        g1, g2 = self.add_objects(2, 'group', 'g_all_del_group')
+        g2guid = self.get_object_guid(g2)
+
+        self.add_linked_attribute(g1, u1)
+        self.add_linked_attribute(g2, u1)
+        self.add_linked_attribute(g2, u2)
+
+        self.samdb.delete(g2)
+        self.assert_forward_links(g1, {u1: True})
+        res = self.samdb.search('<GUID=%s>' % g2guid,
+                                scope=ldb.SCOPE_BASE,
+                                controls=['show_deleted:1'])
+        new_dn = res[0].dn
+        self.assert_forward_links(new_dn, {})
+
+
+    def test_la_links_delete_link(self):
+        u1, u2 = self.add_objects(2, 'user', 'u_del_link')
+        g1, g2 = self.add_objects(2, 'group', 'g_del_link')
+
+        self.add_linked_attribute(g1, u1)
+        self.add_linked_attribute(g2, u1)
+        self.add_linked_attribute(g2, u2)
+
+        self.remove_linked_attribute(g2, u1)
+
+        self.assert_forward_links(g1, {u1: True})
+        self.assert_forward_links(g2, {u1: False, u2: True})
+
+        self.add_linked_attribute(g2, u1)
+        self.remove_linked_attribute(g2, u2)
+        self.assert_forward_links(g2, {u1: True, u2: False})
+        self.remove_linked_attribute(g2, u1)
+        self.assert_forward_links(g2, {u1: False, u2: False})
+
+    def test_la_links_delete_user(self):
+        u1, u2 = self.add_objects(2, 'user', 'u_del_user')
+        g1, g2 = self.add_objects(2, 'group', 'g_del_user')
+
+        self.add_linked_attribute(g1, u1)
+        self.add_linked_attribute(g2, u1)
+        self.add_linked_attribute(g2, u2)
+
+        self.samdb.delete(u1)
+
+        self.assert_forward_links(g1, {})
+        self.assert_forward_links(g2, {u2: True})