From: Stefan Metzmacher Date: Mon, 15 Jan 2024 13:20:00 +0000 (+0100) Subject: python:tests/rpcd_witness_samba_only: add tests for 'net witness {client,share}-move' X-Git-Tag: talloc-2.4.2~13 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=290ef547d869100bdea42784b8a8783085eed805;p=thirdparty%2Fsamba.git python:tests/rpcd_witness_samba_only: add tests for 'net witness {client,share}-move' Signed-off-by: Stefan Metzmacher Reviewed-by: Günther Deschner --- diff --git a/python/samba/tests/blackbox/rpcd_witness_samba_only.py b/python/samba/tests/blackbox/rpcd_witness_samba_only.py index 3467e988385..ca6b6210f50 100755 --- a/python/samba/tests/blackbox/rpcd_witness_samba_only.py +++ b/python/samba/tests/blackbox/rpcd_witness_samba_only.py @@ -171,17 +171,23 @@ class RpcdWitnessSambaTests(BlackboxTestCase): def call_net_witness_subcmd(self, subcmd, as_json=False, + apply_to_all=False, registration=None, net_name=None, share_name=None, ip_address=None, - client_computer=None): + client_computer=None, + new_ip=None, + new_node=None): COMMAND = "UID_WRAPPER_ROOT=1 bin/net witness" argv = "%s %s" % (COMMAND, subcmd) if as_json: argv += " --json" + if apply_to_all: + argv += " --witness-apply-to-all" + if registration is not None: argv += " --witness-registration='%s'" % ( registration.uuid) @@ -198,6 +204,12 @@ class RpcdWitnessSambaTests(BlackboxTestCase): if client_computer is not None: argv += " --witness-client-computer-name='%s'" % (client_computer) + if new_ip is not None: + argv += " --witness-new-ip='%s'" % (new_ip) + + if new_node is not None: + argv += " --witness-new-node='%s'" % (new_node) + try: if self.verbose: print("Calling: %s" % argv) @@ -312,6 +324,40 @@ class RpcdWitnessSambaTests(BlackboxTestCase): expected_resource_changes = [expected_resource_change] self.assertResourceChanges(response, expected_resource_changes) + def assertGenericIpLists(self, response, expected_type, expected_ip_lists): + self.assertIsNotNone(response) + self.assertEqual(response.type, expected_type) + self.assertEqual(response.num, len(expected_ip_lists)) + self.assertEqual(len(response.messages), len(expected_ip_lists)) + for li in range(0, len(expected_ip_lists)): + + expected_ip_list = expected_ip_lists[li] + ip_list = response.messages[li] + self.assertIsNotNone(ip_list) + self.assertEqual(ip_list.num, len(expected_ip_list)) + + for i in range(0, len(expected_ip_list)): + ip_info = ip_list.addr[i] + + expected_flags = 0 + expected_flags |= witness.WITNESS_IPADDR_V4 + expected_flags |= witness.WITNESS_IPADDR_ONLINE + expected_flags = expected_ip_list[i].get('flags', expected_flags) + + expected_ipv4 = '0.0.0.0' + expected_ipv4 = expected_ip_list[i].get('ipv4', expected_ipv4) + + expected_ipv6 = '0000:0000:0000:0000:0000:0000:0000:0000' + expected_ipv6 = expected_ip_list[i].get('ipv6', expected_ipv6) + + self.assertEqual(ip_info.flags, expected_flags) + + self.assertIsNotNone(ip_info.ipv4) + self.assertEqual(ip_info.ipv4, expected_ipv4) + + self.assertIsNotNone(ip_info.ipv6) + self.assertEqual(ip_info.ipv6, expected_ipv6) + @classmethod def _define_ResourceChangeCTDB_tests(cls, conn_idx, monitor_idx, ndr64=False): if ndr64: @@ -716,11 +762,16 @@ class RpcdWitnessSambaTests(BlackboxTestCase): def check_net_witness_output(self, cmd, regs, + apply_to_all=False, registration_idx=None, net_name=None, share_name=None, ip_address=None, - client_computer=None): + client_computer=None, + new_ip=None, + new_node=None, + expected_msg_type=None, + callback=None): self.open_all_registrations() if registration_idx is not None: registration = regs[registration_idx]['context'] @@ -729,17 +780,24 @@ class RpcdWitnessSambaTests(BlackboxTestCase): registration = None plain_res = self.call_net_witness_subcmd(cmd, + apply_to_all=apply_to_all, registration=registration, net_name=net_name, share_name=share_name, ip_address=ip_address, - client_computer=client_computer) + client_computer=client_computer, + new_ip=new_ip, + new_node=new_node) if self.verbose: print("%s" % plain_res) plain_lines = plain_res.splitlines() num_headlines = 2 + if expected_msg_type: + num_headlines += 1 self.assertEqual(len(plain_lines), num_headlines+len(regs)) + if expected_msg_type: + self.assertIn(expected_msg_type, plain_lines[0]) plain_lines = plain_lines[num_headlines:] self.assertEqual(len(plain_lines), len(regs)) @@ -765,6 +823,9 @@ class RpcdWitnessSambaTests(BlackboxTestCase): self.assertEqual(line, expected_line) self.assertIsNotNone(line) + if callback is not None: + callback(reg) + self.close_all_registrations() self.open_all_registrations() @@ -776,13 +837,18 @@ class RpcdWitnessSambaTests(BlackboxTestCase): json_res = self.call_net_witness_subcmd(cmd, as_json=True, + apply_to_all=apply_to_all, registration=registration, net_name=net_name, share_name=share_name, ip_address=ip_address, - client_computer=client_computer) + client_computer=client_computer, + new_ip=new_ip, + new_node=new_node) num_filters = 0 + if apply_to_all: + num_filters += 1 if registration: num_filters += 1 if net_name: @@ -795,14 +861,21 @@ class RpcdWitnessSambaTests(BlackboxTestCase): num_filters += 1 num_toplevel = 2 + if expected_msg_type: + num_toplevel += 1 self.assertIn('filters', json_res); + if expected_msg_type: + self.assertIn('message', json_res); self.assertIn('registrations', json_res); self.assertEqual(len(json_res.keys()), num_toplevel) json_filters = json_res['filters'] self.assertEqual(len(json_filters.keys()), num_filters) + if apply_to_all: + self.assertTrue(json_filters['--witness-apply-to-all']) + if registration: self.assertEqual(json_filters['--witness-registration'], str(registration.uuid)) @@ -818,6 +891,22 @@ class RpcdWitnessSambaTests(BlackboxTestCase): if client_computer: self.assertEqual(json_filters['--witness-client-computer-name'], client_computer) + if expected_msg_type: + json_message = json_res['message'] + num_sub = 1 + self.assertEqual(json_message['type'], expected_msg_type); + + if new_ip is not None: + num_sub += 1 + self.assertEqual(json_message['new_ip'], new_ip); + elif new_node == -1: + num_sub += 1 + self.assertTrue(json_message['all_nodes']) + elif new_node is not None: + num_sub += 1 + self.assertEqual(json_message['new_node'], new_node) + + self.assertEqual(len(json_message.keys()), num_sub) json_regs = json_res['registrations'] self.assertEqual(len(json_regs.keys()), len(regs)) @@ -829,6 +918,9 @@ class RpcdWitnessSambaTests(BlackboxTestCase): json_reg = json_regs[str(reg_uuid)] self.assertJsonReg(json_reg, reg) + if callback is not None: + callback(reg) + self.close_all_registrations() def check_combinations(self, check_func, only_shares=False): @@ -846,6 +938,10 @@ class RpcdWitnessSambaTests(BlackboxTestCase): else: no_share_name_regs.append(reg) + if only_shares: + all_regs = all_share_name_regs + no_share_name_regs = [] + ip_address_regs = {} computer_name_regs = {} for reg in all_regs: @@ -864,7 +960,8 @@ class RpcdWitnessSambaTests(BlackboxTestCase): all_computer_names = '|'.join(computer_name_regs.keys()) common_computer_name = self.max_common_prefix(computer_name_regs.keys()) - check_func(all_regs) + check_func(all_regs, + apply_to_all=True) check_func(all_regs, net_name=self.server_hostname) check_func(all_regs, @@ -916,13 +1013,17 @@ class RpcdWitnessSambaTests(BlackboxTestCase): def test_net_witness_list(self): def check_list(regs, + apply_to_all=False, registration_idx=None, net_name=None, share_name=None, ip_address=None, client_computer=None): + # --witness-apply-to-all is not needed for 'list' + apply_to_all = None return self.check_net_witness_output('list', regs, + apply_to_all=apply_to_all, registration_idx=registration_idx, net_name=net_name, share_name=share_name, @@ -931,6 +1032,112 @@ class RpcdWitnessSambaTests(BlackboxTestCase): self.check_combinations(check_list) + def _test_net_witness_generic_move(self, + move_cmd, + msg_type_prefix, + msg_type): + def _check_generic_move(regs, + apply_to_all=False, + registration_idx=None, + net_name=None, + share_name=None, + ip_address=None, + client_computer=None, + new_ip=None, + new_node=None): + + if new_ip: + expected_msg_type = "%s_IPV4" % msg_type_prefix + else: + expected_msg_type = "%s_NODE" % msg_type_prefix + + expected_ip_list = [] + if new_ip: + ip = { 'ipv4': str(new_ip), } + expected_ip_list.append(ip) + if new_node == -1: + for node_idx in range(0, len(self.nodes)): + node = self.nodes[node_idx] + ip = { 'ipv4': str(node['ip']), } + expected_ip_list.append(ip) + elif new_node is not None: + node = self.nodes[new_node] + ip = { 'ipv4': str(node['ip']), } + expected_ip_list.append(ip) + + expected_ip_lists = [expected_ip_list] + + def check_generic_move_response(reg): + conn = reg['conn'] + reg_context = reg['context'] + response = conn.AsyncNotify(reg_context) + self.assertGenericIpLists(response, msg_type, expected_ip_lists) + + return self.check_net_witness_output(move_cmd, + regs, + apply_to_all=apply_to_all, + registration_idx=registration_idx, + net_name=net_name, + share_name=share_name, + ip_address=ip_address, + client_computer=client_computer, + new_ip=new_ip, + new_node=new_node, + expected_msg_type=expected_msg_type, + callback=check_generic_move_response) + + def check_generic_move(regs, + apply_to_all=False, + registration_idx=None, + net_name=None, + share_name=None, + ip_address=None, + client_computer=None): + _check_generic_move(regs, + apply_to_all=apply_to_all, + registration_idx=registration_idx, + net_name=net_name, + share_name=share_name, + ip_address=ip_address, + client_computer=client_computer, + new_node=-1) + + for node_idx in range(0, len(self.nodes)): + node = self.nodes[node_idx] + + _check_generic_move(regs, + apply_to_all=apply_to_all, + registration_idx=registration_idx, + net_name=net_name, + share_name=share_name, + ip_address=ip_address, + client_computer=client_computer, + new_node=node_idx) + _check_generic_move(regs, + apply_to_all=apply_to_all, + registration_idx=registration_idx, + net_name=net_name, + share_name=share_name, + ip_address=ip_address, + client_computer=client_computer, + new_ip=node['ip']) + + if msg_type == witness.WITNESS_NOTIFY_CLIENT_MOVE: + only_shares = False + elif msg_type == witness.WITNESS_NOTIFY_SHARE_MOVE: + only_shares = True + + self.check_combinations(check_generic_move, only_shares=only_shares) + + def test_net_witness_client_move(self): + self._test_net_witness_generic_move('client-move', + 'CLIENT_MOVE_TO', + witness.WITNESS_NOTIFY_CLIENT_MOVE) + def test_net_witness_share_move(self): + self._test_net_witness_generic_move('share-move', + 'SHARE_MOVE_TO', + witness.WITNESS_NOTIFY_SHARE_MOVE) + if __name__ == "__main__": import unittest unittest.main()