]> git.ipfire.org Git - thirdparty/systemd.git/blob - test/networkd-test.py
a51b3ebde9a403d446ba7461414b9b5fded44e81
[thirdparty/systemd.git] / test / networkd-test.py
1 #!/usr/bin/env python3
2 # SPDX-License-Identifier: LGPL-2.1+
3 #
4 # networkd integration test
5 # This uses temporary configuration in /run and temporary veth devices, and
6 # does not write anything on disk or change any system configuration;
7 # but it assumes (and checks at the beginning) that networkd is not currently
8 # running.
9 #
10 # This can be run on a normal installation, in QEMU, nspawn (with
11 # --private-network), LXD (with "--config raw.lxc=lxc.aa_profile=unconfined"),
12 # or LXC system containers. You need at least the "ip" tool from the iproute
13 # package; it is recommended to install dnsmasq too to get full test coverage.
14 #
15 # ATTENTION: This uses the *installed* networkd, not the one from the built
16 # source tree.
17 #
18 # (C) 2015 Canonical Ltd.
19 # Author: Martin Pitt <martin.pitt@ubuntu.com>
20
21 import errno
22 import os
23 import shutil
24 import socket
25 import subprocess
26 import sys
27 import tempfile
28 import time
29 import unittest
30
# dnsmasq is optional: the tests that need it are skipped when it is missing.
HAVE_DNSMASQ = shutil.which('dnsmasq') is not None

# Directory into which the tests write their temporary networkd unit files.
NETWORK_UNITDIR = '/run/systemd/network'

# The wait-online helper is searched only in systemd's private directories
# (not in $PATH); this is None when it cannot be found there.
NETWORKD_WAIT_ONLINE = shutil.which(
    'systemd-networkd-wait-online',
    path='/usr/lib/systemd:/lib/systemd')

# resolv.conf file that the tests inspect for nameserver/search entries.
RESOLV_CONF = '/run/systemd/resolve/resolv.conf'
39
40
def setUpModule():
    """Check the prerequisites and prepare the environment for all tests.

    Raises OSError(ENOENT) when systemd-networkd-wait-online was not found,
    and unittest.SkipTest when systemd-networkd.service is already active.
    """
    if NETWORKD_WAIT_ONLINE is None:
        raise OSError(errno.ENOENT, 'systemd-networkd-wait-online not found')

    # Do not run any tests if the system is using networkd already.
    is_active_cmd = ['systemctl', 'is-active', '--quiet',
                     'systemd-networkd.service']
    networkd_active = subprocess.call(is_active_cmd) == 0
    if networkd_active:
        raise unittest.SkipTest('networkd is already active')

    # Avoid "Failed to open /dev/tty" errors in containers.
    os.environ['SYSTEMD_LOG_TARGET'] = 'journal'

    # Tests dump unit files into this directory, so it has to exist up front.
    os.makedirs(NETWORK_UNITDIR, exist_ok=True)
56
57
class NetworkdTestingUtilities:
    """Mixin with helper methods shared by the networkd test cases.

    The helpers call self.addCleanup() and self.fail(), so this class must
    be inherited together with unittest.TestCase.
    """

    def add_veth_pair(self, veth, peer, veth_options=(), peer_options=()):
        """Create a veth interface pair and queue its removal.

        veth_options and peer_options are extra "ip link" arguments that
        are inserted after the respective interface name.
        """
        command = ['ip', 'link', 'add', 'name', veth]
        command += list(veth_options)
        command += ['type', 'veth', 'peer', 'name', peer]
        command += list(peer_options)
        subprocess.check_call(command)
        self.addCleanup(subprocess.call, ['ip', 'link', 'del', 'dev', peer])

    def write_network(self, unit_name, contents):
        """Write a unit file into NETWORK_UNITDIR and queue its removal."""
        unit_path = os.path.join(NETWORK_UNITDIR, unit_name)

        with open(unit_path, 'w') as unit_file:
            unit_file.write(contents)
        self.addCleanup(os.remove, unit_path)

    def write_network_dropin(self, unit_name, dropin_name, contents):
        """Write "<unit_name>.d/<dropin_name>.conf" and queue its removal."""
        dropin_dir = os.path.join(NETWORK_UNITDIR, "{}.d".format(unit_name))
        dropin_path = os.path.join(dropin_dir, "{}.conf".format(dropin_name))

        os.makedirs(dropin_dir, exist_ok=True)
        # Cleanups run in reverse order of registration: the file queued
        # below is deleted first, then the (by then empty) directory.
        self.addCleanup(os.rmdir, dropin_dir)
        with open(dropin_path, 'w') as dropin_file:
            dropin_file.write(contents)
        self.addCleanup(os.remove, dropin_path)

    def read_attr(self, link, attribute):
        """Return the first line of a sysfs attribute of a managed link."""
        # The keyword passed on must be the *value* of `link` (for example
        # eth0='managed'), not the literal name "link", hence the dict
        # expansion.
        self.assert_link_states(**{link: 'managed'})
        attr_path = os.path.join('/sys/class/net', link, attribute)
        with open(attr_path) as attr_file:
            return attr_file.readline().strip()

    def assert_link_states(self, **kwargs):
        """Match networkctl link states to the given ones.

        Every keyword argument names a network interface and gives the
        value expected in the last ("SETUP") column of the networkctl
        output. The interfaces get five seconds to come online before the
        check is made. Each requested interface must show up in the
        output; interfaces that were not requested are ignored.

        The special state "managed" matches any value other than
        "unmanaged".
        """
        if not kwargs:
            return
        pending = set(kwargs)

        # Wait for the requested interfaces, but don't fail for them.
        wait_cmd = [NETWORKD_WAIT_ONLINE, '--timeout=5']
        wait_cmd.extend('--interface={}'.format(name) for name in kwargs)
        subprocess.call(wait_cmd)

        # Validate each requested link found in the networkctl output.
        listing = subprocess.check_output(['networkctl', '--no-legend']).rstrip()
        for row in listing.decode('utf-8').split('\n'):
            columns = row.split()
            if len(columns) < 5 or columns[1] not in kwargs:
                continue

            iface = columns[1]
            expected = kwargs[iface]
            actual = columns[-1]
            if expected == 'managed':
                acceptable = actual != 'unmanaged'
            else:
                acceptable = actual == expected
            if not acceptable:
                self.fail("Link {} expects state {}, found {}".format(iface, expected, actual))
            pending.remove(iface)

        # Ensure that all requested interfaces have been covered.
        if pending:
            self.fail("Missing links in status output: {}".format(pending))
