- self.assertTrue(self.link_exists(link))
-
- def remove_links(self, links):
- for link in links:
- if self.link_exists(link):
- subprocess.call(['ip', 'link', 'del', 'dev', link])
- time.sleep(1)
-
- def remove_fou_ports(self, ports):
- for port in ports:
- subprocess.call(['ip', 'fou', 'del', 'port', port])
-
- def remove_routing_policy_rule_tables(self, tables):
- for table in tables:
- subprocess.call(['ip', 'rule', 'del', 'table', table])
-
- def remove_routes(self, routes):
- for route_type, addr in routes:
- subprocess.call(['ip', 'route', 'del', route_type, addr])
-
- def l2tp_tunnel_remove(self, tunnel_ids):
- output = subprocess.check_output(['ip', 'l2tp', 'show', 'tunnel'], universal_newlines=True).rstrip()
- for tid in tunnel_ids:
- words='Tunnel ' + tid + ', encap'
- if words in output:
- subprocess.call(['ip', 'l2tp', 'del', 'tunnel', 'tid', tid])
- time.sleep(1)
-
- def read_ipv6_sysctl_attr(self, link, attribute):
- with open(os.path.join(os.path.join(network_sysctl_ipv6_path, link), attribute)) as f:
- return f.readline().strip()
-
- def read_ipv4_sysctl_attr(self, link, attribute):
- with open(os.path.join(os.path.join(network_sysctl_ipv4_path, link), attribute)) as f:
- return f.readline().strip()
-
- def copy_unit_to_networkd_unit_path(self, *units):
- print()
- for unit in units:
- shutil.copy(os.path.join(networkd_ci_path, unit), network_unit_file_path)
- if (os.path.exists(os.path.join(networkd_ci_path, unit + '.d'))):
- copytree(os.path.join(networkd_ci_path, unit + '.d'), os.path.join(network_unit_file_path, unit + '.d'))
-
- def remove_unit_from_networkd_path(self, units):
- for unit in units:
- if (os.path.exists(os.path.join(network_unit_file_path, unit))):
- os.remove(os.path.join(network_unit_file_path, unit))
- if (os.path.exists(os.path.join(network_unit_file_path, unit + '.d'))):
- shutil.rmtree(os.path.join(network_unit_file_path, unit + '.d'))
-
- def warn_about_firewalld(self):
- rc = subprocess.call(['systemctl', '-q', 'is-active', 'firewalld.service'])
- if rc == 0:
- print('\nWARNING: firewalld.service is active. The test may fail.')
-
- def start_dnsmasq(self, additional_options='', ipv4_range='192.168.5.10,192.168.5.200', ipv6_range='2600::10,2600::20', lease_time='1h'):
- self.warn_about_firewalld()
- dnsmasq_command = f'dnsmasq -8 /var/run/networkd-ci/test-dnsmasq-log-file --log-queries=extra --log-dhcp --pid-file=/var/run/networkd-ci/test-test-dnsmasq.pid --conf-file=/dev/null --interface=veth-peer --enable-ra --dhcp-range={ipv6_range},{lease_time} --dhcp-range={ipv4_range},{lease_time} -R --dhcp-leasefile=/var/run/networkd-ci/lease --dhcp-option=26,1492 --dhcp-option=option:router,192.168.5.1 --dhcp-option=33,192.168.5.4,192.168.5.5 --port=0 ' + additional_options
- subprocess.check_call(dnsmasq_command, shell=True)
-
- def stop_dnsmasq(self, pid_file):
- if os.path.exists(pid_file):
- with open(pid_file, 'r') as f:
- pid = f.read().rstrip(' \t\r\n\0')
- os.kill(int(pid), signal.SIGTERM)
-
- os.remove(pid_file)
-
- def search_words_in_dnsmasq_log(self, words, show_all=False):
- if os.path.exists(dnsmasq_log_file):
- with open (dnsmasq_log_file) as in_file:
- contents = in_file.read()
- if show_all:
- print(contents)
- for line in contents.splitlines():
- if words in line:
- in_file.close()
- print("%s, %s" % (words, line))
- return True
- return False
-
- def remove_lease_file(self):
- if os.path.exists(os.path.join(networkd_ci_path, 'lease')):
- os.remove(os.path.join(networkd_ci_path, 'lease'))
-
- def remove_log_file(self):
- if os.path.exists(dnsmasq_log_file):
- os.remove(dnsmasq_log_file)
-
- def start_networkd(self, sleep_sec=5, remove_state_files=True):
- if (remove_state_files and
- os.path.exists(os.path.join(networkd_runtime_directory, 'state'))):
- subprocess.check_call('systemctl stop systemd-networkd', shell=True)
- os.remove(os.path.join(networkd_runtime_directory, 'state'))
- subprocess.check_call('systemctl start systemd-networkd', shell=True)
- else:
- subprocess.check_call('systemctl restart systemd-networkd', shell=True)
- if sleep_sec > 0:
- time.sleep(sleep_sec)
-
- def wait_online(self, links_with_operstate, timeout='20s', bool_any=False):
- args = wait_online_cmd + [f'--timeout={timeout}'] + [f'--interface={link}' for link in links_with_operstate]
- if bool_any:
- args += ['--any']
- try:
- subprocess.check_call(args, env=env)
- except subprocess.CalledProcessError:
- for link in links_with_operstate:
- output = subprocess.check_output(networkctl_cmd + ['status', link.split(':')[0]], universal_newlines=True, env=env).rstrip()
- print(output)
- raise
-
- def get_operstate(self, link, show_status=True, setup_state='configured'):
- output = subprocess.check_output(networkctl_cmd + ['status', link], universal_newlines=True, env=env).rstrip()
- if show_status:
- print(output)
- for line in output.splitlines():
- if 'State:' in line and (not setup_state or setup_state in line):
- return line.split()[1]
- return None