#!/usr/bin/env python3
# SPDX-License-Identifier: LGPL-2.1-or-later
-# pylint: disable=line-too-long,too-many-lines,too-many-branches,too-many-statements,too-many-arguments
-# pylint: disable=too-many-public-methods,too-many-boolean-expressions,invalid-name
-# pylint: disable=missing-function-docstring,missing-class-docstring,missing-module-docstring
# systemd-networkd tests
# These tests can be executed in the systemd mkosi image when booted in QEMU. After booting the QEMU VM,
import argparse
import errno
import itertools
+import json
import os
import pathlib
import re
import shutil
import signal
+import socket
import subprocess
import sys
import time
isc_dhcpd_pid_file = '/run/networkd-ci/test-isc-dhcpd.pid'
isc_dhcpd_lease_file = '/run/networkd-ci/test-isc-dhcpd.lease'
+radvd_pid_file = '/run/networkd-ci/test-radvd.pid'
+
systemd_lib_paths = ['/usr/lib/systemd', '/lib/systemd']
which_paths = ':'.join(systemd_lib_paths + os.getenv('PATH', os.defpath).lstrip(':').split(':'))
command = command[0].split() + list(command[1:])
return subprocess.run(command, check=False, universal_newlines=True, stderr=subprocess.STDOUT, **kwargs).returncode
+def call_check(*command, **kwargs):
+ # Same as call() above, but it triggers CalledProcessError if rc != 0
+ command = command[0].split() + list(command[1:])
+ return subprocess.run(command, check=False, universal_newlines=True, stderr=subprocess.STDOUT, **kwargs).check_returncode()
+
def call_quiet(*command, **kwargs):
command = command[0].split() + list(command[1:])
return subprocess.run(command, check=False, universal_newlines=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, **kwargs).returncode
command = command[0].split() + list(command[1:])
return subprocess.run(command, check=False, universal_newlines=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, **kwargs)
+def check_json(string):
+ try:
+ json.loads(string)
+ except json.JSONDecodeError:
+ print(f"String is not a valid JSON: '{string}'")
+ raise
+
def is_module_available(*module_names):
for module_name in module_names:
lsmod_output = check_output('lsmod')
with open(f'/run/systemd/system/{unit}.d/00-override.conf', mode='w', encoding='utf-8') as f:
f.write('\n'.join(contents))
-def create_service_dropin(service, command, reload_command=None, additional_settings=None):
+def create_service_dropin(service, command, additional_settings=None):
drop_in = [
'[Service]',
'ExecStart=',
f'ExecStart=!!{valgrind_cmd}{command}',
]
- if reload_command:
- drop_in += [
- 'ExecReload=',
- f'ExecReload={valgrind_cmd}{reload_command}',
- ]
if enable_debug:
drop_in += ['Environment=SYSTEMD_LOG_LEVEL=debug']
if asan_options:
def read_ipv4_sysctl_attr(link, attribute):
return read_ip_sysctl_attr(link, attribute, 'ipv4')
+def stop_by_pid_file(pid_file):
+ if not os.path.exists(pid_file):
+ return
+ with open(pid_file, 'r', encoding='utf-8') as f:
+ pid = f.read().rstrip(' \t\r\n\0')
+ os.kill(int(pid), signal.SIGTERM)
+ for _ in range(25):
+ try:
+ os.kill(int(pid), 0)
+ print(f"PID {pid} is still alive, waiting...")
+ time.sleep(.2)
+ except OSError as e:
+ if e.errno == errno.ESRCH:
+ break
+ print(f"Unexpected exception when waiting for {pid} to die: {e.errno}")
+ rm_f(pid_file)
+
def start_dnsmasq(*additional_options, interface='veth-peer', lease_time='2m', ipv4_range='192.168.5.10,192.168.5.200', ipv4_router='192.168.5.1', ipv6_range='2600::10,2600::20'):
command = (
'dnsmasq',
) + additional_options
check_output(*command)
-def stop_by_pid_file(pid_file):
- if not os.path.exists(pid_file):
- return
- with open(pid_file, 'r', encoding='utf-8') as f:
- pid = f.read().rstrip(' \t\r\n\0')
- os.kill(int(pid), signal.SIGTERM)
- for _ in range(25):
- try:
- os.kill(int(pid), 0)
- print(f"PID {pid} is still alive, waiting...")
- time.sleep(.2)
- except OSError as e:
- if e.errno == errno.ESRCH:
- break
- print(f"Unexpected exception when waiting for {pid} to die: {e.errno}")
- os.remove(pid_file)
-
def stop_dnsmasq():
stop_by_pid_file(dnsmasq_pid_file)
rm_f(dnsmasq_lease_file)
stop_by_pid_file(isc_dhcpd_pid_file)
rm_f(isc_dhcpd_lease_file)
+def start_radvd(*additional_options, config_file):
+ config_file_path = os.path.join(networkd_ci_temp_dir, 'radvd', config_file)
+ command = (
+ 'radvd',
+ f'--pidfile={radvd_pid_file}',
+ f'--config={config_file_path}',
+ '--logmethod=stderr',
+ ) + additional_options
+ check_output(*command)
+
+def stop_radvd():
+ stop_by_pid_file(radvd_pid_file)
+
+def radvd_check_config(config_file):
+ if not shutil.which('radvd'):
+ print('radvd is not installed, assuming the config check failed')
+ return False
+
+ # Note: can't use networkd_ci_temp_dir here, as this command may run before that dir is
+ # set up (one instance is @unittest.skipX())
+ config_file_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'conf/radvd', config_file)
+ return call(f'radvd --config={config_file_path} --configtest') == 0
+
def networkd_invocation_id():
return check_output('systemctl show --value -p InvocationID systemd-networkd.service')
print()
def tear_down_common():
- # 1. stop DHCP servers
+ # 1. stop DHCP/RA servers
stop_dnsmasq()
stop_isc_dhcpd()
+ stop_radvd()
# 2. remove modules
call_quiet('rmmod netdevsim')
save_timezone()
create_service_dropin('systemd-networkd', networkd_bin,
- f'{networkctl_bin} reload',
['[Service]', 'Restart=no', '[Unit]', 'StartLimitIntervalSec=0'])
create_service_dropin('systemd-resolved', resolved_bin)
create_service_dropin('systemd-timesyncd', timesyncd_bin)
[
'[Service]',
'ExecStart=',
- f'ExecStart=!!{udevd_bin}',
- 'ExecReload=',
- f'ExecReload={udevadm_bin} control --reload --timeout 0',
+ f'ExecStart=!!@{udevd_bin} systemd-udevd',
]
)
create_unit_dropin(
print(output)
self.assertRegex(output, f'interface:{interface},address:{address},label:"{label}"')
+ def setup_nftset(self, filter_name, filter_type, flags=''):
+ if not shutil.which('nft'):
+ print('## Setting up NFT sets skipped: nft command not found.')
+ else:
+ if call(f'nft add table inet sd_test') != 0:
+ print('## Setting up NFT table failed.')
+ self.fail()
+ if call(f'nft add set inet sd_test {filter_name} {{ type {filter_type}; {flags} }}') != 0:
+ print('## Setting up NFT sets failed.')
+ self.fail()
+
+ def teardown_nftset(self, *filters):
+ if not shutil.which('nft'):
+ print('## Tearing down NFT sets skipped: nft command not found.')
+ else:
+ for filter_name in filters:
+ if call(f'nft delete set inet sd_test {filter_name}') != 0:
+ print('## Tearing down NFT sets failed.')
+ self.fail()
+ if call(f'nft delete table inet sd_test') != 0:
+ print('## Tearing down NFT table failed.')
+ self.fail()
+
+ def check_nftset(self, filter_name, contents):
+ if not shutil.which('nft'):
+ print('## Checking NFT sets skipped: nft command not found.')
+ else:
+ output = check_output(f'nft list set inet sd_test {filter_name}')
+ print(output)
+ self.assertRegex(output, r'.*elements = { [^}]*' + contents + r'[^}]* }.*')
+
class NetworkctlTests(unittest.TestCase, Utilities):
def setUp(self):
self.assertIn('inet 10.1.2.4/16 brd 10.1.255.255 scope global secondary dummy98', output)
self.assertIn('inet 10.2.2.4/16 brd 10.2.255.255 scope global dummy98', output)
+ def test_renew(self):
+ def check():
+ self.wait_online(['veth99:routable', 'veth-peer:routable'])
+ output = check_output(*networkctl_cmd, '-n', '0', 'status', 'veth99', env=env)
+ print(output)
+ self.assertRegex(output, r'Address: 192.168.5.[0-9]* \(DHCP4 via 192.168.5.1\)')
+ self.assertIn('Gateway: 192.168.5.3', output)
+ self.assertRegex(output, 'DNS: 192.168.5.1\n *192.168.5.10')
+ self.assertRegex(output, 'NTP: 192.168.5.1\n *192.168.5.11')
+
+ copy_network_unit('25-veth.netdev', '25-dhcp-client.network', '25-dhcp-server.network')
+ start_networkd()
+ check()
+ output = check_output(*networkctl_cmd, '--lines=0', '--stats', '--all', '--full', '--json=short', 'status')
+ check_json(output)
+
+ for verb in ['renew', 'forcerenew']:
+ call_check(*networkctl_cmd, verb, 'veth99')
+ check()
+ call_check(*networkctl_cmd, verb, 'veth99', 'veth99', 'veth99')
+ check()
+
+ def test_up_down(self):
+ copy_network_unit('25-address-static.network', '12-dummy.netdev')
+ start_networkd()
+ self.wait_online(['dummy98:routable'])
+
+ call_check(*networkctl_cmd, 'down', 'dummy98')
+ self.wait_online(['dummy98:off'])
+ call_check(*networkctl_cmd, 'up', 'dummy98')
+ self.wait_online(['dummy98:routable'])
+ call_check(*networkctl_cmd, 'down', 'dummy98', 'dummy98', 'dummy98')
+ self.wait_online(['dummy98:off'])
+ call_check(*networkctl_cmd, 'up', 'dummy98', 'dummy98', 'dummy98')
+ self.wait_online(['dummy98:routable'])
+
def test_reload(self):
start_networkd()
self.check_link_exists('veth99', expected=False)
self.check_link_exists('veth-peer', expected=False)
+ def test_label(self):
+ call_check(*networkctl_cmd, 'label')
+
class NetworkdMatchTests(unittest.TestCase, Utilities):
def setUp(self):
def tearDown(self):
tear_down_common()
- def test_address_static(self):
- # test for #22515. The address will be removed and replaced with /64 prefix.
- check_output('ip link add dummy98 type dummy')
- check_output('ip link set dev dummy98 up')
- check_output('ip -6 address add 2001:db8:0:f101::15/128 dev dummy98')
- self.wait_address('dummy98', '2001:db8:0:f101::15/128', ipv='-6')
- check_output('ip -4 address add 10.3.2.3/16 brd 10.3.255.250 scope global label dummy98:hoge dev dummy98')
- self.wait_address('dummy98', '10.3.2.3/16 brd 10.3.255.250', ipv='-4')
-
- copy_network_unit('25-address-static.network', '12-dummy.netdev')
- start_networkd()
-
- self.wait_online(['dummy98:routable'])
-
- output = check_output('ip -4 address show dev dummy98')
+ def verify_address_static(
+ self,
+ label1: str,
+ label2: str,
+ label3: str,
+ broadcast1: str,
+ broadcast2: str,
+ broadcast3: str,
+ peer1: str,
+ peer2: str,
+ peer3: str,
+ peer4: str,
+ peer5: str,
+ peer6: str,
+ scope1: str,
+ scope2: str,
+ deprecated1: str,
+ deprecated2: str,
+ deprecated3: str,
+ deprecated4: str,
+ route_metric: int,
+ flag1: str,
+ flag2: str,
+ flag3: str,
+ flag4: str,
+ ip4_null_16: str,
+ ip4_null_24: str,
+ ip6_null_73: str,
+ ip6_null_74: str,
+ ):
+ output = check_output('ip address show dev dummy98')
print(output)
+
+ # simple settings
self.assertIn('inet 10.1.2.3/16 brd 10.1.255.255 scope global dummy98', output)
self.assertIn('inet 10.1.2.4/16 brd 10.1.255.255 scope global secondary dummy98', output)
self.assertIn('inet 10.2.2.4/16 brd 10.2.255.255 scope global dummy98', output)
- self.assertIn('inet 10.7.8.9/16 brd 10.7.255.255 scope link deprecated dummy98', output)
- self.assertIn('inet 10.8.8.1/16 scope global dummy98', output)
- self.assertIn('inet 10.8.8.2/16 brd 10.8.8.128 scope global secondary dummy98', output)
- self.assertRegex(output, 'inet 10.9.0.1/16 (metric 128 |)brd 10.9.255.255 scope global dummy98')
+ self.assertIn('inet6 2001:db8:0:f101::15/64 scope global', output)
+ self.assertIn('inet6 2001:db8:0:f101::16/64 scope global', output)
+ self.assertIn('inet6 2001:db8:0:f102::15/64 scope global', output)
- # test for ENOBUFS issue #17012
- for i in range(1, 254):
- self.assertIn(f'inet 10.3.3.{i}/16 brd 10.3.255.255', output)
+ # label
+ self.assertIn(f'inet 10.3.1.1/24 brd 10.3.1.255 scope global {label1}', output)
+ self.assertIn(f'inet 10.3.2.1/24 brd 10.3.2.255 scope global {label2}', output)
+ self.assertIn(f'inet 10.3.3.1/24 brd 10.3.3.255 scope global {label3}', output)
+
+ # broadcast
+ self.assertIn(f'inet 10.4.1.1/24{broadcast1} scope global dummy98', output)
+ self.assertIn(f'inet 10.4.2.1/24{broadcast2} scope global dummy98', output)
+ self.assertIn(f'inet 10.4.3.1/24{broadcast3} scope global dummy98', output)
+
+ # peer
+ self.assertIn(f'inet 10.5.1.1{peer1} scope global dummy98', output)
+ self.assertIn(f'inet 10.5.2.1{peer2} scope global dummy98', output)
+ self.assertIn(f'inet 10.5.3.1{peer3} scope global dummy98', output)
+ self.assertIn(f'inet6 2001:db8:0:f103::1{peer4} scope global', output)
+ self.assertIn(f'inet6 2001:db8:0:f103::2{peer5} scope global', output)
+ self.assertIn(f'inet6 2001:db8:0:f103::3{peer6} scope global', output)
+
+ # scope
+ self.assertIn(f'inet 10.6.1.1/24 brd 10.6.1.255 scope {scope1} dummy98', output)
+ self.assertIn(f'inet 10.6.2.1/24 brd 10.6.2.255 scope {scope2} dummy98', output)
+
+ # lifetime
+ self.assertIn(f'inet 10.7.1.1/24 brd 10.7.1.255 scope global{deprecated1} dummy98', output)
+ self.assertIn(f'inet 10.7.2.1/24 brd 10.7.2.255 scope global{deprecated2} dummy98', output)
+ self.assertIn(f'inet6 2001:db8:0:f104::1/64 scope global{deprecated3}', output)
+ self.assertIn(f'inet6 2001:db8:0:f104::2/64 scope global{deprecated4}', output)
+
+ # route metric
+ self.assertRegex(output, rf'inet 10.8.1.1/24 (metric {route_metric} |)brd 10.8.1.255 scope global dummy98')
+ self.assertRegex(output, rf'inet6 2001:db8:0:f105::1/64 (metric {route_metric} |)scope global')
+
+ output_route = check_output('ip -4 route show dev dummy98 10.8.1.0/24')
+ print(output_route)
+ self.assertIn(f'10.8.1.0/24 proto kernel scope link src 10.8.1.1 metric {route_metric}', output_route)
+
+ output_route = check_output('ip -6 route show dev dummy98 2001:db8:0:f105::/64')
+ print(output_route)
+ self.assertIn(f'2001:db8:0:f105::/64 proto kernel metric {route_metric}', output_route)
+
+ # flags
+ self.assertIn(f'inet 10.9.1.1/24 brd 10.9.1.255 scope global{flag1} dummy98', output)
+ self.assertIn(f'inet 10.9.2.1/24 brd 10.9.2.255 scope global{flag2} dummy98', output)
+ self.assertIn(f'inet6 2001:db8:0:f106::1/64 scope global{flag3}', output)
+ self.assertIn(f'inet6 2001:db8:0:f106::2/64 scope global{flag4}', output)
+
+ # null address
+ self.assertTrue(ip4_null_16.endswith('.0.1'))
+ prefix16 = ip4_null_16[:-len('.0.1')]
+ self.assertTrue(ip4_null_24.endswith('.1'))
+ prefix24 = ip4_null_24[:-len('.1')]
+ self.assertIn(f'inet {ip4_null_16}/16 brd {prefix16}.255.255 scope global subnet16', output)
+ self.assertIn(f'inet {ip4_null_24}/24 brd {prefix24}.255 scope global subnet24', output)
+ self.assertIn(f'inet6 {ip6_null_73}/73 scope global', output)
+ self.assertIn(f'inet6 {ip6_null_74}/74 scope global', output)
# invalid sections
- self.assertNotIn('10.10.0.1/16', output)
- self.assertNotIn('10.10.0.2/16', output)
-
- output = check_output('ip -4 address show dev dummy98 label 32')
- self.assertIn('inet 10.3.2.3/16 brd 10.3.255.255 scope global 32', output)
+ self.assertNotIn('10.4.4.1', output)
+ self.assertNotIn('10.5.4.1', output)
+ self.assertNotIn('10.5.5.1', output)
+ self.assertNotIn('10.8.2.1', output)
+ self.assertNotIn('10.9.3.1', output)
+ self.assertNotIn('2001:db8:0:f101::2', output)
+ self.assertNotIn('2001:db8:0:f103::4', output)
- output = check_output('ip -4 address show dev dummy98 label 33')
- self.assertIn('inet 10.4.2.3 peer 10.4.2.4/16 scope global 33', output)
+ # netlabel
+ self.check_netlabel('dummy98', r'10\.10\.1\.0/24')
- output = check_output('ip -4 address show dev dummy98 label 34')
- self.assertRegex(output, r'inet 192.168.[0-9]*.1/24 brd 192.168.[0-9]*.255 scope global 34')
+ output = check_output(*networkctl_cmd, '--json=short', 'status', env=env)
+ check_json(output)
- output = check_output('ip -4 address show dev dummy98 label 35')
- self.assertRegex(output, r'inet 172.[0-9]*.0.1/16 brd 172.[0-9]*.255.255 scope global 35')
+ def test_address_static(self):
+ copy_network_unit('25-address-static.network', '12-dummy.netdev', copy_dropins=False)
+ start_networkd()
+ self.setup_nftset('addr4', 'ipv4_addr')
+ self.setup_nftset('network4', 'ipv4_addr', 'flags interval;')
+ self.setup_nftset('ifindex', 'iface_index')
- output = check_output('ip -4 route show dev dummy98')
- print(output)
- self.assertIn('10.9.0.0/16 proto kernel scope link src 10.9.0.1 metric 128', output)
+ self.wait_online(['dummy98:routable'])
- output = check_output('ip -6 address show dev dummy98')
- print(output)
- self.assertIn('inet6 2001:db8:0:f101::15/64 scope global', output)
- self.assertIn('inet6 2001:db8:0:f101::16/64 scope global', output)
- self.assertIn('inet6 2001:db8:0:f102::15/64 scope global', output)
- self.assertIn('inet6 2001:db8:0:f102::16/64 scope global', output)
- self.assertIn('inet6 2001:db8:0:f103::20 peer 2001:db8:0:f103::10/128 scope global', output)
- self.assertIn('inet6 2001:db8:1:f101::1/64 scope global deprecated', output)
- self.assertRegex(output, r'inet6 fd[0-9a-f:]*1/64 scope global')
+ ip4_null_16 = None
+ ip4_null_24 = None
+ output = check_output('ip -4 --json address show dev dummy98')
+ for i in json.loads(output)[0]['addr_info']:
+ if i['label'] == 'subnet16':
+ ip4_null_16 = i['local']
+ elif i['label'] == 'subnet24':
+ ip4_null_24 = i['local']
+ self.assertTrue(ip4_null_16.endswith('.0.1'))
+ self.assertTrue(ip4_null_24.endswith('.1'))
+
+ ip6_null_73 = None
+ ip6_null_74 = None
+ output = check_output('ip -6 --json address show dev dummy98')
+ for i in json.loads(output)[0]['addr_info']:
+ if i['prefixlen'] == 73:
+ ip6_null_73 = i['local']
+ elif i['prefixlen'] == 74:
+ ip6_null_74 = i['local']
+ self.assertTrue(ip6_null_73.endswith(':1'))
+ self.assertTrue(ip6_null_74.endswith(':1'))
+
+ self.verify_address_static(
+ label1='label1',
+ label2='label2',
+ label3='dummy98',
+ broadcast1='',
+ broadcast2=' brd 10.4.2.255',
+ broadcast3=' brd 10.4.3.63',
+ peer1=' peer 10.5.1.101/24',
+ peer2=' peer 10.5.2.101/24',
+ peer3='/24 brd 10.5.3.255',
+ peer4=' peer 2001:db8:0:f103::101/128',
+ peer5=' peer 2001:db8:0:f103::102/128',
+ peer6='/128',
+ scope1='global',
+ scope2='link',
+ deprecated1='',
+ deprecated2=' deprecated',
+ deprecated3='',
+ deprecated4=' deprecated',
+ route_metric=128,
+ flag1=' noprefixroute',
+ flag2='',
+ flag3=' noprefixroute',
+ flag4=' home mngtmpaddr',
+ ip4_null_16=ip4_null_16,
+ ip4_null_24=ip4_null_24,
+ ip6_null_73=ip6_null_73,
+ ip6_null_74=ip6_null_74,
+ )
+ # nft set
+ self.check_nftset('addr4', r'10\.10\.1\.1')
+ self.check_nftset('network4', r'10\.10\.1\.0/24')
+ self.check_nftset('ifindex', 'dummy98')
+
+ self.teardown_nftset('addr4', 'network4', 'ifindex')
+
+ copy_network_unit('25-address-static.network.d/10-override.conf')
+ networkctl_reload()
+ self.wait_online(['dummy98:routable'])
+ self.verify_address_static(
+ label1='new-label1',
+ label2='dummy98',
+ label3='new-label3',
+ broadcast1=' brd 10.4.1.255',
+ broadcast2='',
+ broadcast3=' brd 10.4.3.31',
+ peer1=' peer 10.5.1.102/24',
+ peer2='/24 brd 10.5.2.255',
+ peer3=' peer 10.5.3.102/24',
+ peer4=' peer 2001:db8:0:f103::201/128',
+ peer5='/128',
+ peer6=' peer 2001:db8:0:f103::203/128',
+ scope1='link',
+ scope2='global',
+ deprecated1=' deprecated',
+ deprecated2='',
+ deprecated3=' deprecated',
+ deprecated4='',
+ route_metric=256,
+ flag1='',
+ flag2=' noprefixroute',
+ flag3=' home mngtmpaddr',
+ flag4=' noprefixroute',
+ ip4_null_16=ip4_null_16,
+ ip4_null_24=ip4_null_24,
+ ip6_null_73=ip6_null_73,
+ ip6_null_74=ip6_null_74,
+ )
- self.check_netlabel('dummy98', r'10\.4\.3\.0/24')
+ networkctl_reconfigure('dummy98')
+ self.wait_online(['dummy98:routable'])
+ self.verify_address_static(
+ label1='new-label1',
+ label2='dummy98',
+ label3='new-label3',
+ broadcast1=' brd 10.4.1.255',
+ broadcast2='',
+ broadcast3=' brd 10.4.3.31',
+ peer1=' peer 10.5.1.102/24',
+ peer2='/24 brd 10.5.2.255',
+ peer3=' peer 10.5.3.102/24',
+ peer4=' peer 2001:db8:0:f103::201/128',
+ peer5='/128',
+ peer6=' peer 2001:db8:0:f103::203/128',
+ scope1='link',
+ scope2='global',
+ deprecated1=' deprecated',
+ deprecated2='',
+ deprecated3=' deprecated',
+ deprecated4='',
+ route_metric=256,
+ flag1='',
+ flag2=' noprefixroute',
+ flag3=' home mngtmpaddr',
+ flag4=' noprefixroute',
+ ip4_null_16=ip4_null_16,
+ ip4_null_24=ip4_null_24,
+ ip6_null_73=ip6_null_73,
+ ip6_null_74=ip6_null_74,
+ )
# Tests for #20891.
# 1. set preferred lifetime forever to drop the deprecated flag for testing #20891.
- check_output('ip address change 10.7.8.9/16 dev dummy98 preferred_lft forever')
- check_output('ip address change 2001:db8:1:f101::1/64 dev dummy98 preferred_lft forever')
- output = check_output('ip -4 address show dev dummy98')
- print(output)
- self.assertNotIn('deprecated', output)
- output = check_output('ip -6 address show dev dummy98')
+ check_output('ip address change 10.7.1.1/24 dev dummy98 preferred_lft forever')
+ check_output('ip address change 2001:db8:0:f104::1/64 dev dummy98 preferred_lft forever')
+ output = check_output('ip address show dev dummy98')
print(output)
- self.assertNotIn('deprecated', output)
+ self.assertNotRegex(output, '10.7.1.1/24 .* deprecated')
+ self.assertNotRegex(output, '2001:db8:0:f104::1/64 .* deprecated')
- # 2. reconfigure the interface.
+ # 2. reconfigure the interface, and check the deprecated flag is set again
networkctl_reconfigure('dummy98')
self.wait_online(['dummy98:routable'])
-
- # 3. check the deprecated flag is set for the address configured with PreferredLifetime=0
+ self.verify_address_static(
+ label1='new-label1',
+ label2='dummy98',
+ label3='new-label3',
+ broadcast1=' brd 10.4.1.255',
+ broadcast2='',
+ broadcast3=' brd 10.4.3.31',
+ peer1=' peer 10.5.1.102/24',
+ peer2='/24 brd 10.5.2.255',
+ peer3=' peer 10.5.3.102/24',
+ peer4=' peer 2001:db8:0:f103::201/128',
+ peer5='/128',
+ peer6=' peer 2001:db8:0:f103::203/128',
+ scope1='link',
+ scope2='global',
+ deprecated1=' deprecated',
+ deprecated2='',
+ deprecated3=' deprecated',
+ deprecated4='',
+ route_metric=256,
+ flag1='',
+ flag2=' noprefixroute',
+ flag3=' home mngtmpaddr',
+ flag4=' noprefixroute',
+ ip4_null_16=ip4_null_16,
+ ip4_null_24=ip4_null_24,
+ ip6_null_73=ip6_null_73,
+ ip6_null_74=ip6_null_74,
+ )
+
+ # test for ENOBUFS issue #17012 (with reload)
+ copy_network_unit('25-address-static.network.d/10-many-address.conf')
+ networkctl_reload()
+ self.wait_online(['dummy98:routable'])
output = check_output('ip -4 address show dev dummy98')
- print(output)
- self.assertIn('inet 10.7.8.9/16 brd 10.7.255.255 scope link deprecated dummy98', output)
- output = check_output('ip -6 address show dev dummy98')
- print(output)
- self.assertIn('inet6 2001:db8:1:f101::1/64 scope global deprecated', output)
+ for i in range(1, 254):
+ self.assertIn(f'inet 10.3.3.{i}/16 brd 10.3.255.255', output)
- # test for ENOBUFS issue #17012
+ # (with reconfigure)
+ networkctl_reconfigure('dummy98')
+ self.wait_online(['dummy98:routable'])
output = check_output('ip -4 address show dev dummy98')
for i in range(1, 254):
self.assertIn(f'inet 10.3.3.{i}/16 brd 10.3.255.255', output)
- # TODO: check json string
- check_output(*networkctl_cmd, '--json=short', 'status', env=env)
-
def test_address_ipv4acd(self):
check_output('ip netns add ns99')
check_output('ip link add veth99 type veth peer veth-peer')
self.assertRegex(output, 'iif test1')
self.assertRegex(output, 'lookup 10')
- # TODO: check json string
- check_output(*networkctl_cmd, '--json=short', 'status', env=env)
+ output = check_output(*networkctl_cmd, '--json=short', 'status', env=env)
+ check_json(output)
def test_routing_policy_rule_issue_11280(self):
copy_network_unit('25-routing-policy-rule-test1.network', '11-dummy.netdev',
self.assertIn('192.168.1.1 proto static scope link initcwnd 20', output)
self.assertIn('192.168.1.2 proto static scope link initrwnd 30', output)
self.assertIn('192.168.1.3 proto static scope link advmss 30', output)
+ self.assertIn('192.168.1.4 proto static scope link hoplimit 122', output)
self.assertIn('multicast 149.10.123.4 proto static', output)
print('### ip -4 route show dev dummy98 default')
self.assertIn('via 2001:1234:5:8fff:ff:ff:ff:ff dev dummy98', output)
self.assertIn('via 2001:1234:5:9fff:ff:ff:ff:ff dev dummy98', output)
- # TODO: check json string
- check_output(*networkctl_cmd, '--json=short', 'status', env=env)
+ output = check_output(*networkctl_cmd, '--json=short', 'status', env=env)
+ check_json(output)
copy_network_unit('25-address-static.network')
networkctl_reload()
print(output)
self.assertIn('149.10.124.66 proto static', output)
self.assertIn('congctl dctcp', output)
+ self.assertIn('rto_min 300s', output)
@expectedFailureIfModuleIsNotAvailable('vrf')
def test_route_vrf(self):
output = check_output('ip -6 route list dev bond199')
print(output)
- self.assertRegex(output, 'abcd::/16')
- self.assertRegex(output, 'src')
- self.assertRegex(output, '2001:1234:56:8f63::2')
+ self.assertIn('abcd::/16 via 2001:1234:56:8f63::1:1 proto static src 2001:1234:56:8f63::2', output)
+
+ def test_route_preferred_source_with_existing_address(self):
+ # See issue #28009.
+ copy_network_unit('25-route-preferred-source.network', '12-dummy.netdev')
+ start_networkd()
+
+ for i in range(3):
+ if i != 0:
+ networkctl_reconfigure('dummy98')
+
+ self.wait_online(['dummy98:routable'])
+
+ output = check_output('ip -6 route list dev dummy98')
+ print(output)
+ self.assertIn('abcd::/16 via 2001:1234:56:8f63::1:1 proto static src 2001:1234:56:8f63::1', output)
def test_ip_link_mac_address(self):
copy_network_unit('25-address-link-section.network', '12-dummy.netdev')
self.assertRegex(output, f'2607:5300:203:5215:{i}::1 *proxy')
def test_neighbor_section(self):
- copy_network_unit('25-neighbor-section.network', '12-dummy.netdev')
+ copy_network_unit('25-neighbor-section.network', '12-dummy.netdev', copy_dropins=False)
start_networkd()
- self.wait_online(['dummy98:degraded'], timeout='40s')
+ self.wait_online(['dummy98:degraded'])
print('### ip neigh list dev dummy98')
output = check_output('ip neigh list dev dummy98')
print(output)
- self.assertRegex(output, '192.168.10.1.*00:00:5e:00:02:65.*PERMANENT')
- self.assertRegex(output, '2004:da8:1::1.*00:00:5e:00:02:66.*PERMANENT')
+ self.assertIn('192.168.10.1 lladdr 00:00:5e:00:02:65 PERMANENT', output)
+ self.assertIn('2004:da8:1::1 lladdr 00:00:5e:00:02:66 PERMANENT', output)
+ self.assertNotIn('2004:da8:1:0::2', output)
+ self.assertNotIn('192.168.10.2', output)
+ self.assertNotIn('00:00:5e:00:02:67', output)
- # TODO: check json string
- check_output(*networkctl_cmd, '--json=short', 'status', env=env)
+ output = check_output(*networkctl_cmd, '--json=short', 'status', env=env)
+ check_json(output)
+
+ copy_network_unit('25-neighbor-section.network.d/override.conf')
+ networkctl_reload()
+ self.wait_online(['dummy98:degraded'])
+
+ print('### ip neigh list dev dummy98 (after reloading)')
+ output = check_output('ip neigh list dev dummy98')
+ print(output)
+ self.assertIn('192.168.10.1 lladdr 00:00:5e:00:03:65 PERMANENT', output)
+ self.assertIn('2004:da8:1::1 lladdr 00:00:5e:00:03:66 PERMANENT', output)
+ self.assertNotIn('2004:da8:1:0::2', output)
+ self.assertNotIn('192.168.10.2', output)
+ self.assertNotIn('00:00:5e:00:02', output)
def test_neighbor_reconfigure(self):
- copy_network_unit('25-neighbor-section.network', '12-dummy.netdev')
+ copy_network_unit('25-neighbor-section.network', '12-dummy.netdev', copy_dropins=False)
start_networkd()
- self.wait_online(['dummy98:degraded'], timeout='40s')
+ self.wait_online(['dummy98:degraded'])
print('### ip neigh list dev dummy98')
output = check_output('ip neigh list dev dummy98')
print(output)
- self.assertRegex(output, '192.168.10.1.*00:00:5e:00:02:65.*PERMANENT')
- self.assertRegex(output, '2004:da8:1::1.*00:00:5e:00:02:66.*PERMANENT')
+ self.assertIn('192.168.10.1 lladdr 00:00:5e:00:02:65 PERMANENT', output)
+ self.assertIn('2004:da8:1::1 lladdr 00:00:5e:00:02:66 PERMANENT', output)
remove_network_unit('25-neighbor-section.network')
copy_network_unit('25-neighbor-next.network')
networkctl_reload()
- self.wait_online(['dummy98:degraded'], timeout='40s')
+ self.wait_online(['dummy98:degraded'])
print('### ip neigh list dev dummy98')
output = check_output('ip neigh list dev dummy98')
print(output)
- self.assertNotRegex(output, '192.168.10.1.*00:00:5e:00:02:65.*PERMANENT')
- self.assertRegex(output, '192.168.10.1.*00:00:5e:00:02:66.*PERMANENT')
- self.assertNotRegex(output, '2004:da8:1::1.*PERMANENT')
+ self.assertNotIn('00:00:5e:00:02:65', output)
+ self.assertIn('192.168.10.1 lladdr 00:00:5e:00:02:66 PERMANENT', output)
+ self.assertNotIn('2004:da8:1::1', output)
def test_neighbor_gre(self):
copy_network_unit('25-neighbor-ip.network', '25-neighbor-ipv6.network', '25-neighbor-ip-dummy.network',
output = check_output('ip neigh list dev gretun97')
print(output)
- self.assertRegex(output, '10.0.0.22 lladdr 10.65.223.239 PERMANENT')
+ self.assertIn('10.0.0.22 lladdr 10.65.223.239 PERMANENT', output)
+ self.assertNotIn('10.0.0.23', output)
output = check_output('ip neigh list dev ip6gretun97')
print(output)
self.assertRegex(output, '2001:db8:0:f102::17 lladdr 2a:?00:ff:?de:45:?67:ed:?de:[0:]*:49:?88 PERMANENT')
+ self.assertNotIn('2001:db8:0:f102::18', output)
- # TODO: check json string
- check_output(*networkctl_cmd, '--json=short', 'status', env=env)
+ output = check_output(*networkctl_cmd, '--json=short', 'status', env=env)
+ check_json(output)
def test_link_local_addressing(self):
copy_network_unit('25-link-local-addressing-yes.network', '11-dummy.netdev',
self.check_ipv4_sysctl_attr('dummy98', 'forwarding', '1')
self.check_ipv4_sysctl_attr('dummy98', 'proxy_arp', '1')
self.check_ipv4_sysctl_attr('dummy98', 'accept_local', '1')
+ self.check_ipv4_sysctl_attr('dummy98', 'rp_filter', '0')
copy_network_unit('25-sysctl.network.d/25-ipv6-privacy-extensions.conf')
networkctl_reload()
self.assertIn('nexthop via 192.168.20.1 dev dummy98 weight 1', output)
self.assertIn('nexthop via 192.168.5.1 dev veth99 weight 3', output)
- # TODO: check json string
- check_output(*networkctl_cmd, '--json=short', 'status', env=env)
+ output = check_output(*networkctl_cmd, '--json=short', 'status', env=env)
+ check_json(output)
copy_network_unit('25-nexthop.network', '25-veth.netdev', '25-veth-peer.network',
'12-dummy.netdev', '25-nexthop-dummy.network')
# make link state file updated
check_output(*resolvectl_cmd, 'revert', 'dummy98', env=env)
- # TODO: check json string
- check_output(*networkctl_cmd, '--json=short', 'status', env=env)
+ output = check_output(*networkctl_cmd, '--json=short', 'status', env=env)
+ check_json(output)
output = read_link_state_file('dummy98')
print(output)
check_output(*resolvectl_cmd, 'dnssec', 'dummy98', 'yes', env=env)
check_output(*timedatectl_cmd, 'ntp-servers', 'dummy98', '2.fedora.pool.ntp.org', '3.fedora.pool.ntp.org', env=env)
- # TODO: check json string
- check_output(*networkctl_cmd, '--json=short', 'status', env=env)
+ output = check_output(*networkctl_cmd, '--json=short', 'status', env=env)
+ check_json(output)
output = read_link_state_file('dummy98')
print(output)
check_output(*timedatectl_cmd, 'revert', 'dummy98', env=env)
- # TODO: check json string
- check_output(*networkctl_cmd, '--json=short', 'status', env=env)
+ output = check_output(*networkctl_cmd, '--json=short', 'status', env=env)
+ check_json(output)
output = read_link_state_file('dummy98')
print(output)
check_output(*resolvectl_cmd, 'revert', 'dummy98', env=env)
- # TODO: check json string
- check_output(*networkctl_cmd, '--json=short', 'status', env=env)
+ output = check_output(*networkctl_cmd, '--json=short', 'status', env=env)
+ check_json(output)
output = read_link_state_file('dummy98')
print(output)
def test_ipv6_prefix_delegation(self):
copy_network_unit('25-veth.netdev', '25-ipv6-prefix.network', '25-ipv6-prefix-veth.network')
+ self.setup_nftset('addr6', 'ipv6_addr')
+ self.setup_nftset('network6', 'ipv6_addr', 'flags interval;')
+ self.setup_nftset('ifindex', 'iface_index')
start_networkd()
self.wait_online(['veth99:routable', 'veth-peer:degraded'])
self.check_netlabel('veth99', '2002:da8:1::/64')
self.check_netlabel('veth99', '2002:da8:2::/64')
+ self.check_nftset('addr6', '2002:da8:1:[0-9a-f]*:[0-9a-f]*:[0-9a-f]*:[0-9a-f]*:[0-9a-f]*')
+ self.check_nftset('addr6', '2002:da8:2:[0-9a-f]*:[0-9a-f]*:[0-9a-f]*:[0-9a-f]*:[0-9a-f]*')
+ self.check_nftset('network6', '2002:da8:1::/64')
+ self.check_nftset('network6', '2002:da8:2::/64')
+ self.check_nftset('ifindex', 'veth99')
+
+ self.teardown_nftset('addr6', 'network6', 'ifindex')
+
def test_ipv6_token_static(self):
copy_network_unit('25-veth.netdev', '25-ipv6-prefix.network', '25-ipv6-prefix-veth-token-static.network')
start_networkd()
print(output)
self.assertIn('pref low', output)
+ @unittest.skipUnless(radvd_check_config('captive-portal.conf'), "Installed radvd doesn't support captive portals")
+ def test_captive_portal(self):
+ copy_network_unit('25-veth-client.netdev',
+ '25-veth-router-captive.netdev',
+ '26-bridge.netdev',
+ '25-veth-client-captive.network',
+ '25-veth-router-captive.network',
+ '25-veth-bridge-captive.network',
+ '25-bridge99.network')
+ start_networkd()
+ self.wait_online(['bridge99:routable', 'client-p:enslaved',
+ 'router-captive:degraded', 'router-captivep:enslaved'])
+
+ start_radvd(config_file='captive-portal.conf')
+ networkctl_reconfigure('client')
+ self.wait_online(['client:routable'])
+
+ self.wait_address('client', '2002:da8:1:99:1034:56ff:fe78:9a00/64', ipv='-6', timeout_sec=10)
+ output = check_output(*networkctl_cmd, 'status', 'client', env=env)
+ print(output)
+ self.assertIn('Captive Portal: http://systemd.io', output)
+
+ @unittest.skipUnless(radvd_check_config('captive-portal.conf'), "Installed radvd doesn't support captive portals")
+ def test_invalid_captive_portal(self):
+ def radvd_write_config(captive_portal_uri):
+ with open(os.path.join(networkd_ci_temp_dir, 'radvd/bogus-captive-portal.conf'), mode='w', encoding='utf-8') as f:
+ f.write(f'interface router-captive {{ AdvSendAdvert on; AdvCaptivePortalAPI "{captive_portal_uri}"; prefix 2002:da8:1:99::/64 {{ AdvOnLink on; AdvAutonomous on; }}; }};')
+
+ captive_portal_uris = [
+ "42ěščěškd ěšč ě s",
+ " ",
+ "🤔",
+ ]
+
+ copy_network_unit('25-veth-client.netdev',
+ '25-veth-router-captive.netdev',
+ '26-bridge.netdev',
+ '25-veth-client-captive.network',
+ '25-veth-router-captive.network',
+ '25-veth-bridge-captive.network',
+ '25-bridge99.network')
+ start_networkd()
+ self.wait_online(['bridge99:routable', 'client-p:enslaved',
+ 'router-captive:degraded', 'router-captivep:enslaved'])
+
+ for uri in captive_portal_uris:
+ print(f"Captive portal: {uri}")
+ radvd_write_config(uri)
+ stop_radvd()
+ start_radvd(config_file='bogus-captive-portal.conf')
+ networkctl_reconfigure('client')
+ self.wait_online(['client:routable'])
+
+ self.wait_address('client', '2002:da8:1:99:1034:56ff:fe78:9a00/64', ipv='-6', timeout_sec=10)
+ output = check_output(*networkctl_cmd, 'status', 'client', env=env)
+ print(output)
+ self.assertNotIn('Captive Portal:', output)
+
class NetworkdDHCPServerTests(unittest.TestCase, Utilities):
def setUp(self):
self.assertRegex(output, 'DNS: 192.168.5.1\n *192.168.5.10')
self.assertRegex(output, 'NTP: 192.168.5.1\n *192.168.5.11')
+ output = check_output(*networkctl_cmd, '-n', '0', 'status', 'veth-peer', env=env)
+ self.assertRegex(output, "Offered DHCP leases: 192.168.5.[0-9]*")
+
+ def test_dhcp_server_null_server_address(self):
+ copy_network_unit('25-veth.netdev', '25-dhcp-client.network', '25-dhcp-server-null-server-address.network')
+ start_networkd()
+ self.wait_online(['veth99:routable', 'veth-peer:routable'])
+
+ output = check_output('ip --json address show dev veth-peer')
+ server_address = json.loads(output)[0]['addr_info'][0]['local']
+ print(server_address)
+
+ output = check_output('ip --json address show dev veth99')
+ client_address = json.loads(output)[0]['addr_info'][0]['local']
+ print(client_address)
+
+ output = check_output(*networkctl_cmd, '-n', '0', 'status', 'veth99', env=env)
+ print(output)
+ self.assertRegex(output, rf'Address: {client_address} \(DHCP4 via {server_address}\)')
+ self.assertIn(f'Gateway: {server_address}', output)
+ self.assertIn(f'DNS: {server_address}', output)
+ self.assertIn(f'NTP: {server_address}', output)
+
+ output = check_output(*networkctl_cmd, '-n', '0', 'status', 'veth-peer', env=env)
+ self.assertIn(f'Offered DHCP leases: {client_address}', output)
+
def test_dhcp_server_with_uplink(self):
copy_network_unit('25-veth.netdev', '25-dhcp-client.network', '25-dhcp-server-downstream.network',
'12-dummy.netdev', '25-dhcp-server-uplink.network')
self.assertIn('DHCPREPLY(veth-peer)', output)
self.assertNotIn('rapid-commit', output)
+ def test_dhcp_client_ipv6_dbus_status(self):
+ def get_dbus_dhcp6_client_state(IF):
+ out = subprocess.check_output(['busctl', 'call', 'org.freedesktop.network1',
+ '/org/freedesktop/network1', 'org.freedesktop.network1.Manager',
+ 'GetLinkByName', 's', IF])
+
+ assert out.startswith(b'io ')
+ out = out.strip()
+ assert out.endswith(b'"')
+ out = out.decode()
+ linkPath = out[:-1].split('"')[1]
+
+ print(f"Found {IF} link path: {linkPath}")
+
+ out = subprocess.check_output(['busctl', 'get-property', 'org.freedesktop.network1',
+ linkPath, 'org.freedesktop.network1.DHCPv6Client', 'State'])
+ assert out.startswith(b's "')
+ out = out.strip()
+ assert out.endswith(b'"')
+ return out[3:-1].decode()
+
+ copy_network_unit('25-veth.netdev', '25-dhcp-server-veth-peer.network', '25-dhcp-client-ipv6-only.network')
+
+ start_networkd()
+ self.wait_online(['veth-peer:carrier'])
+
+ # Note that at this point the DHCPv6 client has not been started because no RA (with managed
+ # bit set) has yet been recieved and the configuration does not include WithoutRA=true
+ state = get_dbus_dhcp6_client_state('veth99')
+ print(f"State = {state}")
+ self.assertEqual(state, 'stopped')
+
+ start_dnsmasq()
+ self.wait_online(['veth99:routable', 'veth-peer:routable'])
+
+ state = get_dbus_dhcp6_client_state('veth99')
+ print(f"State = {state}")
+ self.assertEqual(state, 'bound')
+
+ def test_dhcp_client_ipv6_only_with_custom_client_identifier(self):
+ copy_network_unit('25-veth.netdev', '25-dhcp-server-veth-peer.network', '25-dhcp-client-ipv6-only-custom-client-identifier.network')
+
+ start_networkd()
+ self.wait_online(['veth-peer:carrier'])
+ start_dnsmasq()
+ self.wait_online(['veth99:routable', 'veth-peer:routable'])
+
+ # checking address
+ output = check_output('ip address show dev veth99 scope global')
+ print(output)
+ self.assertRegex(output, r'inet6 2600::[0-9a-f:]*/128 scope global dynamic noprefixroute')
+ self.assertNotIn('192.168.5', output)
+
+ print('## dnsmasq log')
+ output = read_dnsmasq_log_file()
+ print(output)
+ self.assertIn('DHCPSOLICIT(veth-peer) 00:42:00:00:ab:11:f9:2a:c2:77:29:f9:5c:00', output)
+ self.assertNotIn('DHCPADVERTISE(veth-peer)', output)
+ self.assertNotIn('DHCPREQUEST(veth-peer)', output)
+ self.assertIn('DHCPREPLY(veth-peer)', output)
+ self.assertIn('sent size: 0 option: 14 rapid-commit', output)
+
+ stop_dnsmasq()
+
def test_dhcp_client_ipv4_only(self):
copy_network_unit('25-veth.netdev', '25-dhcp-server-veth-peer.network', '25-dhcp-client-ipv4-only.network')
+ self.setup_nftset('addr4', 'ipv4_addr')
+ self.setup_nftset('network4', 'ipv4_addr', 'flags interval;')
+ self.setup_nftset('ifindex', 'iface_index')
+
start_networkd()
self.wait_online(['veth-peer:carrier'])
start_dnsmasq('--dhcp-option=option:dns-server,192.168.5.6,192.168.5.7',
self.check_netlabel('veth99', r'192\.168\.5\.0/24')
+ self.check_nftset('addr4', r'192\.168\.5\.1')
+ self.check_nftset('network4', r'192\.168\.5\.0/24')
+ self.check_nftset('ifindex', 'veth99')
+
+ self.teardown_nftset('addr4', 'network4', 'ifindex')
+
+ def test_dhcp_client_ipv4_dbus_status(self):
+ def get_dbus_dhcp4_client_state(IF):
+ out = subprocess.check_output(['busctl', 'call', 'org.freedesktop.network1',
+ '/org/freedesktop/network1', 'org.freedesktop.network1.Manager',
+ 'GetLinkByName', 's', IF])
+
+ assert out.startswith(b'io ')
+ out = out.strip()
+ assert out.endswith(b'"')
+ out = out.decode()
+ linkPath = out[:-1].split('"')[1]
+
+ print(f"Found {IF} link path: {linkPath}")
+
+ out = subprocess.check_output(['busctl', 'get-property', 'org.freedesktop.network1',
+ linkPath, 'org.freedesktop.network1.DHCPv4Client', 'State'])
+ assert out.startswith(b's "')
+ out = out.strip()
+ assert out.endswith(b'"')
+ return out[3:-1].decode()
+
+ copy_network_unit('25-veth.netdev', '25-dhcp-server-veth-peer.network', '25-dhcp-client-ipv4-only.network')
+
+ start_networkd()
+ self.wait_online(['veth-peer:carrier'])
+
+ state = get_dbus_dhcp4_client_state('veth99')
+ print(f"State = {state}")
+ self.assertEqual(state, 'selecting')
+
+ start_dnsmasq('--dhcp-option=option:dns-server,192.168.5.6,192.168.5.7',
+ '--dhcp-option=option:domain-search,example.com',
+ '--dhcp-alternate-port=67,5555',
+ ipv4_range='192.168.5.110,192.168.5.119')
+ self.wait_online(['veth99:routable', 'veth-peer:routable'])
+ self.wait_address('veth99', r'inet 192.168.5.11[0-9]*/24', ipv='-4')
+
+ state = get_dbus_dhcp4_client_state('veth99')
+ print(f"State = {state}")
+ self.assertEqual(state, 'bound')
+
def test_dhcp_client_ipv4_use_routes_gateway(self):
first = True
for (routes, gateway, dns_and_ntp_routes, classless) in itertools.product([True, False], repeat=4):
additional_options = [
'--dhcp-option=option:dns-server,192.168.5.10,8.8.8.8',
'--dhcp-option=option:ntp-server,192.168.5.11,9.9.9.9',
- '--dhcp-option=option:static-route,192.168.5.100,192.168.5.2,8.8.8.8,192.168.5.3'
+ '--dhcp-option=option:static-route,192.168.6.100,192.168.5.2,8.8.8.8,192.168.5.3'
]
if classless:
additional_options += [
- '--dhcp-option=option:classless-static-route,0.0.0.0/0,192.168.5.4,8.0.0.0/8,192.168.5.5'
+ '--dhcp-option=option:classless-static-route,0.0.0.0/0,192.168.5.4,8.0.0.0/8,192.168.5.5,192.168.5.64/26,192.168.5.5'
]
start_dnsmasq(*additional_options)
self.wait_online(['veth99:routable', 'veth-peer:routable'])
if classless:
self.assertRegex(output, r'default via 192.168.5.4 proto dhcp src 192.168.5.[0-9]* metric 1024')
self.assertRegex(output, r'8.0.0.0/8 via 192.168.5.5 proto dhcp src 192.168.5.[0-9]* metric 1024')
+ self.assertRegex(output, r'192.168.5.64/26 via 192.168.5.5 proto dhcp src 192.168.5.[0-9]* metric 1024')
self.assertRegex(output, r'192.168.5.4 proto dhcp scope link src 192.168.5.[0-9]* metric 1024')
self.assertRegex(output, r'192.168.5.5 proto dhcp scope link src 192.168.5.[0-9]* metric 1024')
else:
- self.assertRegex(output, r'192.168.5.0/24 proto dhcp scope link src 192.168.5.[0-9]* metric 1024')
+ self.assertRegex(output, r'192.168.6.0/24 via 192.168.5.2 proto dhcp src 192.168.5.[0-9]* metric 1024')
self.assertRegex(output, r'8.0.0.0/8 via 192.168.5.3 proto dhcp src 192.168.5.[0-9]* metric 1024')
+ self.assertRegex(output, r'192.168.5.2 proto dhcp scope link src 192.168.5.[0-9]* metric 1024')
self.assertRegex(output, r'192.168.5.3 proto dhcp scope link src 192.168.5.[0-9]* metric 1024')
else:
self.assertNotRegex(output, r'default via 192.168.5.4 proto dhcp src 192.168.5.[0-9]* metric 1024')
self.assertNotRegex(output, r'8.0.0.0/8 via 192.168.5.5 proto dhcp src 192.168.5.[0-9]* metric 1024')
self.assertNotRegex(output, r'192.168.5.4 proto dhcp scope link src 192.168.5.[0-9]* metric 1024')
self.assertNotRegex(output, r'192.168.5.5 proto dhcp scope link src 192.168.5.[0-9]* metric 1024')
- self.assertNotRegex(output, r'192.168.5.0/24 proto dhcp scope link src 192.168.5.[0-9]* metric 1024')
+ self.assertNotRegex(output, r'192.168.6.0/24 via 192.168.5.2 proto dhcp src 192.168.5.[0-9]* metric 1024')
self.assertNotRegex(output, r'8.0.0.0/8 via 192.168.5.3 proto dhcp src 192.168.5.[0-9]* metric 1024')
+ self.assertNotRegex(output, r'192.168.5.2 proto dhcp scope link src 192.168.5.[0-9]* metric 1024')
self.assertNotRegex(output, r'192.168.5.3 proto dhcp scope link src 192.168.5.[0-9]* metric 1024')
# Check UseGateway=
if dns_and_ntp_routes:
self.assertRegex(output, r'192.168.5.10 proto dhcp scope link src 192.168.5.[0-9]* metric 1024')
self.assertRegex(output, r'192.168.5.11 proto dhcp scope link src 192.168.5.[0-9]* metric 1024')
- if classless and use_routes:
- self.assertRegex(output, r'8.8.8.8 via 192.168.5.4 proto dhcp src 192.168.5.[0-9]* metric 1024')
- self.assertRegex(output, r'9.9.9.9 via 192.168.5.4 proto dhcp src 192.168.5.[0-9]* metric 1024')
+ if use_routes:
+ if classless:
+ self.assertRegex(output, r'8.8.8.8 via 192.168.5.5 proto dhcp src 192.168.5.[0-9]* metric 1024')
+ self.assertRegex(output, r'9.9.9.9 via 192.168.5.4 proto dhcp src 192.168.5.[0-9]* metric 1024')
+ else:
+ self.assertRegex(output, r'8.8.8.8 via 192.168.5.3 proto dhcp src 192.168.5.[0-9]* metric 1024')
+ self.assertRegex(output, r'9.9.9.9 via 192.168.5.1 proto dhcp src 192.168.5.[0-9]* metric 1024')
else:
self.assertRegex(output, r'8.8.8.8 via 192.168.5.1 proto dhcp src 192.168.5.[0-9]* metric 1024')
self.assertRegex(output, r'9.9.9.9 via 192.168.5.1 proto dhcp src 192.168.5.[0-9]* metric 1024')
self.assertNotRegex(output, r'8.8.8.8 via 192.168.5.[0-9]* proto dhcp src 192.168.5.[0-9]* metric 1024')
self.assertNotRegex(output, r'9.9.9.9 via 192.168.5.[0-9]* proto dhcp src 192.168.5.[0-9]* metric 1024')
- # TODO: check json string
- check_output(*networkctl_cmd, '--json=short', 'status', env=env)
+ output = check_output(*networkctl_cmd, '--json=short', 'status', env=env)
+ check_json(output)
def test_dhcp_client_settings_anonymize(self):
copy_network_unit('25-veth.netdev', '25-dhcp-server-veth-peer.network', '25-dhcp-client-anonymize.network')
else:
self.assertNotIn('2600::1', output)
- # TODO: check json string
- check_output(*networkctl_cmd, '--json=short', 'status', env=env)
+ output = check_output(*networkctl_cmd, '--json=short', 'status', env=env)
+ check_json(output)
copy_network_unit('25-veth.netdev', '25-dhcp-server-veth-peer.network', '25-dhcp-client.network', copy_dropins=False)
check(self, False, True)
check(self, False, False)
+ def test_dhcp_client_use_captive_portal(self):
+ def check(self, ipv4, ipv6):
+ os.makedirs(os.path.join(network_unit_dir, '25-dhcp-client.network.d'), exist_ok=True)
+ with open(os.path.join(network_unit_dir, '25-dhcp-client.network.d/override.conf'), mode='w', encoding='utf-8') as f:
+ f.write('[DHCPv4]\nUseCaptivePortal=')
+ f.write('yes' if ipv4 else 'no')
+ f.write('\n[DHCPv6]\nUseCaptivePortal=')
+ f.write('yes' if ipv6 else 'no')
+ f.write('\n[IPv6AcceptRA]\nUseCaptivePortal=no')
+
+ networkctl_reload()
+ self.wait_online(['veth99:routable'])
+
+ # link becomes 'routable' when at least one protocol provide an valid address. Hence, we need to explicitly wait for both addresses.
+ self.wait_address('veth99', r'inet 192.168.5.[0-9]*/24 metric 1024 brd 192.168.5.255 scope global dynamic', ipv='-4')
+ self.wait_address('veth99', r'inet6 2600::[0-9a-f]*/128 scope global (dynamic noprefixroute|noprefixroute dynamic)', ipv='-6')
+
+ output = check_output(*networkctl_cmd, 'status', 'veth99', env=env)
+ print(output)
+ if ipv4 or ipv6:
+ self.assertIn('Captive Portal: http://systemd.io', output)
+ else:
+ self.assertNotIn('Captive Portal: http://systemd.io', output)
+
+ output = check_output(*networkctl_cmd, '--json=short', 'status', env=env)
+ check_json(output)
+
+ copy_network_unit('25-veth.netdev', '25-dhcp-server-veth-peer.network', '25-dhcp-client.network', copy_dropins=False)
+
+ start_networkd()
+ self.wait_online(['veth-peer:carrier'])
+ start_dnsmasq('--dhcp-option=114,http://systemd.io',
+ '--dhcp-option=option6:103,http://systemd.io')
+
+ check(self, True, True)
+ check(self, True, False)
+ check(self, False, True)
+ check(self, False, False)
+
+ def test_dhcp_client_reject_captive_portal(self):
+ def check(self, ipv4, ipv6):
+ os.makedirs(os.path.join(network_unit_dir, '25-dhcp-client.network.d'), exist_ok=True)
+ with open(os.path.join(network_unit_dir, '25-dhcp-client.network.d/override.conf'), mode='w', encoding='utf-8') as f:
+ f.write('[DHCPv4]\nUseCaptivePortal=')
+ f.write('yes' if ipv4 else 'no')
+ f.write('\n[DHCPv6]\nUseCaptivePortal=')
+ f.write('yes' if ipv6 else 'no')
+ f.write('\n[IPv6AcceptRA]\nUseCaptivePortal=no')
+
+ networkctl_reload()
+ self.wait_online(['veth99:routable'])
+
+ # link becomes 'routable' when at least one protocol provide an valid address. Hence, we need to explicitly wait for both addresses.
+ self.wait_address('veth99', r'inet 192.168.5.[0-9]*/24 metric 1024 brd 192.168.5.255 scope global dynamic', ipv='-4')
+ self.wait_address('veth99', r'inet6 2600::[0-9a-f]*/128 scope global (dynamic noprefixroute|noprefixroute dynamic)', ipv='-6')
+
+ output = check_output(*networkctl_cmd, 'status', 'veth99', env=env)
+ print(output)
+ self.assertNotIn('Captive Portal: ', output)
+ self.assertNotIn('invalid/url', output)
+
+ output = check_output(*networkctl_cmd, '--json=short', 'status', env=env)
+ check_json(output)
+
+ copy_network_unit('25-veth.netdev', '25-dhcp-server-veth-peer.network', '25-dhcp-client.network', copy_dropins=False)
+
+ start_networkd()
+ self.wait_online(['veth-peer:carrier'])
+ masq = lambda bs: ':'.join(f'{b:02x}' for b in bs)
+ start_dnsmasq('--dhcp-option=114,' + masq(b'http://\x00invalid/url'),
+ '--dhcp-option=option6:103,' + masq(b'http://\x00/invalid/url'))
+
+ check(self, True, True)
+ check(self, True, False)
+ check(self, False, True)
+ check(self, False, False)
+
class NetworkdDHCPPDTests(unittest.TestCase, Utilities):
def setUp(self):
tear_down_common()
def test_dhcp6pd(self):
+ def get_dbus_dhcp6_prefix(IF):
+ # busctl call org.freedesktop.network1 /org/freedesktop/network1 org.freedesktop.network1.Manager GetLinkByName s IF
+ out = subprocess.check_output(['busctl', 'call', 'org.freedesktop.network1',
+ '/org/freedesktop/network1', 'org.freedesktop.network1.Manager',
+ 'GetLinkByName', 's', IF])
+
+ assert out.startswith(b'io ')
+ out = out.strip()
+ assert out.endswith(b'"')
+ out = out.decode()
+ linkPath = out[:-1].split('"')[1]
+
+ print(f"Found {IF} link path: {linkPath}")
+
+ out = subprocess.check_output(['busctl', 'call', 'org.freedesktop.network1',
+ linkPath, 'org.freedesktop.network1.Link', 'Describe'])
+ assert out.startswith(b's "')
+ out = out.strip()
+ assert out.endswith(b'"')
+ json_raw = out[2:].decode()
+ check_json(json_raw)
+ description = json.loads(json_raw) # Convert from escaped sequences to json
+ check_json(description)
+ description = json.loads(description) # Now parse the json
+
+ self.assertIn('DHCPv6Client', description.keys())
+ self.assertIn('Prefixes', description['DHCPv6Client'])
+
+ prefixInfo = description['DHCPv6Client']['Prefixes']
+
+ return prefixInfo
+
copy_network_unit('25-veth.netdev', '25-dhcp6pd-server.network', '25-dhcp6pd-upstream.network',
'25-veth-downstream-veth97.netdev', '25-dhcp-pd-downstream-veth97.network', '25-dhcp-pd-downstream-veth97-peer.network',
'25-veth-downstream-veth98.netdev', '25-dhcp-pd-downstream-veth98.network', '25-dhcp-pd-downstream-veth98-peer.network',
'12-dummy.netdev', '25-dhcp-pd-downstream-dummy98.network',
'13-dummy.netdev', '25-dhcp-pd-downstream-dummy99.network')
+ self.setup_nftset('addr6', 'ipv6_addr')
+ self.setup_nftset('network6', 'ipv6_addr', 'flags interval;')
+ self.setup_nftset('ifindex', 'iface_index')
+
start_networkd()
self.wait_online(['veth-peer:routable'])
start_isc_dhcpd(conf_file='isc-dhcpd-dhcp6pd.conf', ipv='-6')
self.wait_online(['veth99:routable', 'test1:routable', 'dummy98:routable', 'dummy99:degraded',
'veth97:routable', 'veth97-peer:routable', 'veth98:routable', 'veth98-peer:routable'])
+ # Check DBus assigned prefix information to veth99
+ prefixInfo = get_dbus_dhcp6_prefix('veth99')
+
+ self.assertEqual(len(prefixInfo), 1)
+ prefixInfo = prefixInfo[0]
+
+ self.assertIn('Prefix', prefixInfo.keys())
+ self.assertIn('PrefixLength', prefixInfo.keys())
+ self.assertIn('PreferredLifetimeUSec', prefixInfo.keys())
+ self.assertIn('ValidLifetimeUSec', prefixInfo.keys())
+
+ self.assertEqual(prefixInfo['Prefix'][0:6], [63, 254, 5, 1, 255, 255])
+ self.assertEqual(prefixInfo['PrefixLength'], 56)
+ self.assertGreater(prefixInfo['PreferredLifetimeUSec'], 0)
+ self.assertGreater(prefixInfo['ValidLifetimeUSec'], 0)
+
print('### ip -6 address show dev veth-peer scope global')
output = check_output('ip -6 address show dev veth-peer scope global')
print(output)
self.check_netlabel('dummy98', '3ffe:501:ffff:[2-9a-f]00::/64')
+ self.check_nftset('addr6', '3ffe:501:ffff:[2-9a-f]00:1a:2b:3c:4d')
+ self.check_nftset('addr6', '3ffe:501:ffff:[2-9a-f]00:[0-9a-f]*:[0-9a-f]*:[0-9a-f]*:[0-9a-f]*')
+ self.check_nftset('network6', '3ffe:501:ffff:[2-9a-f]00::/64')
+ self.check_nftset('ifindex', 'dummy98')
+
+ self.teardown_nftset('addr6', 'network6', 'ifindex')
+
def verify_dhcp4_6rd(self, tunnel_name):
print('### ip -4 address show dev veth-peer scope global')
output = check_output('ip -4 address show dev veth-peer scope global')
self.assertIn(f'via ::10.0.0.1 dev {tunnel_name}', output)
def test_dhcp4_6rd(self):
+ def get_dbus_dhcp_6rd_prefix(IF):
+ out = subprocess.check_output(['busctl', 'call', 'org.freedesktop.network1',
+ '/org/freedesktop/network1', 'org.freedesktop.network1.Manager',
+ 'GetLinkByName', 's', IF])
+
+ assert out.startswith(b'io ')
+ out = out.strip()
+ assert out.endswith(b'"')
+ out = out.decode()
+ linkPath = out[:-1].split('"')[1]
+
+ print(f"Found {IF} link path: {linkPath}")
+
+ out = subprocess.check_output(['busctl', 'call', 'org.freedesktop.network1',
+ linkPath, 'org.freedesktop.network1.Link', 'Describe'])
+ assert out.startswith(b's "')
+ out = out.strip()
+ assert out.endswith(b'"')
+ json_raw = out[2:].decode()
+ check_json(json_raw)
+ description = json.loads(json_raw) # Convert from escaped sequences to json
+ check_json(description)
+ description = json.loads(description) # Now parse the json
+
+ self.assertIn('DHCPv4Client', description.keys())
+ self.assertIn('6rdPrefix', description['DHCPv4Client'].keys())
+
+ prefixInfo = description['DHCPv4Client']['6rdPrefix']
+ self.assertIn('Prefix', prefixInfo.keys())
+ self.assertIn('PrefixLength', prefixInfo.keys())
+ self.assertIn('IPv4MaskLength', prefixInfo.keys())
+ self.assertIn('BorderRouters', prefixInfo.keys())
+
+ return prefixInfo
+
copy_network_unit('25-veth.netdev', '25-dhcp4-6rd-server.network', '25-dhcp4-6rd-upstream.network',
'25-veth-downstream-veth97.netdev', '25-dhcp-pd-downstream-veth97.network', '25-dhcp-pd-downstream-veth97-peer.network',
'25-veth-downstream-veth98.netdev', '25-dhcp-pd-downstream-veth98.network', '25-dhcp-pd-downstream-veth98-peer.network',
self.wait_online(['veth99:routable', 'test1:routable', 'dummy98:routable', 'dummy99:degraded',
'veth97:routable', 'veth97-peer:routable', 'veth98:routable', 'veth98-peer:routable'])
+ # Check the DBus interface for assigned prefix information
+ prefixInfo = get_dbus_dhcp_6rd_prefix('veth99')
+
+ self.assertEqual(prefixInfo['Prefix'], [32,1,13,184,0,0,0,0,0,0,0,0,0,0,0,0]) # 2001:db8::
+ self.assertEqual(prefixInfo['PrefixLength'], 32)
+ self.assertEqual(prefixInfo['IPv4MaskLength'], 8)
+ self.assertEqual(prefixInfo['BorderRouters'], [[10,0,0,1]])
+
# Test case for a downstream which appears later
check_output('ip link add dummy97 type dummy')
self.wait_online(['dummy97:routable'])
print(output)
self.assertIn('example.com', output)
- # TODO: check json string
- check_output(*networkctl_cmd, '--json=short', 'status', env=env)
+ output = check_output(*networkctl_cmd, '--json=short', 'status', env=env)
+ check_json(output)
+
+ output = check_output(*networkctl_cmd, '--json=short', 'status', 'veth-peer', env=env)
+ check_json(output)
+
+ # PREF64 or NAT64
+ pref64 = json.loads(output)['NDisc']['PREF64'][0]
+
+ prefix = socket.inet_ntop(socket.AF_INET6, bytearray(pref64['Prefix']))
+ self.assertEqual(prefix, '64:ff9b::')
+
+ prefix_length = pref64['PrefixLength']
+ self.assertEqual(prefix_length, 96)
def test_ipv6_route_prefix_deny_list(self):
copy_network_unit('25-veth.netdev', '25-ipv6ra-prefix-client-deny-list.network', '25-ipv6ra-prefix.network',
networkd_bin = os.path.join(ns.build_dir, 'systemd-networkd')
resolved_bin = os.path.join(ns.build_dir, 'systemd-resolved')
timesyncd_bin = os.path.join(ns.build_dir, 'systemd-timesyncd')
- udevd_bin = os.path.join(ns.build_dir, 'systemd-udevd')
+ udevd_bin = os.path.join(ns.build_dir, 'udevadm')
wait_online_bin = os.path.join(ns.build_dir, 'systemd-networkd-wait-online')
networkctl_bin = os.path.join(ns.build_dir, 'networkctl')
resolvectl_bin = os.path.join(ns.build_dir, 'resolvectl')