138
139
class BridgeTest(NetworkdTestingUtilities, unittest.TestCase):
    """Test networkd with a bridge device that has two dummy ports."""

    def setUp(self):
        """Write the port/bridge units and start networkd."""
        units = (
            ('port1.netdev', '''\
[NetDev]
Name=port1
Kind=dummy
MACAddress=12:34:56:78:9a:bc'''),
            ('port2.netdev', '''\
[NetDev]
Name=port2
Kind=dummy
MACAddress=12:34:56:78:9a:bd'''),
            ('mybridge.netdev', '''\
[NetDev]
Name=mybridge
Kind=bridge'''),
            ('port1.network', '''\
[Match]
Name=port1
[Network]
Bridge=mybridge'''),
            ('port2.network', '''\
[Match]
Name=port2
[Network]
Bridge=mybridge'''),
            ('mybridge.network', '''\
[Match]
Name=mybridge
[Network]
DNS=192.168.250.1
Address=192.168.250.33/24
Gateway=192.168.250.1'''),
        )
        for unit_name, contents in units:
            self.write_network(unit_name, contents)
        subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])

    def tearDown(self):
        """Stop networkd and delete the devices it created."""
        subprocess.check_call(['systemctl', 'stop', 'systemd-networkd'])
        for link in ('mybridge', 'port1', 'port2'):
            subprocess.check_call(['ip', 'link', 'del', link])

    def _restart_with_dropin(self, unit_name, dropin_name, contents):
        """Write a drop-in for unit_name, then restart networkd."""
        self.write_network_dropin(unit_name, dropin_name, contents)
        subprocess.check_call(['systemctl', 'restart', 'systemd-networkd'])

    def test_bridge_init(self):
        """All three links should be managed by networkd"""
        expected_states = {
            'port1': 'managed',
            'port2': 'managed',
            'mybridge': 'managed',
        }
        self.assert_link_states(**expected_states)

    def test_bridge_port_priority(self):
        """A drop-in should be able to change the bridge port priority"""
        before = self.read_attr('port1', 'brport/priority')
        self.assertEqual(before, '32')
        self._restart_with_dropin('port1.network', 'priority', '''\
[Bridge]
Priority=28
''')
        after = self.read_attr('port1', 'brport/priority')
        self.assertEqual(after, '28')

    def test_bridge_port_priority_set_zero(self):
        """It should be possible to set the bridge port priority to 0"""
        before = self.read_attr('port2', 'brport/priority')
        self.assertEqual(before, '32')
        self._restart_with_dropin('port2.network', 'priority', '''\
[Bridge]
Priority=0
''')
        after = self.read_attr('port2', 'brport/priority')
        self.assertEqual(after, '0')

    def test_bridge_port_property(self):
        """Test the "[Bridge]" section keys"""
        self.assertEqual(self.read_attr('port2', 'brport/priority'), '32')
        self._restart_with_dropin('port2.network', 'property', '''\
[Bridge]
UnicastFlood=true
HairPin=true
UseBPDU=true
FastLeave=true
AllowPortToBeRoot=true
Cost=555
Priority=23
''')

        # sysfs attribute -> value expected once the drop-in is applied
        expected_attrs = (
            ('brport/priority', '23'),
            ('brport/hairpin_mode', '1'),
            ('brport/path_cost', '555'),
            ('brport/multicast_fast_leave', '1'),
            ('brport/unicast_flood', '1'),
            ('brport/bpdu_guard', '1'),
            ('brport/root_block', '1'),
        )
        for attribute, value in expected_attrs:
            self.assertEqual(self.read_attr('port2', attribute), value)
