+ check_output('systemctl daemon-reload')
+
+ check_output('systemctl start systemd-networkd.socket')
+ check_output('systemctl start systemd-networkd.service')
+
+def read_link_attr(link, dev, attribute):
+ with open(os.path.join(os.path.join(os.path.join('/sys/class/net/', link), dev), attribute)) as f:
+ return f.readline().strip()
+
+def read_bridge_port_attr(bridge, link, attribute):
+ path_bridge = os.path.join('/sys/devices/virtual/net', bridge)
+ path_port = 'lower_' + link + '/brport'
+ path = os.path.join(path_bridge, path_port)
+
+ with open(os.path.join(path, attribute)) as f:
+ return f.readline().strip()
+
+def link_exists(link):
+ return os.path.exists(os.path.join('/sys/class/net', link))
+
+def remove_links(links):
+ for link in links:
+ if link_exists(link):
+ call('ip link del dev', link)
+ time.sleep(1)
+
+def remove_fou_ports(ports):
+ for port in ports:
+ call('ip fou del port', port, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
+
+def remove_routing_policy_rule_tables(tables):
+ for table in tables:
+ rc = 0
+ while rc == 0:
+ rc = call('ip rule del table', table, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
+
+def remove_routes(routes):
+ for route_type, addr in routes:
+ call('ip route del', route_type, addr, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
+
+def remove_l2tp_tunnels(tunnel_ids):
+ output = check_output('ip l2tp show tunnel')
+ for tid in tunnel_ids:
+ words='Tunnel ' + tid + ', encap'
+ if words in output:
+ call('ip l2tp del tunnel tid', tid)
+ time.sleep(1)
+
+def read_ipv6_sysctl_attr(link, attribute):
+ with open(os.path.join(os.path.join(network_sysctl_ipv6_path, link), attribute)) as f:
+ return f.readline().strip()
+
+def read_ipv4_sysctl_attr(link, attribute):
+ with open(os.path.join(os.path.join(network_sysctl_ipv4_path, link), attribute)) as f:
+ return f.readline().strip()
+
+def copy_unit_to_networkd_unit_path(*units):
+ print()
+ for unit in units:
+ shutil.copy(os.path.join(networkd_ci_path, unit), network_unit_file_path)
+ if (os.path.exists(os.path.join(networkd_ci_path, unit + '.d'))):
+ copytree(os.path.join(networkd_ci_path, unit + '.d'), os.path.join(network_unit_file_path, unit + '.d'))
+
+def remove_unit_from_networkd_path(units):
+ for unit in units:
+ if (os.path.exists(os.path.join(network_unit_file_path, unit))):
+ os.remove(os.path.join(network_unit_file_path, unit))
+ if (os.path.exists(os.path.join(network_unit_file_path, unit + '.d'))):
+ shutil.rmtree(os.path.join(network_unit_file_path, unit + '.d'))
+
+def warn_about_firewalld():
+ rc = call('systemctl -q is-active firewalld.service')
+ if rc == 0:
+ print('\nWARNING: firewalld.service is active. The test may fail.')
+
+def start_dnsmasq(additional_options='', ipv4_range='192.168.5.10,192.168.5.200', ipv6_range='2600::10,2600::20', lease_time='1h'):
+ warn_about_firewalld()
+ dnsmasq_command = f'dnsmasq -8 /var/run/networkd-ci/test-dnsmasq-log-file --log-queries=extra --log-dhcp --pid-file=/var/run/networkd-ci/test-test-dnsmasq.pid --conf-file=/dev/null --interface=veth-peer --enable-ra --dhcp-range={ipv6_range},{lease_time} --dhcp-range={ipv4_range},{lease_time} -R --dhcp-leasefile=/var/run/networkd-ci/lease --dhcp-option=26,1492 --dhcp-option=option:router,192.168.5.1 --dhcp-option=33,192.168.5.4,192.168.5.5 --port=0 ' + additional_options
+ check_output(dnsmasq_command)
+
+def stop_dnsmasq(pid_file):
+ if os.path.exists(pid_file):
+ with open(pid_file, 'r') as f:
+ pid = f.read().rstrip(' \t\r\n\0')
+ os.kill(int(pid), signal.SIGTERM)
+
+ os.remove(pid_file)
+
+def search_words_in_dnsmasq_log(words, show_all=False):
+ if os.path.exists(dnsmasq_log_file):
+ with open (dnsmasq_log_file) as in_file:
+ contents = in_file.read()
+ if show_all:
+ print(contents)
+ for line in contents.splitlines():
+ if words in line:
+ in_file.close()
+ print("%s, %s" % (words, line))
+ return True
+ return False
+
+def remove_lease_file():
+ if os.path.exists(os.path.join(networkd_ci_path, 'lease')):
+ os.remove(os.path.join(networkd_ci_path, 'lease'))
+
+def remove_log_file():
+ if os.path.exists(dnsmasq_log_file):
+ os.remove(dnsmasq_log_file)
+
+def start_networkd(sleep_sec=5, remove_state_files=True):
+ if (remove_state_files and
+ os.path.exists(os.path.join(networkd_runtime_directory, 'state'))):
+ check_output('systemctl stop systemd-networkd')
+ os.remove(os.path.join(networkd_runtime_directory, 'state'))
+ check_output('systemctl start systemd-networkd')
+ else:
+ check_output('systemctl restart systemd-networkd')
+ if sleep_sec > 0:
+ time.sleep(sleep_sec)
+
+def wait_online(links_with_operstate, timeout='20s', bool_any=False):
+ args = wait_online_cmd + [f'--timeout={timeout}'] + [f'--interface={link}' for link in links_with_operstate]
+ if bool_any:
+ args += ['--any']
+ try:
+ check_output(*args, env=env)
+ except subprocess.CalledProcessError:
+ for link in links_with_operstate:
+ output = check_output(*networkctl_cmd, 'status', link.split(':')[0], env=env)
+ print(output)
+ raise