]>
git.ipfire.org Git - thirdparty/systemd.git/blob - test/networkd-test.py
a51b3ebde9a403d446ba7461414b9b5fded44e81
2 # SPDX-License-Identifier: LGPL-2.1+
4 # networkd integration test
5 # This uses temporary configuration in /run and temporary veth devices, and
6 # does not write anything on disk or change any system configuration;
7 # but it assumes (and checks at the beginning) that networkd is not currently
10 # This can be run on a normal installation, in QEMU, nspawn (with
11 # --private-network), LXD (with "--config raw.lxc=lxc.aa_profile=unconfined"),
12 # or LXC system containers. You need at least the "ip" tool from the iproute
13 # package; it is recommended to install dnsmasq too to get full test coverage.
15 # ATTENTION: This uses the *installed* networkd, not the one from the built
18 # (C) 2015 Canonical Ltd.
19 # Author: Martin Pitt <martin.pitt@ubuntu.com>
31 HAVE_DNSMASQ
= shutil
.which('dnsmasq') is not None
33 NETWORK_UNITDIR
= '/run/systemd/network'
35 NETWORKD_WAIT_ONLINE
= shutil
.which('systemd-networkd-wait-online',
36 path
='/usr/lib/systemd:/lib/systemd')
38 RESOLV_CONF
= '/run/systemd/resolve/resolv.conf'
42 """Initialize the environment, and perform sanity checks on it."""
43 if NETWORKD_WAIT_ONLINE
is None:
44 raise OSError(errno
.ENOENT
, 'systemd-networkd-wait-online not found')
46 # Do not run any tests if the system is using networkd already.
47 if subprocess
.call(['systemctl', 'is-active', '--quiet',
48 'systemd-networkd.service']) == 0:
49 raise unittest
.SkipTest('networkd is already active')
51 # Avoid "Failed to open /dev/tty" errors in containers.
52 os
.environ
['SYSTEMD_LOG_TARGET'] = 'journal'
54 # Ensure the unit directory exists so tests can dump files into it.
55 os
.makedirs(NETWORK_UNITDIR
, exist_ok
=True)
58 class NetworkdTestingUtilities
:
59 """Provide a set of utility functions to facilitate networkd tests.
61 This class must be inherited along with unittest.TestCase to define
62 some required methods.
65 def add_veth_pair(self
, veth
, peer
, veth_options
=(), peer_options
=()):
66 """Add a veth interface pair, and queue them to be removed."""
67 subprocess
.check_call(['ip', 'link', 'add', 'name', veth
] +
69 ['type', 'veth', 'peer', 'name', peer
] +
71 self
.addCleanup(subprocess
.call
, ['ip', 'link', 'del', 'dev', peer
])
73 def write_network(self
, unit_name
, contents
):
74 """Write a network unit file, and queue it to be removed."""
75 unit_path
= os
.path
.join(NETWORK_UNITDIR
, unit_name
)
77 with
open(unit_path
, 'w') as unit
:
79 self
.addCleanup(os
.remove
, unit_path
)
81 def write_network_dropin(self
, unit_name
, dropin_name
, contents
):
82 """Write a network unit drop-in, and queue it to be removed."""
83 dropin_dir
= os
.path
.join(NETWORK_UNITDIR
, "{}.d".format(unit_name
))
84 dropin_path
= os
.path
.join(dropin_dir
, "{}.conf".format(dropin_name
))
86 os
.makedirs(dropin_dir
, exist_ok
=True)
87 self
.addCleanup(os
.rmdir
, dropin_dir
)
88 with
open(dropin_path
, 'w') as dropin
:
89 dropin
.write(contents
)
90 self
.addCleanup(os
.remove
, dropin_path
)
92 def read_attr(self
, link
, attribute
):
93 """Read a link attributed from the sysfs."""
94 # Note we we don't want to check if interface `link' is managed, we
95 # want to evaluate link variable and pass the value of the link to
96 # assert_link_states e.g. eth0=managed.
97 self
.assert_link_states(**{link
:'managed'})
98 with
open(os
.path
.join('/sys/class/net', link
, attribute
)) as f
:
99 return f
.readline().strip()
101 def assert_link_states(self
, **kwargs
):
102 """Match networkctl link states to the given ones.
104 Each keyword argument should be the name of a network interface
105 with its expected value of the "SETUP" column in output from
106 networkctl. The interfaces have five seconds to come online
107 before the check is performed. Every specified interface must
108 be present in the output, and any other interfaces found in the
111 A special interface state "managed" is supported, which matches
112 any value in the "SETUP" column other than "unmanaged".
116 interfaces
= set(kwargs
)
118 # Wait for the requested interfaces, but don't fail for them.
119 subprocess
.call([NETWORKD_WAIT_ONLINE
, '--timeout=5'] +
120 ['--interface={}'.format(iface
) for iface
in kwargs
])
122 # Validate each link state found in the networkctl output.
123 out
= subprocess
.check_output(['networkctl', '--no-legend']).rstrip()
124 for line
in out
.decode('utf-8').split('\n'):
125 fields
= line
.split()
126 if len(fields
) >= 5 and fields
[1] in kwargs
:
128 expected
= kwargs
[iface
]
130 if (actual
!= expected
and
131 not (expected
== 'managed' and actual
!= 'unmanaged')):
132 self
.fail("Link {} expects state {}, found {}".format(iface
, expected
, actual
))
133 interfaces
.remove(iface
)
135 # Ensure that all requested interfaces have been covered.
137 self
.fail("Missing links in status output: {}".format(interfaces
))
140 class BridgeTest(NetworkdTestingUtilities
, unittest
.TestCase
):
141 """Provide common methods for testing networkd against servers."""
144 self
.write_network('port1.netdev', '''\
148 MACAddress=12:34:56:78:9a:bc''')
149 self
.write_network('port2.netdev', '''\
153 MACAddress=12:34:56:78:9a:bd''')
154 self
.write_network('mybridge.netdev', '''\
158 self
.write_network('port1.network', '''\
163 self
.write_network('port2.network', '''\
168 self
.write_network('mybridge.network', '''\
173 Address=192.168.250.33/24
174 Gateway=192.168.250.1''')
175 subprocess
.check_call(['systemctl', 'start', 'systemd-networkd'])
178 subprocess
.check_call(['systemctl', 'stop', 'systemd-networkd'])
179 subprocess
.check_call(['ip', 'link', 'del', 'mybridge'])
180 subprocess
.check_call(['ip', 'link', 'del', 'port1'])
181 subprocess
.check_call(['ip', 'link', 'del', 'port2'])
183 def test_bridge_init(self
):
184 self
.assert_link_states(
189 def test_bridge_port_priority(self
):
190 self
.assertEqual(self
.read_attr('port1', 'brport/priority'), '32')
191 self
.write_network_dropin('port1.network', 'priority', '''\
195 subprocess
.check_call(['systemctl', 'restart', 'systemd-networkd'])
196 self
.assertEqual(self
.read_attr('port1', 'brport/priority'), '28')
198 def test_bridge_port_priority_set_zero(self
):
199 """It should be possible to set the bridge port priority to 0"""
200 self
.assertEqual(self
.read_attr('port2', 'brport/priority'), '32')
201 self
.write_network_dropin('port2.network', 'priority', '''\
205 subprocess
.check_call(['systemctl', 'restart', 'systemd-networkd'])
206 self
.assertEqual(self
.read_attr('port2', 'brport/priority'), '0')
208 def test_bridge_port_property(self
):
209 """Test the "[Bridge]" section keys"""
210 self
.assertEqual(self
.read_attr('port2', 'brport/priority'), '32')
211 self
.write_network_dropin('port2.network', 'property', '''\
217 AllowPortToBeRoot=true
221 subprocess
.check_call(['systemctl', 'restart', 'systemd-networkd'])
223 self
.assertEqual(self
.read_attr('port2', 'brport/priority'), '23')
224 self
.assertEqual(self
.read_attr('port2', 'brport/hairpin_mode'), '1')
225 self
.assertEqual(self
.read_attr('port2', 'brport/path_cost'), '555')
226 self
.assertEqual(self
.read_attr('port2', 'brport/multicast_fast_leave'), '1')
227 self
.assertEqual(self
.read_attr('port2', 'brport/unicast_flood'), '1')
228 self
.assertEqual(self
.read_attr('port2', 'brport/bpdu_guard'), '1')
229 self
.assertEqual(self
.read_attr('port2', 'brport/root_block'), '1')
231 class ClientTestBase(NetworkdTestingUtilities
):
232 """Provide common methods for testing networkd against servers."""
235 def setUpClass(klass
):
236 klass
.orig_log_level
= subprocess
.check_output(
237 ['systemctl', 'show', '--value', '--property', 'LogLevel'],
238 universal_newlines
=True).strip()
239 subprocess
.check_call(['systemd-analyze', 'set-log-level', 'debug'])
242 def tearDownClass(klass
):
243 subprocess
.check_call(['systemd-analyze', 'set-log-level', klass
.orig_log_level
])
246 self
.iface
= 'test_eth42'
247 self
.if_router
= 'router_eth42'
248 self
.workdir_obj
= tempfile
.TemporaryDirectory()
249 self
.workdir
= self
.workdir_obj
.name
250 self
.config
= 'test_eth42.network'
252 # get current journal cursor
253 subprocess
.check_output(['journalctl', '--sync'])
254 out
= subprocess
.check_output(['journalctl', '-b', '--quiet',
255 '--no-pager', '-n0', '--show-cursor'],
256 universal_newlines
=True)
257 self
.assertTrue(out
.startswith('-- cursor:'))
258 self
.journal_cursor
= out
.split()[-1]
261 self
.shutdown_iface()
262 subprocess
.call(['systemctl', 'stop', 'systemd-networkd'])
263 subprocess
.call(['ip', 'link', 'del', 'dummy0'],
264 stderr
=subprocess
.DEVNULL
)
266 def show_journal(self
, unit
):
267 '''Show journal of given unit since start of the test'''
269 print('---- {} ----'.format(unit
))
270 subprocess
.check_output(['journalctl', '--sync'])
272 subprocess
.call(['journalctl', '-b', '--no-pager', '--quiet',
273 '--cursor', self
.journal_cursor
, '-u', unit
])
275 def create_iface(self
, ipv6
=False):
276 '''Create test interface with DHCP server behind it'''
278 raise NotImplementedError('must be implemented by a subclass')
280 def shutdown_iface(self
):
281 '''Remove test interface and stop DHCP server'''
283 raise NotImplementedError('must be implemented by a subclass')
285 def print_server_log(self
):
286 '''Print DHCP server log for debugging failures'''
288 raise NotImplementedError('must be implemented by a subclass')
290 def do_test(self
, coldplug
=True, ipv6
=False, extra_opts
='',
291 online_timeout
=10, dhcp_mode
='yes'):
293 subprocess
.check_call(['systemctl', 'start', 'systemd-resolved'])
294 except subprocess
.CalledProcessError
:
295 self
.show_journal('systemd-resolved.service')
297 self
.write_network(self
.config
, '''\
302 {}'''.format(self
.iface
, dhcp_mode
, extra_opts
))
305 # create interface first, then start networkd
306 self
.create_iface(ipv6
=ipv6
)
307 subprocess
.check_call(['systemctl', 'start', 'systemd-networkd'])
308 elif coldplug
is not None:
309 # start networkd first, then create interface
310 subprocess
.check_call(['systemctl', 'start', 'systemd-networkd'])
311 self
.create_iface(ipv6
=ipv6
)
313 # "None" means test sets up interface by itself
314 subprocess
.check_call(['systemctl', 'start', 'systemd-networkd'])
317 subprocess
.check_call([NETWORKD_WAIT_ONLINE
, '--interface',
318 self
.iface
, '--timeout=%i' % online_timeout
])
321 # check iface state and IP 6 address; FIXME: we need to wait a bit
322 # longer, as the iface is "configured" already with IPv4 *or*
323 # IPv6, but we want to wait for both
325 out
= subprocess
.check_output(['ip', 'a', 'show', 'dev', self
.iface
])
326 if b
'state UP' in out
and b
'inet6 2600' in out
and b
'inet 192.168' in out
:
330 self
.fail('timed out waiting for IPv6 configuration')
332 self
.assertRegex(out
, b
'inet6 2600::.* scope global .*dynamic')
333 self
.assertRegex(out
, b
'inet6 fe80::.* scope link')
335 # should have link-local address on IPv6 only
336 out
= subprocess
.check_output(['ip', '-6', 'a', 'show', 'dev', self
.iface
])
337 self
.assertRegex(out
, br
'inet6 fe80::.* scope link')
338 self
.assertNotIn(b
'scope global', out
)
340 # should have IPv4 address
341 out
= subprocess
.check_output(['ip', '-4', 'a', 'show', 'dev', self
.iface
])
342 self
.assertIn(b
'state UP', out
)
343 self
.assertRegex(out
, br
'inet 192.168.5.\d+/.* scope global dynamic')
345 # check networkctl state
346 out
= subprocess
.check_output(['networkctl'])
347 self
.assertRegex(out
, (r
'{}\s+ether\s+[a-z-]+\s+unmanaged'.format(self
.if_router
)).encode())
348 self
.assertRegex(out
, (r
'{}\s+ether\s+routable\s+configured'.format(self
.iface
)).encode())
350 out
= subprocess
.check_output(['networkctl', 'status', self
.iface
])
351 self
.assertRegex(out
, br
'Type:\s+ether')
352 self
.assertRegex(out
, br
'State:\s+routable.*configured')
353 self
.assertRegex(out
, br
'Address:\s+192.168.5.\d+')
355 self
.assertRegex(out
, br
'2600::')
357 self
.assertNotIn(br
'2600::', out
)
358 self
.assertRegex(out
, br
'fe80::')
359 self
.assertRegex(out
, br
'Gateway:\s+192.168.5.1')
360 self
.assertRegex(out
, br
'DNS:\s+192.168.5.1')
361 except (AssertionError, subprocess
.CalledProcessError
):
362 # show networkd status, journal, and DHCP server log on failure
363 with
open(os
.path
.join(NETWORK_UNITDIR
, self
.config
)) as f
:
364 print('\n---- {} ----\n{}'.format(self
.config
, f
.read()))
365 print('---- interface status ----')
367 subprocess
.call(['ip', 'a', 'show', 'dev', self
.iface
])
368 print('---- networkctl status {} ----'.format(self
.iface
))
370 subprocess
.call(['networkctl', 'status', self
.iface
])
371 self
.show_journal('systemd-networkd.service')
372 self
.print_server_log()
375 for timeout
in range(50):
376 with
open(RESOLV_CONF
) as f
:
378 if 'nameserver 192.168.5.1\n' in contents
:
382 self
.fail('nameserver 192.168.5.1 not found in ' + RESOLV_CONF
)
384 if coldplug
is False:
385 # check post-down.d hook
386 self
.shutdown_iface()
388 def test_coldplug_dhcp_yes_ip4(self
):
389 # we have a 12s timeout on RA, so we need to wait longer
390 self
.do_test(coldplug
=True, ipv6
=False, online_timeout
=15)
392 def test_coldplug_dhcp_yes_ip4_no_ra(self
):
393 # with disabling RA explicitly things should be fast
394 self
.do_test(coldplug
=True, ipv6
=False,
395 extra_opts
='IPv6AcceptRA=False')
397 def test_coldplug_dhcp_ip4_only(self
):
398 # we have a 12s timeout on RA, so we need to wait longer
399 self
.do_test(coldplug
=True, ipv6
=False, dhcp_mode
='ipv4',
402 def test_coldplug_dhcp_ip4_only_no_ra(self
):
403 # with disabling RA explicitly things should be fast
404 self
.do_test(coldplug
=True, ipv6
=False, dhcp_mode
='ipv4',
405 extra_opts
='IPv6AcceptRA=False')
407 def test_coldplug_dhcp_ip6(self
):
408 self
.do_test(coldplug
=True, ipv6
=True)
410 def test_hotplug_dhcp_ip4(self
):
411 # With IPv4 only we have a 12s timeout on RA, so we need to wait longer
412 self
.do_test(coldplug
=False, ipv6
=False, online_timeout
=15)
414 def test_hotplug_dhcp_ip6(self
):
415 self
.do_test(coldplug
=False, ipv6
=True)
417 def test_route_only_dns(self
):
418 self
.write_network('myvpn.netdev', '''\
422 MACAddress=12:34:56:78:9a:bc''')
423 self
.write_network('myvpn.network', '''\
427 Address=192.168.42.100
429 Domains= ~company''')
431 self
.do_test(coldplug
=True, ipv6
=False,
432 extra_opts
='IPv6AcceptRouterAdvertisements=False')
434 with
open(RESOLV_CONF
) as f
:
436 # ~company is not a search domain, only a routing domain
437 self
.assertNotRegex(contents
, 'search.*company')
438 # our global server should appear
439 self
.assertIn('nameserver 192.168.5.1\n', contents
)
440 # should not have domain-restricted server as global server
441 self
.assertNotIn('nameserver 192.168.42.1\n', contents
)
443 def test_route_only_dns_all_domains(self
):
444 self
.write_network('myvpn.netdev', '''[NetDev]
447 MACAddress=12:34:56:78:9a:bc''')
448 self
.write_network('myvpn.network', '''[Match]
451 Address=192.168.42.100
453 Domains= ~company ~.''')
455 self
.do_test(coldplug
=True, ipv6
=False,
456 extra_opts
='IPv6AcceptRouterAdvertisements=False')
458 with
open(RESOLV_CONF
) as f
:
461 # ~company is not a search domain, only a routing domain
462 self
.assertNotRegex(contents
, 'search.*company')
464 # our global server should appear
465 self
.assertIn('nameserver 192.168.5.1\n', contents
)
466 # should have company server as global server due to ~.
467 self
.assertIn('nameserver 192.168.42.1\n', contents
)
470 @unittest.skipUnless(HAVE_DNSMASQ
, 'dnsmasq not installed')
471 class DnsmasqClientTest(ClientTestBase
, unittest
.TestCase
):
472 '''Test networkd client against dnsmasq'''
477 self
.iface_mac
= 'de:ad:be:ef:47:11'
479 def create_iface(self
, ipv6
=False, dnsmasq_opts
=None):
480 '''Create test interface with DHCP server behind it'''
483 subprocess
.check_call(['ip', 'link', 'add', 'name', self
.iface
,
484 'address', self
.iface_mac
,
485 'type', 'veth', 'peer', 'name', self
.if_router
])
487 # give our router an IP
488 subprocess
.check_call(['ip', 'a', 'flush', 'dev', self
.if_router
])
489 subprocess
.check_call(['ip', 'a', 'add', '192.168.5.1/24', 'dev', self
.if_router
])
491 subprocess
.check_call(['ip', 'a', 'add', '2600::1/64', 'dev', self
.if_router
])
492 subprocess
.check_call(['ip', 'link', 'set', self
.if_router
, 'up'])
495 self
.dnsmasq_log
= os
.path
.join(self
.workdir
, 'dnsmasq.log')
496 lease_file
= os
.path
.join(self
.workdir
, 'dnsmasq.leases')
498 extra_opts
= ['--enable-ra', '--dhcp-range=2600::10,2600::20']
502 extra_opts
+= dnsmasq_opts
503 self
.dnsmasq
= subprocess
.Popen(
504 ['dnsmasq', '--keep-in-foreground', '--log-queries',
505 '--log-facility=' + self
.dnsmasq_log
, '--conf-file=/dev/null',
506 '--dhcp-leasefile=' + lease_file
, '--bind-interfaces',
507 '--interface=' + self
.if_router
, '--except-interface=lo',
508 '--dhcp-range=192.168.5.10,192.168.5.200'] + extra_opts
)
510 def shutdown_iface(self
):
511 '''Remove test interface and stop DHCP server'''
514 subprocess
.check_call(['ip', 'link', 'del', 'dev', self
.if_router
])
515 self
.if_router
= None
521 def print_server_log(self
):
522 '''Print DHCP server log for debugging failures'''
524 with
open(self
.dnsmasq_log
) as f
:
525 sys
.stdout
.write('\n\n---- dnsmasq log ----\n{}\n------\n\n'.format(f
.read()))
527 def test_resolved_domain_restricted_dns(self
):
528 '''resolved: domain-restricted DNS servers'''
530 # create interface for generic connections; this will map all DNS names
532 self
.create_iface(dnsmasq_opts
=['--address=/#/192.168.42.1'])
533 self
.write_network('general.network', '''\
538 IPv6AcceptRA=False'''.format(self
.iface
))
540 # create second device/dnsmasq for a .company/.lab VPN interface
541 # static IPs for simplicity
542 self
.add_veth_pair('testvpnclient', 'testvpnrouter')
543 subprocess
.check_call(['ip', 'a', 'flush', 'dev', 'testvpnrouter'])
544 subprocess
.check_call(['ip', 'a', 'add', '10.241.3.1/24', 'dev', 'testvpnrouter'])
545 subprocess
.check_call(['ip', 'link', 'set', 'testvpnrouter', 'up'])
547 vpn_dnsmasq_log
= os
.path
.join(self
.workdir
, 'dnsmasq-vpn.log')
548 vpn_dnsmasq
= subprocess
.Popen(
549 ['dnsmasq', '--keep-in-foreground', '--log-queries',
550 '--log-facility=' + vpn_dnsmasq_log
, '--conf-file=/dev/null',
551 '--dhcp-leasefile=/dev/null', '--bind-interfaces',
552 '--interface=testvpnrouter', '--except-interface=lo',
553 '--address=/math.lab/10.241.3.3', '--address=/cantina.company/10.241.4.4'])
554 self
.addCleanup(vpn_dnsmasq
.wait
)
555 self
.addCleanup(vpn_dnsmasq
.kill
)
557 self
.write_network('vpn.network', '''\
562 Address=10.241.3.2/24
564 Domains= ~company ~lab''')
566 subprocess
.check_call(['systemctl', 'start', 'systemd-networkd'])
567 subprocess
.check_call([NETWORKD_WAIT_ONLINE
, '--interface', self
.iface
,
568 '--interface=testvpnclient', '--timeout=20'])
570 # ensure we start fresh with every test
571 subprocess
.check_call(['systemctl', 'restart', 'systemd-resolved'])
573 # test vpnclient specific domains; these should *not* be answered by
575 out
= subprocess
.check_output(['systemd-resolve', 'math.lab'])
576 self
.assertIn(b
'math.lab: 10.241.3.3', out
)
577 out
= subprocess
.check_output(['systemd-resolve', 'kettle.cantina.company'])
578 self
.assertIn(b
'kettle.cantina.company: 10.241.4.4', out
)
580 # test general domains
581 out
= subprocess
.check_output(['systemd-resolve', 'megasearch.net'])
582 self
.assertIn(b
'megasearch.net: 192.168.42.1', out
)
584 with
open(self
.dnsmasq_log
) as f
:
585 general_log
= f
.read()
586 with
open(vpn_dnsmasq_log
) as f
:
589 # VPN domains should only be sent to VPN DNS
590 self
.assertRegex(vpn_log
, 'query.*math.lab')
591 self
.assertRegex(vpn_log
, 'query.*cantina.company')
592 self
.assertNotIn('.lab', general_log
)
593 self
.assertNotIn('.company', general_log
)
595 # general domains should not be sent to the VPN DNS
596 self
.assertRegex(general_log
, 'query.*megasearch.net')
597 self
.assertNotIn('megasearch.net', vpn_log
)
599 def test_resolved_etc_hosts(self
):
600 '''resolved queries to /etc/hosts'''
602 # FIXME: -t MX query fails with enabled DNSSEC (even when using
603 # the known negative trust anchor .internal instead of .example)
604 conf
= '/run/systemd/resolved.conf.d/test-disable-dnssec.conf'
605 os
.makedirs(os
.path
.dirname(conf
), exist_ok
=True)
606 with
open(conf
, 'w') as f
:
607 f
.write('[Resolve]\nDNSSEC=no')
608 self
.addCleanup(os
.remove
, conf
)
610 # create /etc/hosts bind mount which resolves my.example for IPv4
611 hosts
= os
.path
.join(self
.workdir
, 'hosts')
612 with
open(hosts
, 'w') as f
:
613 f
.write('172.16.99.99 my.example\n')
614 subprocess
.check_call(['mount', '--bind', hosts
, '/etc/hosts'])
615 self
.addCleanup(subprocess
.call
, ['umount', '/etc/hosts'])
616 subprocess
.check_call(['systemctl', 'stop', 'systemd-resolved.service'])
618 # note: different IPv4 address here, so that it's easy to tell apart
619 # what resolved the query
620 self
.create_iface(dnsmasq_opts
=['--host-record=my.example,172.16.99.1,2600::99:99',
621 '--host-record=other.example,172.16.0.42,2600::42',
622 '--mx-host=example,mail.example'],
624 self
.do_test(coldplug
=None, ipv6
=True)
627 # family specific queries
628 out
= subprocess
.check_output(['systemd-resolve', '-4', 'my.example'])
629 self
.assertIn(b
'my.example: 172.16.99.99', out
)
630 # we don't expect an IPv6 answer; if /etc/hosts has any IP address,
631 # it's considered a sufficient source
632 self
.assertNotEqual(subprocess
.call(['systemd-resolve', '-6', 'my.example']), 0)
633 # "any family" query; IPv4 should come from /etc/hosts
634 out
= subprocess
.check_output(['systemd-resolve', 'my.example'])
635 self
.assertIn(b
'my.example: 172.16.99.99', out
)
636 # IP → name lookup; again, takes the /etc/hosts one
637 out
= subprocess
.check_output(['systemd-resolve', '172.16.99.99'])
638 self
.assertIn(b
'172.16.99.99: my.example', out
)
640 # non-address RRs should fall back to DNS
641 out
= subprocess
.check_output(['systemd-resolve', '--type=MX', 'example'])
642 self
.assertIn(b
'example IN MX 1 mail.example', out
)
644 # other domains query DNS
645 out
= subprocess
.check_output(['systemd-resolve', 'other.example'])
646 self
.assertIn(b
'172.16.0.42', out
)
647 out
= subprocess
.check_output(['systemd-resolve', '172.16.0.42'])
648 self
.assertIn(b
'172.16.0.42: other.example', out
)
649 except (AssertionError, subprocess
.CalledProcessError
):
650 self
.show_journal('systemd-resolved.service')
651 self
.print_server_log()
654 def test_transient_hostname(self
):
655 '''networkd sets transient hostname from DHCP'''
657 orig_hostname
= socket
.gethostname()
658 self
.addCleanup(socket
.sethostname
, orig_hostname
)
659 # temporarily move /etc/hostname away; restart hostnamed to pick it up
660 if os
.path
.exists('/etc/hostname'):
661 subprocess
.check_call(['mount', '--bind', '/dev/null', '/etc/hostname'])
662 self
.addCleanup(subprocess
.call
, ['umount', '/etc/hostname'])
663 subprocess
.check_call(['systemctl', 'stop', 'systemd-hostnamed.service'])
665 self
.create_iface(dnsmasq_opts
=['--dhcp-host={},192.168.5.210,testgreen'.format(self
.iface_mac
)])
666 self
.do_test(coldplug
=None, extra_opts
='IPv6AcceptRA=False', dhcp_mode
='ipv4')
669 # should have received the fixed IP above
670 out
= subprocess
.check_output(['ip', '-4', 'a', 'show', 'dev', self
.iface
])
671 self
.assertRegex(out
, b
'inet 192.168.5.210/24 .* scope global dynamic')
672 # should have set transient hostname in hostnamed; this is
673 # sometimes a bit lagging (issue #4753), so retry a few times
674 for retry
in range(1, 6):
675 out
= subprocess
.check_output(['hostnamectl'])
676 if b
'testgreen' in out
:
679 sys
.stdout
.write('[retry %i] ' % retry
)
682 self
.fail('Transient hostname not found in hostnamectl:\n{}'.format(out
.decode()))
683 # and also applied to the system
684 self
.assertEqual(socket
.gethostname(), 'testgreen')
685 except AssertionError:
686 self
.show_journal('systemd-networkd.service')
687 self
.show_journal('systemd-hostnamed.service')
688 self
.print_server_log()
691 def test_transient_hostname_with_static(self
):
692 '''transient hostname is not applied if static hostname exists'''
694 orig_hostname
= socket
.gethostname()
695 self
.addCleanup(socket
.sethostname
, orig_hostname
)
696 if not os
.path
.exists('/etc/hostname'):
697 self
.writeConfig('/etc/hostname', orig_hostname
)
698 subprocess
.check_call(['systemctl', 'stop', 'systemd-hostnamed.service'])
700 self
.create_iface(dnsmasq_opts
=['--dhcp-host={},192.168.5.210,testgreen'.format(self
.iface_mac
)])
701 self
.do_test(coldplug
=None, extra_opts
='IPv6AcceptRA=False', dhcp_mode
='ipv4')
704 # should have received the fixed IP above
705 out
= subprocess
.check_output(['ip', '-4', 'a', 'show', 'dev', self
.iface
])
706 self
.assertRegex(out
, b
'inet 192.168.5.210/24 .* scope global dynamic')
707 # static hostname wins over transient one, thus *not* applied
708 self
.assertEqual(socket
.gethostname(), orig_hostname
)
709 except AssertionError:
710 self
.show_journal('systemd-networkd.service')
711 self
.show_journal('systemd-hostnamed.service')
712 self
.print_server_log()
716 class NetworkdClientTest(ClientTestBase
, unittest
.TestCase
):
717 '''Test networkd client against networkd server'''
723 def create_iface(self
, ipv6
=False, dhcpserver_opts
=None):
724 '''Create test interface with DHCP server behind it'''
726 # run "router-side" networkd in own mount namespace to shield it from
727 # "client-side" configuration and networkd
728 (fd
, script
) = tempfile
.mkstemp(prefix
='networkd-router.sh')
729 self
.addCleanup(os
.remove
, script
)
730 with os
.fdopen(fd
, 'w+') as f
:
734 mkdir -p /run/systemd/network
735 mkdir -p /run/systemd/netif
736 mount -t tmpfs none /run/systemd/network
737 mount -t tmpfs none /run/systemd/netif
738 [ ! -e /run/dbus ] || mount -t tmpfs none /run/dbus
739 # create router/client veth pair
740 cat << EOF > /run/systemd/network/test.netdev
749 cat << EOF > /run/systemd/network/test.network
754 Address=192.168.5.1/24
765 # run networkd as in systemd-networkd.service
766 exec $(systemctl cat systemd-networkd.service | sed -n '/^ExecStart=/ { s/^.*=//; s/^[@+-]//; s/^!*//; p}')
767 ''' % {'ifr': self
.if_router
, 'ifc': self
.iface
, 'addr6': ipv6
and 'Address=2600::1/64' or '',
768 'dhopts': dhcpserver_opts
or ''})
772 subprocess
.check_call(['systemd-run', '--unit=networkd-test-router.service',
773 '-p', 'InaccessibleDirectories=-/etc/systemd/network',
774 '-p', 'InaccessibleDirectories=-/run/systemd/network',
775 '-p', 'InaccessibleDirectories=-/run/systemd/netif',
776 '--service-type=notify', script
])
778 # wait until devices got created
780 out
= subprocess
.check_output(['ip', 'a', 'show', 'dev', self
.if_router
])
781 if b
'state UP' in out
and b
'scope global' in out
:
785 def shutdown_iface(self
):
786 '''Remove test interface and stop DHCP server'''
789 subprocess
.check_call(['systemctl', 'stop', 'networkd-test-router.service'])
790 # ensure failed transient unit does not stay around
791 subprocess
.call(['systemctl', 'reset-failed', 'networkd-test-router.service'])
792 subprocess
.call(['ip', 'link', 'del', 'dev', self
.if_router
])
793 self
.if_router
= None
795 def print_server_log(self
):
796 '''Print DHCP server log for debugging failures'''
798 self
.show_journal('networkd-test-router.service')
800 @unittest.skip('networkd does not have DHCPv6 server support')
801 def test_hotplug_dhcp_ip6(self
):
804 @unittest.skip('networkd does not have DHCPv6 server support')
805 def test_coldplug_dhcp_ip6(self
):
808 def test_search_domains(self
):
810 # we don't use this interface for this test
811 self
.if_router
= None
813 self
.write_network('test.netdev', '''\
817 MACAddress=12:34:56:78:9a:bc''')
818 self
.write_network('test.network', '''\
822 Address=192.168.42.100
824 Domains= one two three four five six seven eight nine ten''')
826 subprocess
.check_call(['systemctl', 'start', 'systemd-networkd'])
828 for timeout
in range(50):
829 with
open(RESOLV_CONF
) as f
:
831 if ' one' in contents
:
834 self
.assertRegex(contents
, 'search .*one two three four')
835 self
.assertNotIn('seven\n', contents
)
836 self
.assertIn('# Too many search domains configured, remaining ones ignored.\n', contents
)
838 def test_search_domains_too_long(self
):
840 # we don't use this interface for this test
841 self
.if_router
= None
843 name_prefix
= 'a' * 60
845 self
.write_network('test.netdev', '''\
849 MACAddress=12:34:56:78:9a:bc''')
850 self
.write_network('test.network', '''\
854 Address=192.168.42.100
856 Domains={p}0 {p}1 {p}2 {p}3 {p}4'''.format(p
=name_prefix
))
858 subprocess
.check_call(['systemctl', 'start', 'systemd-networkd'])
860 for timeout
in range(50):
861 with
open(RESOLV_CONF
) as f
:
863 if ' one' in contents
:
866 self
.assertRegex(contents
, 'search .*{p}0 {p}1 {p}2'.format(p
=name_prefix
))
867 self
.assertIn('# Total length of all search domains is too long, remaining ones ignored.', contents
)
869 def test_dropin(self
):
870 # we don't use this interface for this test
871 self
.if_router
= None
873 self
.write_network('test.netdev', '''\
877 MACAddress=12:34:56:78:9a:bc''')
878 self
.write_network('test.network', '''\
882 Address=192.168.42.100
884 self
.write_network_dropin('test.network', 'dns', '''\
888 subprocess
.check_call(['systemctl', 'start', 'systemd-networkd'])
890 for timeout
in range(50):
891 with
open(RESOLV_CONF
) as f
:
893 if ' 127.0.0.1' in contents
:
896 self
.assertIn('nameserver 192.168.42.1\n', contents
)
897 self
.assertIn('nameserver 127.0.0.1\n', contents
)
899 def test_dhcp_timezone(self
):
900 '''networkd sets time zone from DHCP'''
903 out
= subprocess
.check_output(['busctl', 'get-property', 'org.freedesktop.timedate1',
904 '/org/freedesktop/timedate1', 'org.freedesktop.timedate1', 'Timezone'])
905 assert out
.startswith(b
's "')
907 assert out
.endswith(b
'"')
908 return out
[3:-1].decode()
910 orig_timezone
= get_tz()
911 self
.addCleanup(subprocess
.call
, ['timedatectl', 'set-timezone', orig_timezone
])
913 self
.create_iface(dhcpserver_opts
='EmitTimezone=yes\nTimezone=Pacific/Honolulu')
914 self
.do_test(coldplug
=None, extra_opts
='IPv6AcceptRA=false\n[DHCP]\nUseTimezone=true', dhcp_mode
='ipv4')
916 # should have applied the received timezone
918 self
.assertEqual(get_tz(), 'Pacific/Honolulu')
919 except AssertionError:
920 self
.show_journal('systemd-networkd.service')
921 self
.show_journal('systemd-hostnamed.service')
925 class MatchClientTest(unittest
.TestCase
, NetworkdTestingUtilities
):
926 """Test [Match] sections in .network files.
928 Be aware that matching the test host's interfaces will wipe their
929 configuration, so as a precaution, all network files should have a
930 restrictive [Match] section to only ever interfere with the
931 temporary veth interfaces created here.
936 subprocess
.call(['systemctl', 'stop', 'systemd-networkd'])
938 def test_basic_matching(self
):
939 """Verify the Name= line works throughout this class."""
940 self
.add_veth_pair('test_if1', 'fake_if2')
941 self
.write_network('test.network', "[Match]\nName=test_*\n[Network]")
942 subprocess
.check_call(['systemctl', 'start', 'systemd-networkd'])
943 self
.assert_link_states(test_if1
='managed', fake_if2
='unmanaged')
945 def test_inverted_matching(self
):
946 """Verify that a '!'-prefixed value inverts the match."""
947 # Use a MAC address as the interfaces' common matching attribute
948 # to avoid depending on udev, to support testing in containers.
949 mac
= '00:01:02:03:98:99'
950 self
.add_veth_pair('test_veth', 'test_peer',
951 ['addr', mac
], ['addr', mac
])
952 self
.write_network('no-veth.network', """\
955 Name=!nonexistent *peer*
956 [Network]""".format(mac
))
957 subprocess
.check_call(['systemctl', 'start', 'systemd-networkd'])
958 self
.assert_link_states(test_veth
='managed', test_peer
='unmanaged')
961 class UnmanagedClientTest(unittest
.TestCase
, NetworkdTestingUtilities
):
962 """Test if networkd manages the correct interfaces."""
965 """Write .network files to match the named veth devices."""
966 # Define the veth+peer pairs to be created.
967 # Their pairing doesn't actually matter, only their names do.
973 # Define the contents of .network files to be read in order.
975 "[Match]\nName=m1def\n",
976 "[Match]\nName=m1unm\n[Link]\nUnmanaged=yes\n",
977 "[Match]\nName=m1*\n[Link]\nUnmanaged=no\n",
980 # Write out the .network files to be cleaned up automatically.
981 for i
, config
in enumerate(self
.configs
):
982 self
.write_network("%02d-test.network" % i
, config
)
986 subprocess
.call(['systemctl', 'stop', 'systemd-networkd'])
988 def create_iface(self
):
989 """Create temporary veth pairs for interface matching."""
990 for veth
, peer
in self
.veths
.items():
991 self
.add_veth_pair(veth
, peer
)
993 def test_unmanaged_setting(self
):
994 """Verify link states with Unmanaged= settings, hot-plug."""
995 subprocess
.check_call(['systemctl', 'start', 'systemd-networkd'])
997 self
.assert_link_states(m1def
='managed',
1002 def test_unmanaged_setting_coldplug(self
):
1003 """Verify link states with Unmanaged= settings, cold-plug."""
1005 subprocess
.check_call(['systemctl', 'start', 'systemd-networkd'])
1006 self
.assert_link_states(m1def
='managed',
1011 def test_catchall_config(self
):
1012 """Verify link states with a catch-all config, hot-plug."""
1013 # Don't actually catch ALL interfaces. It messes up the host.
1014 self
.write_network('all.network', "[Match]\nName=m[01]???\n")
1015 subprocess
.check_call(['systemctl', 'start', 'systemd-networkd'])
1017 self
.assert_link_states(m1def
='managed',
1022 def test_catchall_config_coldplug(self
):
1023 """Verify link states with a catch-all config, cold-plug."""
1024 # Don't actually catch ALL interfaces. It messes up the host.
1025 self
.write_network('all.network', "[Match]\nName=m[01]???\n")
1027 subprocess
.check_call(['systemctl', 'start', 'systemd-networkd'])
1028 self
.assert_link_states(m1def
='managed',
1034 if __name__
== '__main__':
1035 unittest
.main(testRunner
=unittest
.TextTestRunner(stream
=sys
.stdout
,