230
class ClientTestBase(NetworkdTestingUtilities):
    """Provide common methods for testing networkd against servers.

    Concrete test classes combine this class with unittest.TestCase and
    implement create_iface(), shutdown_iface() and print_server_log().
    """

    @classmethod
    def setUpClass(klass):
        """Remember the current log level and switch to debug logging."""
        klass.orig_log_level = subprocess.check_output(
            ['systemctl', 'show', '--value', '--property', 'LogLevel'],
            universal_newlines=True).strip()
        subprocess.check_call(['systemd-analyze', 'set-log-level', 'debug'])

    @classmethod
    def tearDownClass(klass):
        """Restore the log level saved by setUpClass()."""
        subprocess.check_call(['systemd-analyze', 'set-log-level', klass.orig_log_level])

    def setUp(self):
        """Set the interface names, a work directory and the journal cursor."""
        self.iface = 'test_eth42'
        self.if_router = 'router_eth42'
        self.workdir_obj = tempfile.TemporaryDirectory()
        # Remove the work directory deterministically instead of relying on
        # garbage collection. This cleanup is registered first, so it runs
        # after tearDown() and after every cleanup a test queues later.
        self.addCleanup(self.workdir_obj.cleanup)
        self.workdir = self.workdir_obj.name
        self.config = 'test_eth42.network'

        # get current journal cursor
        subprocess.check_output(['journalctl', '--sync'])
        out = subprocess.check_output(['journalctl', '-b', '--quiet',
                                       '--no-pager', '-n0', '--show-cursor'],
                                      universal_newlines=True)
        self.assertTrue(out.startswith('-- cursor:'))
        self.journal_cursor = out.split()[-1]

    def tearDown(self):
        """Shut down the test interface, stop networkd and remove dummy0."""
        try:
            self.shutdown_iface()
        finally:
            # Stop networkd and drop the dummy device even when the
            # subclass' shutdown_iface() raised.
            subprocess.call(['systemctl', 'stop', 'systemd-networkd'])
            subprocess.call(['ip', 'link', 'del', 'dummy0'],
                            stderr=subprocess.DEVNULL)

    def show_journal(self, unit):
        '''Show journal of given unit since start of the test'''

        print('---- {} ----'.format(unit))
        subprocess.check_output(['journalctl', '--sync'])
        # flush our own output first so that it stays ahead of journalctl's
        sys.stdout.flush()
        subprocess.call(['journalctl', '-b', '--no-pager', '--quiet',
                         '--cursor', self.journal_cursor, '-u', unit])

    def create_iface(self, ipv6=False):
        '''Create test interface with DHCP server behind it'''

        raise NotImplementedError('must be implemented by a subclass')

    def shutdown_iface(self):
        '''Remove test interface and stop DHCP server'''

        raise NotImplementedError('must be implemented by a subclass')

    def print_server_log(self):
        '''Print DHCP server log for debugging failures'''

        raise NotImplementedError('must be implemented by a subclass')

    def do_test(self, coldplug=True, ipv6=False, extra_opts='',
                online_timeout=10, dhcp_mode='yes'):
        '''Configure self.iface via DHCP and verify the resulting state

        coldplug: True creates the interface before starting networkd,
            False starts networkd first and also shuts the interface down
            at the end, None means the test has set up the interface itself.
        ipv6: additionally expect a dynamic global 2600:: address.
        extra_opts: extra lines appended to the [Network] section.
        online_timeout: seconds passed to systemd-networkd-wait-online.
        dhcp_mode: value of the DHCP= setting.

        On failure the unit file, interface status, networkd journal and
        DHCP server log are printed before the error is re-raised.
        '''
        try:
            subprocess.check_call(['systemctl', 'start', 'systemd-resolved'])
        except subprocess.CalledProcessError:
            self.show_journal('systemd-resolved.service')
            raise
        self.write_network(self.config, '''\
[Match]
Name={}
[Network]
DHCP={}
{}'''.format(self.iface, dhcp_mode, extra_opts))

        if coldplug:
            # create interface first, then start networkd
            self.create_iface(ipv6=ipv6)
            subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])
        elif coldplug is not None:
            # start networkd first, then create interface
            subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])
            self.create_iface(ipv6=ipv6)
        else:
            # "None" means test sets up interface by itself
            subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])

        try:
            subprocess.check_call([NETWORKD_WAIT_ONLINE, '--interface',
                                   self.iface, '--timeout=%i' % online_timeout])

            if ipv6:
                # check iface state and IP 6 address; FIXME: we need to wait a bit
                # longer, as the iface is "configured" already with IPv4 *or*
                # IPv6, but we want to wait for both
                for _ in range(10):
                    out = subprocess.check_output(['ip', 'a', 'show', 'dev', self.iface])
                    if b'state UP' in out and b'inet6 2600' in out and b'inet 192.168' in out:
                        break
                    time.sleep(1)
                else:
                    self.fail('timed out waiting for IPv6 configuration')

                self.assertRegex(out, b'inet6 2600::.* scope global .*dynamic')
                self.assertRegex(out, b'inet6 fe80::.* scope link')
            else:
                # should have link-local address on IPv6 only
                out = subprocess.check_output(['ip', '-6', 'a', 'show', 'dev', self.iface])
                self.assertRegex(out, br'inet6 fe80::.* scope link')
                self.assertNotIn(b'scope global', out)

            # should have IPv4 address
            out = subprocess.check_output(['ip', '-4', 'a', 'show', 'dev', self.iface])
            self.assertIn(b'state UP', out)
            self.assertRegex(out, br'inet 192.168.5.\d+/.* scope global dynamic')

            # check networkctl state
            out = subprocess.check_output(['networkctl'])
            self.assertRegex(out, (r'{}\s+ether\s+[a-z-]+\s+unmanaged'.format(self.if_router)).encode())
            self.assertRegex(out, (r'{}\s+ether\s+routable\s+configured'.format(self.iface)).encode())

            out = subprocess.check_output(['networkctl', 'status', self.iface])
            self.assertRegex(out, br'Type:\s+ether')
            self.assertRegex(out, br'State:\s+routable.*configured')
            self.assertRegex(out, br'Address:\s+192.168.5.\d+')
            if ipv6:
                self.assertRegex(out, br'2600::')
            else:
                self.assertNotIn(br'2600::', out)
            self.assertRegex(out, br'fe80::')
            self.assertRegex(out, br'Gateway:\s+192.168.5.1')
            self.assertRegex(out, br'DNS:\s+192.168.5.1')
        except (AssertionError, subprocess.CalledProcessError):
            # show networkd status, journal, and DHCP server log on failure
            with open(os.path.join(NETWORK_UNITDIR, self.config)) as f:
                print('\n---- {} ----\n{}'.format(self.config, f.read()))
            print('---- interface status ----')
            sys.stdout.flush()
            subprocess.call(['ip', 'a', 'show', 'dev', self.iface])
            print('---- networkctl status {} ----'.format(self.iface))
            sys.stdout.flush()
            subprocess.call(['networkctl', 'status', self.iface])
            self.show_journal('systemd-networkd.service')
            self.print_server_log()
            raise

        # give the DHCP-provided name server up to ~5s to show up
        for _ in range(50):
            with open(RESOLV_CONF) as f:
                contents = f.read()
            if 'nameserver 192.168.5.1\n' in contents:
                break
            time.sleep(0.1)
        else:
            self.fail('nameserver 192.168.5.1 not found in ' + RESOLV_CONF)

        if coldplug is False:
            # check post-down.d hook
            self.shutdown_iface()

    def test_coldplug_dhcp_yes_ip4(self):
        # we have a 12s timeout on RA, so we need to wait longer
        self.do_test(coldplug=True, ipv6=False, online_timeout=15)

    def test_coldplug_dhcp_yes_ip4_no_ra(self):
        # with disabling RA explicitly things should be fast
        self.do_test(coldplug=True, ipv6=False,
                     extra_opts='IPv6AcceptRA=False')

    def test_coldplug_dhcp_ip4_only(self):
        # we have a 12s timeout on RA, so we need to wait longer
        self.do_test(coldplug=True, ipv6=False, dhcp_mode='ipv4',
                     online_timeout=15)

    def test_coldplug_dhcp_ip4_only_no_ra(self):
        # with disabling RA explicitly things should be fast
        self.do_test(coldplug=True, ipv6=False, dhcp_mode='ipv4',
                     extra_opts='IPv6AcceptRA=False')

    def test_coldplug_dhcp_ip6(self):
        self.do_test(coldplug=True, ipv6=True)

    def test_hotplug_dhcp_ip4(self):
        # With IPv4 only we have a 12s timeout on RA, so we need to wait longer
        self.do_test(coldplug=False, ipv6=False, online_timeout=15)

    def test_hotplug_dhcp_ip6(self):
        self.do_test(coldplug=False, ipv6=True)

    def test_route_only_dns(self):
        '''A routing-only domain must not add a global DNS server'''
        self.write_network('myvpn.netdev', '''\
[NetDev]
Name=dummy0
Kind=dummy
MACAddress=12:34:56:78:9a:bc''')
        self.write_network('myvpn.network', '''\
[Match]
Name=dummy0
[Network]
Address=192.168.42.100
DNS=192.168.42.1
Domains= ~company''')

        self.do_test(coldplug=True, ipv6=False,
                     extra_opts='IPv6AcceptRouterAdvertisements=False')

        with open(RESOLV_CONF) as f:
            contents = f.read()
            # ~company is not a search domain, only a routing domain
            self.assertNotRegex(contents, 'search.*company')
            # our global server should appear
            self.assertIn('nameserver 192.168.5.1\n', contents)
            # should not have domain-restricted server as global server
            self.assertNotIn('nameserver 192.168.42.1\n', contents)

    def test_route_only_dns_all_domains(self):
        '''The "~." routing domain makes the server a global one'''
        self.write_network('myvpn.netdev', '''[NetDev]
Name=dummy0
Kind=dummy
MACAddress=12:34:56:78:9a:bc''')
        self.write_network('myvpn.network', '''[Match]
Name=dummy0
[Network]
Address=192.168.42.100
DNS=192.168.42.1
Domains= ~company ~.''')

        self.do_test(coldplug=True, ipv6=False,
                     extra_opts='IPv6AcceptRouterAdvertisements=False')

        with open(RESOLV_CONF) as f:
            contents = f.read()

        # ~company is not a search domain, only a routing domain
        self.assertNotRegex(contents, 'search.*company')

        # our global server should appear
        self.assertIn('nameserver 192.168.5.1\n', contents)
        # should have company server as global server due to ~.
        self.assertIn('nameserver 192.168.42.1\n', contents)
