# Blackbox tests for "samba-tool drs" command
# Copyright (C) Kamen Mazdrashki <kamenim@samba.org> 2011
+# Copyright (C) Andrew Bartlett <abartlet@samba.org> 2017
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
self.cmdline_creds = "-U%s/%s%%%s" % (creds.get_domain(),
creds.get_username(), creds.get_password())
+ def tearDown(self):
+ self._enable_inbound_repl(self.dnsname_dc1)
+ self._enable_inbound_repl(self.dnsname_dc2)
+
+ try:
+ shutil.rmtree(os.path.join(self.tempdir, "private"))
+ shutil.rmtree(os.path.join(self.tempdir, "etc"))
+ shutil.rmtree(os.path.join(self.tempdir, "msg.lock"))
+ os.remove(os.path.join(self.tempdir, "names.tdb"))
+ shutil.rmtree(os.path.join(self.tempdir, "state"))
+ except Exception:
+ pass
+
+ super(SambaToolDrsTests, self).tearDown()
+
def _get_rootDSE(self, dc, ldap_only=True):
samdb = samba.tests.connect_samdb(dc, lp=self.get_loadparm(),
credentials=self.get_credentials(),
# Output should be like 'Replicate from <DC-SRC> to <DC-DEST> was successful.'
nc_name = self._get_rootDSE(self.dc1)["defaultNamingContext"]
- out = self.check_output("samba-tool drs replicate --local %s %s %s %s" % (self.dc1,
- self.dc2,
- nc_name,
- self.cmdline_creds))
- self.assertTrue("Incremental" in out)
+
+ def get_num_obj_links(output):
+ num_objs = None
+ num_links = None
+ for word in output.split(" "):
+ try:
+ int(word)
+ if num_objs is None:
+ num_objs = int(word)
+ elif num_links is None:
+ num_links = int(word)
+ except ValueError:
+ pass
+
+ return (num_objs, num_links)
+
+ out = self.check_output("samba-tool drs replicate --local --full-sync %s %s %s %s"
+ % (self.dc1, self.dc2, nc_name, self.cmdline_creds))
+ self.assertTrue("was successful" in out)
+ self.assertTrue("Full" in out)
+
+ (first_obj, _) = get_num_obj_links(out)
+
+ out = self.check_output("samba-tool drs replicate --local %s %s %s %s"
+ % (self.dc1, self.dc2, nc_name, self.cmdline_creds))
self.assertTrue("was successful" in out)
+ self.assertTrue("Incremental" in out)
+
+ (second_obj, _) = get_num_obj_links(out)
+
+ self.assertTrue(first_obj > second_obj)
+
+ server_rootdse = self._get_rootDSE(self.dc1)
+ server_nc_name = server_rootdse["defaultNamingContext"]
+ server_ds_name = server_rootdse["dsServiceName"]
+ server_ldap_service_name = str(server_rootdse["ldapServiceName"][0])
+ server_realm = server_ldap_service_name.split(":")[0]
+ creds = self.get_credentials()
+
+ # We have to give it a different netbiosname every time
+ # it runs, otherwise the collision causes strange issues
+ # to happen. This should be different on different environments.
+ netbiosname = "test" + self.dc2
+ if len(netbiosname) > 15:
+ netbiosname = netbiosname[:15]
+
+ out = self.check_output("samba-tool domain join %s dc --server=%s %s --targetdir=%s --option=netbiosname=%s"
+ % (server_realm, self.dc1, self.cmdline_creds, self.tempdir, netbiosname))
+
+ new_dc_config_file = "%s/etc/smb.conf" % self.tempdir
+
+ self.check_output("samba-tool drs replicate --local %s %s %s %s -s %s"
+ % ("invalid", self.dc1, nc_name,
+ self.cmdline_creds, new_dc_config_file))
+
+ self._disable_inbound_repl(self.dnsname_dc1)
+ self._disable_inbound_repl(self.dnsname_dc2)
+
+ self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2)
+
+ # add an object with link on dc1
+ group_name = "group-repl-local-%s" % self.dc2
+ user_name = "user-repl-local-%s" % self.dc2
+
+ self.check_output("samba-tool group add %s %s -H ldap://%s"
+ % (group_name, self.cmdline_creds, self.dc1))
+ self.check_output("samba-tool user add %s %s --random-password -H ldap://%s"
+ % (user_name, self.cmdline_creds, self.dc1))
+ self.check_output("samba-tool group addmembers %s %s %s -H ldap://%s"
+ % (group_name, user_name, self.cmdline_creds, self.dc1))
+
+ self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1)
+
+ # pull that change with --local into local db from dc1: should send link and some objects
+ out = self.check_output("samba-tool drs replicate --local %s %s %s %s -s %s"
+ % ("invalid", self.dc1, nc_name,
+ self.cmdline_creds, new_dc_config_file))
+
+ (obj_1, link_1) = get_num_obj_links(out)
+
+ self.assertEqual(obj_1, 2)
+ self.assertEqual(link_1, 1)
+
+ # pull that change with --local into local db from dc2: shouldn't send link or object
+ # as we sent an up-to-dateness vector showing that we had already synced with DC1
+ out = self.check_output("samba-tool drs replicate --local %s %s %s %s -s %s"
+ % ("invalid", self.dc2, nc_name,
+ self.cmdline_creds, new_dc_config_file))
+
+ (obj_2, link_2) = get_num_obj_links(out)
+
+ self.assertEqual(obj_2, 0)
+ self.assertEqual(link_2, 0)
def test_samba_tool_replicate_machine_creds_P(self):
"""Tests 'samba-tool drs replicate -P' command with machine creds."""
attrs=[])
self.assertRaises(ldb.LdbError, check_dns_account_obj)
- shutil.rmtree(os.path.join(self.tempdir, "private"))
- shutil.rmtree(os.path.join(self.tempdir, "etc"))
- shutil.rmtree(os.path.join(self.tempdir, "msg.lock"))
- os.remove(os.path.join(self.tempdir, "names.tdb"))
- shutil.rmtree(os.path.join(self.tempdir, "state"))
-
def test_samba_tool_drs_clone_dc_secrets(self):
"""Tests 'samba-tool drs clone-dc-database --include-secrets' command ."""
server_rootdse = self._get_rootDSE(self.dc1)
attrs=[])
self.assertRaises(ldb.LdbError, check_dns_account_obj)
- shutil.rmtree(os.path.join(self.tempdir, "private"))
- shutil.rmtree(os.path.join(self.tempdir, "etc"))
- shutil.rmtree(os.path.join(self.tempdir, "msg.lock"))
- os.remove(os.path.join(self.tempdir, "names.tdb"))
- shutil.rmtree(os.path.join(self.tempdir, "state"))
-
def test_samba_tool_drs_clone_dc_secrets_without_targetdir(self):
"""Tests 'samba-tool drs clone-dc-database' command without --targetdir."""
server_rootdse = self._get_rootDSE(self.dc1)