468
469
@unittest.skipUnless(HAVE_DNSMASQ, 'dnsmasq not installed')
class DnsmasqClientTest(ClientTestBase, unittest.TestCase):
    '''Test networkd client against dnsmasq'''

    def setUp(self):
        '''Run the common setup and reset the dnsmasq bookkeeping'''
        super().setUp()
        self.dnsmasq = None
        self.iface_mac = 'de:ad:be:ef:47:11'

    def create_iface(self, ipv6=False, dnsmasq_opts=None):
        '''Create test interface with DHCP server behind it

        ipv6 additionally gives the router a 2600::1/64 address and enables
        RA/DHCPv6 in dnsmasq; dnsmasq_opts is an optional list of extra
        dnsmasq command line arguments.
        '''

        # add veth pair
        subprocess.check_call(['ip', 'link', 'add', 'name', self.iface,
                               'address', self.iface_mac,
                               'type', 'veth', 'peer', 'name', self.if_router])

        # give our router an IP
        subprocess.check_call(['ip', 'a', 'flush', 'dev', self.if_router])
        subprocess.check_call(['ip', 'a', 'add', '192.168.5.1/24', 'dev', self.if_router])
        if ipv6:
            subprocess.check_call(['ip', 'a', 'add', '2600::1/64', 'dev', self.if_router])
        subprocess.check_call(['ip', 'link', 'set', self.if_router, 'up'])

        # add DHCP server
        self.dnsmasq_log = os.path.join(self.workdir, 'dnsmasq.log')
        lease_file = os.path.join(self.workdir, 'dnsmasq.leases')
        if ipv6:
            extra_opts = ['--enable-ra', '--dhcp-range=2600::10,2600::20']
        else:
            extra_opts = []
        if dnsmasq_opts:
            extra_opts += dnsmasq_opts
        self.dnsmasq = subprocess.Popen(
            ['dnsmasq', '--keep-in-foreground', '--log-queries',
             '--log-facility=' + self.dnsmasq_log, '--conf-file=/dev/null',
             '--dhcp-leasefile=' + lease_file, '--bind-interfaces',
             '--interface=' + self.if_router, '--except-interface=lo',
             '--dhcp-range=192.168.5.10,192.168.5.200'] + extra_opts)

    def shutdown_iface(self):
        '''Remove test interface and stop DHCP server'''

        try:
            if self.if_router:
                subprocess.check_call(['ip', 'link', 'del', 'dev', self.if_router])
                self.if_router = None
        finally:
            # Always reap dnsmasq, even when deleting the link failed;
            # otherwise the server process would outlive the test.
            if self.dnsmasq:
                self.dnsmasq.kill()
                self.dnsmasq.wait()
                self.dnsmasq = None

    def print_server_log(self):
        '''Print DHCP server log for debugging failures'''

        with open(self.dnsmasq_log) as f:
            sys.stdout.write('\n\n---- dnsmasq log ----\n{}\n------\n\n'.format(f.read()))

    def test_resolved_domain_restricted_dns(self):
        '''resolved: domain-restricted DNS servers'''

        # create interface for generic connections; this will map all DNS names
        # to 192.168.42.1
        self.create_iface(dnsmasq_opts=['--address=/#/192.168.42.1'])
        self.write_network('general.network', '''\
[Match]
Name={}
[Network]
DHCP=ipv4
IPv6AcceptRA=False'''.format(self.iface))

        # create second device/dnsmasq for a .company/.lab VPN interface
        # static IPs for simplicity
        self.add_veth_pair('testvpnclient', 'testvpnrouter')
        subprocess.check_call(['ip', 'a', 'flush', 'dev', 'testvpnrouter'])
        subprocess.check_call(['ip', 'a', 'add', '10.241.3.1/24', 'dev', 'testvpnrouter'])
        subprocess.check_call(['ip', 'link', 'set', 'testvpnrouter', 'up'])

        vpn_dnsmasq_log = os.path.join(self.workdir, 'dnsmasq-vpn.log')
        vpn_dnsmasq = subprocess.Popen(
            ['dnsmasq', '--keep-in-foreground', '--log-queries',
             '--log-facility=' + vpn_dnsmasq_log, '--conf-file=/dev/null',
             '--dhcp-leasefile=/dev/null', '--bind-interfaces',
             '--interface=testvpnrouter', '--except-interface=lo',
             '--address=/math.lab/10.241.3.3', '--address=/cantina.company/10.241.4.4'])
        # cleanups run in reverse order: kill() first, then wait()
        self.addCleanup(vpn_dnsmasq.wait)
        self.addCleanup(vpn_dnsmasq.kill)

        self.write_network('vpn.network', '''\
[Match]
Name=testvpnclient
[Network]
IPv6AcceptRA=False
Address=10.241.3.2/24
DNS=10.241.3.1
Domains= ~company ~lab''')

        subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])
        subprocess.check_call([NETWORKD_WAIT_ONLINE, '--interface', self.iface,
                               '--interface=testvpnclient', '--timeout=20'])

        # ensure we start fresh with every test
        subprocess.check_call(['systemctl', 'restart', 'systemd-resolved'])

        # test vpnclient specific domains; these should *not* be answered by
        # the general DNS
        out = subprocess.check_output(['systemd-resolve', 'math.lab'])
        self.assertIn(b'math.lab: 10.241.3.3', out)
        out = subprocess.check_output(['systemd-resolve', 'kettle.cantina.company'])
        self.assertIn(b'kettle.cantina.company: 10.241.4.4', out)

        # test general domains
        out = subprocess.check_output(['systemd-resolve', 'megasearch.net'])
        self.assertIn(b'megasearch.net: 192.168.42.1', out)

        with open(self.dnsmasq_log) as f:
            general_log = f.read()
        with open(vpn_dnsmasq_log) as f:
            vpn_log = f.read()

        # VPN domains should only be sent to VPN DNS
        self.assertRegex(vpn_log, 'query.*math.lab')
        self.assertRegex(vpn_log, 'query.*cantina.company')
        self.assertNotIn('.lab', general_log)
        self.assertNotIn('.company', general_log)

        # general domains should not be sent to the VPN DNS
        self.assertRegex(general_log, 'query.*megasearch.net')
        self.assertNotIn('megasearch.net', vpn_log)

    def test_resolved_etc_hosts(self):
        '''resolved queries to /etc/hosts'''

        # FIXME: -t MX query fails with enabled DNSSEC (even when using
        # the known negative trust anchor .internal instead of .example)
        conf = '/run/systemd/resolved.conf.d/test-disable-dnssec.conf'
        os.makedirs(os.path.dirname(conf), exist_ok=True)
        with open(conf, 'w') as f:
            f.write('[Resolve]\nDNSSEC=no')
        self.addCleanup(os.remove, conf)

        # create /etc/hosts bind mount which resolves my.example for IPv4
        hosts = os.path.join(self.workdir, 'hosts')
        with open(hosts, 'w') as f:
            f.write('172.16.99.99 my.example\n')
        subprocess.check_call(['mount', '--bind', hosts, '/etc/hosts'])
        self.addCleanup(subprocess.call, ['umount', '/etc/hosts'])
        subprocess.check_call(['systemctl', 'stop', 'systemd-resolved.service'])

        # note: different IPv4 address here, so that it's easy to tell apart
        # what resolved the query
        self.create_iface(dnsmasq_opts=['--host-record=my.example,172.16.99.1,2600::99:99',
                                        '--host-record=other.example,172.16.0.42,2600::42',
                                        '--mx-host=example,mail.example'],
                          ipv6=True)
        self.do_test(coldplug=None, ipv6=True)

        try:
            # family specific queries
            out = subprocess.check_output(['systemd-resolve', '-4', 'my.example'])
            self.assertIn(b'my.example: 172.16.99.99', out)
            # we don't expect an IPv6 answer; if /etc/hosts has any IP address,
            # it's considered a sufficient source
            self.assertNotEqual(subprocess.call(['systemd-resolve', '-6', 'my.example']), 0)
            # "any family" query; IPv4 should come from /etc/hosts
            out = subprocess.check_output(['systemd-resolve', 'my.example'])
            self.assertIn(b'my.example: 172.16.99.99', out)
            # IP → name lookup; again, takes the /etc/hosts one
            out = subprocess.check_output(['systemd-resolve', '172.16.99.99'])
            self.assertIn(b'172.16.99.99: my.example', out)

            # non-address RRs should fall back to DNS
            out = subprocess.check_output(['systemd-resolve', '--type=MX', 'example'])
            self.assertIn(b'example IN MX 1 mail.example', out)

            # other domains query DNS
            out = subprocess.check_output(['systemd-resolve', 'other.example'])
            self.assertIn(b'172.16.0.42', out)
            out = subprocess.check_output(['systemd-resolve', '172.16.0.42'])
            self.assertIn(b'172.16.0.42: other.example', out)
        except (AssertionError, subprocess.CalledProcessError):
            self.show_journal('systemd-resolved.service')
            self.print_server_log()
            raise

    def test_transient_hostname(self):
        '''networkd sets transient hostname from DHCP'''

        orig_hostname = socket.gethostname()
        self.addCleanup(socket.sethostname, orig_hostname)
        # temporarily move /etc/hostname away; restart hostnamed to pick it up
        if os.path.exists('/etc/hostname'):
            subprocess.check_call(['mount', '--bind', '/dev/null', '/etc/hostname'])
            self.addCleanup(subprocess.call, ['umount', '/etc/hostname'])
        subprocess.check_call(['systemctl', 'stop', 'systemd-hostnamed.service'])

        self.create_iface(dnsmasq_opts=['--dhcp-host={},192.168.5.210,testgreen'.format(self.iface_mac)])
        self.do_test(coldplug=None, extra_opts='IPv6AcceptRA=False', dhcp_mode='ipv4')

        try:
            # should have received the fixed IP above
            out = subprocess.check_output(['ip', '-4', 'a', 'show', 'dev', self.iface])
            self.assertRegex(out, b'inet 192.168.5.210/24 .* scope global dynamic')
            # should have set transient hostname in hostnamed; this is
            # sometimes a bit lagging (issue #4753), so retry a few times
            for retry in range(1, 6):
                out = subprocess.check_output(['hostnamectl'])
                if b'testgreen' in out:
                    break
                time.sleep(5)
                sys.stdout.write('[retry %i] ' % retry)
                sys.stdout.flush()
            else:
                self.fail('Transient hostname not found in hostnamectl:\n{}'.format(out.decode()))
            # and also applied to the system
            self.assertEqual(socket.gethostname(), 'testgreen')
        except AssertionError:
            self.show_journal('systemd-networkd.service')
            self.show_journal('systemd-hostnamed.service')
            self.print_server_log()
            raise

    def test_transient_hostname_with_static(self):
        '''transient hostname is not applied if static hostname exists'''

        orig_hostname = socket.gethostname()
        self.addCleanup(socket.sethostname, orig_hostname)
        if not os.path.exists('/etc/hostname'):
            # Provide a static hostname for the duration of the test and
            # remove the file again afterwards.
            with open('/etc/hostname', 'w') as f:
                f.write(orig_hostname)
            self.addCleanup(os.remove, '/etc/hostname')
        subprocess.check_call(['systemctl', 'stop', 'systemd-hostnamed.service'])

        self.create_iface(dnsmasq_opts=['--dhcp-host={},192.168.5.210,testgreen'.format(self.iface_mac)])
        self.do_test(coldplug=None, extra_opts='IPv6AcceptRA=False', dhcp_mode='ipv4')

        try:
            # should have received the fixed IP above
            out = subprocess.check_output(['ip', '-4', 'a', 'show', 'dev', self.iface])
            self.assertRegex(out, b'inet 192.168.5.210/24 .* scope global dynamic')
            # static hostname wins over transient one, thus *not* applied
            self.assertEqual(socket.gethostname(), orig_hostname)
        except AssertionError:
            self.show_journal('systemd-networkd.service')
            self.show_journal('systemd-hostnamed.service')
            self.print_server_log()
            raise
714
715
716 class NetworkdClientTest(ClientTestBase, unittest.TestCase):
717 '''Test networkd client against networkd server'''
718
719 def setUp(self):
720 super().setUp()
721 self.dnsmasq = None
722
723 def create_iface(self, ipv6=False, dhcpserver_opts=None):
724 '''Create test interface with DHCP server behind it'''
725
726 # run "router-side" networkd in own mount namespace to shield it from
727 # "client-side" configuration and networkd
728 (fd, script) = tempfile.mkstemp(prefix='networkd-router.sh')
729 self.addCleanup(os.remove, script)
730 with os.fdopen(fd, 'w+') as f:
731 f.write('''\
732 #!/bin/sh
733 set -eu
734 mkdir -p /run/systemd/network
735 mkdir -p /run/systemd/netif
736 mount -t tmpfs none /run/systemd/network
737 mount -t tmpfs none /run/systemd/netif
738 [ ! -e /run/dbus ] || mount -t tmpfs none /run/dbus
739 # create router/client veth pair
740 cat << EOF > /run/systemd/network/test.netdev
741 [NetDev]
742 Name=%(ifr)s
743 Kind=veth
744
745 [Peer]
746 Name=%(ifc)s
747 EOF
748
749 cat << EOF > /run/systemd/network/test.network
750 [Match]
751 Name=%(ifr)s
752
753 [Network]
754 Address=192.168.5.1/24
755 %(addr6)s
756 DHCPServer=yes
757
758 [DHCPServer]
759 PoolOffset=10
760 PoolSize=50
761 DNS=192.168.5.1
762 %(dhopts)s
763 EOF
764
765 # run networkd as in systemd-networkd.service
766 exec $(systemctl cat systemd-networkd.service | sed -n '/^ExecStart=/ { s/^.*=//; s/^[@+-]//; s/^!*//; p}')
767 ''' % {'ifr': self.if_router, 'ifc': self.iface, 'addr6': ipv6 and 'Address=2600::1/64' or '',
768 'dhopts': dhcpserver_opts or ''})
769
770 os.fchmod(fd, 0o755)
771
772 subprocess.check_call(['systemd-run', '--unit=networkd-test-router.service',
773 '-p', 'InaccessibleDirectories=-/etc/systemd/network',
774 '-p', 'InaccessibleDirectories=-/run/systemd/network',
775 '-p', 'InaccessibleDirectories=-/run/systemd/netif',
776 '--service-type=notify', script])
777
778 # wait until devices got created
779 for _ in range(50):
780 out = subprocess.check_output(['ip', 'a', 'show', 'dev', self.if_router])
781 if b'state UP' in out and b'scope global' in out:
782 break
783 time.sleep(0.1)
784
785 def shutdown_iface(self):
786 '''Remove test interface and stop DHCP server'''
787
788 if self.if_router:
789 subprocess.check_call(['systemctl', 'stop', 'networkd-test-router.service'])
790 # ensure failed transient unit does not stay around
791 subprocess.call(['systemctl', 'reset-failed', 'networkd-test-router.service'])
792 subprocess.call(['ip', 'link', 'del', 'dev', self.if_router])
793 self.if_router = None
794
795 def print_server_log(self):
796 '''Print DHCP server log for debugging failures'''
797
798 self.show_journal('networkd-test-router.service')
799
800 @unittest.skip('networkd does not have DHCPv6 server support')
801 def test_hotplug_dhcp_ip6(self):
802 pass
803
804 @unittest.skip('networkd does not have DHCPv6 server support')
805 def test_coldplug_dhcp_ip6(self):
806 pass
807
808 def test_search_domains(self):
809
810 # we don't use this interface for this test
811 self.if_router = None
812
813 self.write_network('test.netdev', '''\
814 [NetDev]
815 Name=dummy0
816 Kind=dummy
817 MACAddress=12:34:56:78:9a:bc''')
818 self.write_network('test.network', '''\
819 [Match]
820 Name=dummy0
821 [Network]
822 Address=192.168.42.100
823 DNS=192.168.42.1
824 Domains= one two three four five six seven eight nine ten''')
825
826 subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])
827
828 for timeout in range(50):
829 with open(RESOLV_CONF) as f:
830 contents = f.read()
831 if ' one' in contents:
832 break
833 time.sleep(0.1)
834 self.assertRegex(contents, 'search .*one two three four')
835 self.assertNotIn('seven\n', contents)
836 self.assertIn('# Too many search domains configured, remaining ones ignored.\n', contents)
837
def test_search_domains_too_long(self):
    '''Check RESOLV_CONF when the search domains are too long in total

    A dummy interface is configured with five search domains of 61
    characters each (a 60-character prefix plus one digit).  The test
    expects the first three on the "search" line together with the
    "Total length of all search domains is too long" notice.
    '''

    # we don't use this interface for this test
    self.if_router = None

    name_prefix = 'a' * 60

    self.write_network('test.netdev', '''\
[NetDev]
Name=dummy0
Kind=dummy
MACAddress=12:34:56:78:9a:bc''')
    self.write_network('test.network', '''\
[Match]
Name=dummy0
[Network]
Address=192.168.42.100
DNS=192.168.42.1
Domains={p}0 {p}1 {p}2 {p}3 {p}4'''.format(p=name_prefix))

    subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])

    # Poll up to 50 times, 0.1 s apart, until the first generated domain
    # appears.  This must look for a name this test actually configures;
    # waiting for an unrelated token would never match and would always
    # burn the full timeout before the assertions run.
    first_domain = ' {}0'.format(name_prefix)
    for _ in range(50):
        with open(RESOLV_CONF) as f:
            contents = f.read()
        if first_domain in contents:
            break
        time.sleep(0.1)
    self.assertRegex(contents, 'search .*{p}0 {p}1 {p}2'.format(p=name_prefix))
    self.assertIn('# Total length of all search domains is too long, remaining ones ignored.', contents)
868
def test_dropin(self):
    '''Check that a drop-in adds its DNS server to the one in the .network file

    The main file sets DNS=192.168.42.1 and the "dns" drop-in sets
    DNS=127.0.0.1; both are expected as nameservers in RESOLV_CONF.
    '''
    # we don't use this interface for this test
    self.if_router = None

    netdev_conf = '''\
[NetDev]
Name=dummy0
Kind=dummy
MACAddress=12:34:56:78:9a:bc'''
    network_conf = '''\
[Match]
Name=dummy0
[Network]
Address=192.168.42.100
DNS=192.168.42.1'''
    dropin_conf = '''\
[Network]
DNS=127.0.0.1'''
    self.write_network('test.netdev', netdev_conf)
    self.write_network('test.network', network_conf)
    self.write_network_dropin('test.network', 'dns', dropin_conf)

    subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])

    # Re-read the file up to 50 times, 0.1 s apart, until the drop-in's
    # server shows up; the last read is what gets checked.
    for _ in range(50):
        with open(RESOLV_CONF) as resolv:
            contents = resolv.read()
        if ' 127.0.0.1' not in contents:
            time.sleep(0.1)
            continue
        break

    for server in ('192.168.42.1', '127.0.0.1'):
        self.assertIn('nameserver {}\n'.format(server), contents)
898
def test_dhcp_timezone(self):
    '''networkd sets time zone from DHCP'''

    def get_tz():
        '''Return the Timezone property of org.freedesktop.timedate1'''
        out = subprocess.check_output(['busctl', 'get-property', 'org.freedesktop.timedate1',
                                       '/org/freedesktop/timedate1', 'org.freedesktop.timedate1', 'Timezone'])
        # The output is expected to look like: s "<zone>"
        assert out.startswith(b's "')
        out = out.strip()
        assert out.endswith(b'"')
        # Drop the leading 's "' and the closing quote.
        return out[3:-1].decode()

    # Remember the current zone and restore it once the test is over.
    orig_timezone = get_tz()
    self.addCleanup(subprocess.call, ['timedatectl', 'set-timezone', orig_timezone])

    self.create_iface(dhcpserver_opts='EmitTimezone=yes\nTimezone=Pacific/Honolulu')
    self.do_test(coldplug=None, extra_opts='IPv6AcceptRA=false\n[DHCP]\nUseTimezone=true', dhcp_mode='ipv4')

    # should have applied the received timezone
    try:
        self.assertEqual(get_tz(), 'Pacific/Honolulu')
    except AssertionError:
        self.show_journal('systemd-networkd.service')
        # The zone is read from the timedate1 bus name, so the
        # timedated journal is the relevant one when this fails.
        self.show_journal('systemd-timedated.service')
        raise
923
924
class MatchClientTest(unittest.TestCase, NetworkdTestingUtilities):
    """Exercise the [Match] section of .network files.

    A .network file that matches one of the test host's own interfaces
    wipes that interface's configuration.  As a precaution, every
    network file written here must carry a restrictive [Match] section
    so that only the temporary veth interfaces created by these tests
    are ever affected.
    """

    def tearDown(self):
        """Stop networkd again after each test."""
        stop_cmd = ['systemctl', 'stop', 'systemd-networkd']
        subprocess.call(stop_cmd)

    def test_basic_matching(self):
        """Check that a Name= glob only claims the matching interface."""
        self.add_veth_pair('test_if1', 'fake_if2')

        config = "[Match]\nName=test_*\n[Network]"
        self.write_network('test.network', config)

        subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])

        expected = {'test_if1': 'managed', 'fake_if2': 'unmanaged'}
        self.assert_link_states(**expected)

    def test_inverted_matching(self):
        """Check that a value prefixed with '!' inverts the match."""
        # Both ends share one MAC address, which serves as the common
        # matching attribute.  This avoids depending on udev, so the
        # test also works in containers.
        mac = '00:01:02:03:98:99'
        veth_opts = ['addr', mac]
        peer_opts = ['addr', mac]
        self.add_veth_pair('test_veth', 'test_peer', veth_opts, peer_opts)

        template = """\
[Match]
MACAddress={}
Name=!nonexistent *peer*
[Network]"""
        self.write_network('no-veth.network', template.format(mac))

        subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])

        expected = {'test_veth': 'managed', 'test_peer': 'unmanaged'}
        self.assert_link_states(**expected)
959
960
class UnmanagedClientTest(unittest.TestCase, NetworkdTestingUtilities):
    """Check that networkd manages exactly the intended interfaces."""

    def setUp(self):
        """Write the .network files that match the named veth devices."""
        # veth -> peer pairs to create.  Only the names are significant;
        # which device is paired with which does not matter.
        self.veths = dict(m1def='m0unm', m1man='m1unm')

        # Contents of the .network files, in the order they are read.
        self.configs = (
            "[Match]\nName=m1def\n",
            "[Match]\nName=m1unm\n[Link]\nUnmanaged=yes\n",
            "[Match]\nName=m1*\n[Link]\nUnmanaged=no\n",
        )

        # Number the files so they keep the order above; they are
        # cleaned up automatically.
        for index, contents in enumerate(self.configs):
            self.write_network('{:02d}-test.network'.format(index), contents)

    def tearDown(self):
        """Stop networkd again after each test."""
        stop_cmd = ['systemctl', 'stop', 'systemd-networkd']
        subprocess.call(stop_cmd)

    def create_iface(self):
        """Create the temporary veth pairs used for interface matching."""
        for pair in self.veths.items():
            self.add_veth_pair(*pair)

    def test_unmanaged_setting(self):
        """Check link states with Unmanaged= settings (hot-plug)."""
        subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])
        self.create_iface()
        expected = {
            'm1def': 'managed',
            'm1man': 'managed',
            'm1unm': 'unmanaged',
            'm0unm': 'unmanaged',
        }
        self.assert_link_states(**expected)

    def test_unmanaged_setting_coldplug(self):
        """Check link states with Unmanaged= settings (cold-plug)."""
        self.create_iface()
        subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])
        expected = {
            'm1def': 'managed',
            'm1man': 'managed',
            'm1unm': 'unmanaged',
            'm0unm': 'unmanaged',
        }
        self.assert_link_states(**expected)

    def test_catchall_config(self):
        """Check link states with a catch-all config (hot-plug)."""
        # Don't actually catch ALL interfaces. It messes up the host.
        catchall = "[Match]\nName=m[01]???\n"
        self.write_network('all.network', catchall)
        subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])
        self.create_iface()
        expected = {
            'm1def': 'managed',
            'm1man': 'managed',
            'm1unm': 'unmanaged',
            'm0unm': 'managed',
        }
        self.assert_link_states(**expected)

    def test_catchall_config_coldplug(self):
        """Check link states with a catch-all config (cold-plug)."""
        # Don't actually catch ALL interfaces. It messes up the host.
        catchall = "[Match]\nName=m[01]???\n"
        self.write_network('all.network', catchall)
        self.create_iface()
        subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])
        expected = {
            'm1def': 'managed',
            'm1man': 'managed',
            'm1unm': 'unmanaged',
            'm0unm': 'managed',
        }
        self.assert_link_states(**expected)
1032
1033
if __name__ == '__main__':
    # Run the tests with verbose per-test output written to stdout.
    runner = unittest.TextTestRunner(stream=sys.stdout, verbosity=2)
    unittest.main(testRunner=runner)