]> git.ipfire.org Git - thirdparty/systemd.git/blob - test/networkd-test.py
Merge pull request #12802 from irtimmer/fix-openssl
[thirdparty/systemd.git] / test / networkd-test.py
1 #!/usr/bin/env python3
2 # SPDX-License-Identifier: LGPL-2.1+
3 #
4 # networkd integration test
5 # This uses temporary configuration in /run and temporary veth devices, and
6 # does not write anything on disk or change any system configuration;
7 # but it assumes (and checks at the beginning) that networkd is not currently
8 # running.
9 #
10 # This can be run on a normal installation, in QEMU, nspawn (with
11 # --private-network), LXD (with "--config raw.lxc=lxc.aa_profile=unconfined"),
12 # or LXC system containers. You need at least the "ip" tool from the iproute
13 # package; it is recommended to install dnsmasq too to get full test coverage.
14 #
15 # ATTENTION: This uses the *installed* networkd, not the one from the built
16 # source tree.
17 #
18 # © 2015 Canonical Ltd.
19 # Author: Martin Pitt <martin.pitt@ubuntu.com>
20
21 import errno
22 import os
23 import shutil
24 import socket
25 import subprocess
26 import sys
27 import tempfile
28 import time
29 import unittest
30
31 HAVE_DNSMASQ = shutil.which('dnsmasq') is not None
32 IS_CONTAINER = subprocess.call(['systemd-detect-virt', '--quiet', '--container']) == 0
33
34 NETWORK_UNITDIR = '/run/systemd/network'
35
36 NETWORKD_WAIT_ONLINE = shutil.which('systemd-networkd-wait-online',
37 path='/usr/lib/systemd:/lib/systemd')
38
39 RESOLV_CONF = '/run/systemd/resolve/resolv.conf'
40
41 tmpmounts = []
42 running_units = []
43 stopped_units = []
44
45
46 def setUpModule():
47 global tmpmounts
48
49 """Initialize the environment, and perform sanity checks on it."""
50 if NETWORKD_WAIT_ONLINE is None:
51 raise OSError(errno.ENOENT, 'systemd-networkd-wait-online not found')
52
53 # Do not run any tests if the system is using networkd already and it's not virtualized
54 if (subprocess.call(['systemctl', 'is-active', '--quiet', 'systemd-networkd.service']) == 0 and
55 subprocess.call(['systemd-detect-virt', '--quiet']) != 0):
56 raise unittest.SkipTest('not virtualized and networkd is already active')
57
58 # Ensure we don't mess with an existing networkd config
59 for u in ['systemd-networkd.socket', 'systemd-networkd', 'systemd-resolved']:
60 if subprocess.call(['systemctl', 'is-active', '--quiet', u]) == 0:
61 subprocess.call(['systemctl', 'stop', u])
62 running_units.append(u)
63 else:
64 stopped_units.append(u)
65
66 # create static systemd-network user for networkd-test-router.service (it
67 # needs to do some stuff as root and can't start as user; but networkd
68 # still insists on the user)
69 subprocess.call(['adduser', '--system', '--no-create-home', 'systemd-network'])
70
71 for d in ['/etc/systemd/network', '/run/systemd/network',
72 '/run/systemd/netif', '/run/systemd/resolve']:
73 if os.path.isdir(d):
74 subprocess.check_call(["mount", "-t", "tmpfs", "none", d])
75 tmpmounts.append(d)
76 if os.path.isdir('/run/systemd/resolve'):
77 os.chmod('/run/systemd/resolve', 0o755)
78 shutil.chown('/run/systemd/resolve', 'systemd-resolve', 'systemd-resolve')
79 if os.path.isdir('/run/systemd/netif'):
80 os.chmod('/run/systemd/netif', 0o755)
81 shutil.chown('/run/systemd/netif', 'systemd-network', 'systemd-network')
82
83 # Avoid "Failed to open /dev/tty" errors in containers.
84 os.environ['SYSTEMD_LOG_TARGET'] = 'journal'
85
86 # Ensure the unit directory exists so tests can dump files into it.
87 os.makedirs(NETWORK_UNITDIR, exist_ok=True)
88
89
90 def tearDownModule():
91 global tmpmounts
92 for d in tmpmounts:
93 subprocess.check_call(["umount", d])
94 for u in stopped_units:
95 subprocess.call(["systemctl", "stop", u])
96 for u in running_units:
97 subprocess.call(["systemctl", "restart", u])
98
99
100 class NetworkdTestingUtilities:
101 """Provide a set of utility functions to facilitate networkd tests.
102
103 This class must be inherited along with unittest.TestCase to define
104 some required methods.
105 """
106
107 def add_veth_pair(self, veth, peer, veth_options=(), peer_options=()):
108 """Add a veth interface pair, and queue them to be removed."""
109 subprocess.check_call(['ip', 'link', 'add', 'name', veth] +
110 list(veth_options) +
111 ['type', 'veth', 'peer', 'name', peer] +
112 list(peer_options))
113 self.addCleanup(subprocess.call, ['ip', 'link', 'del', 'dev', peer])
114
115 def write_config(self, path, contents):
116 """"Write a configuration file, and queue it to be removed."""
117
118 with open(path, 'w') as f:
119 f.write(contents)
120
121 self.addCleanup(os.remove, path)
122
123 def write_network(self, unit_name, contents):
124 """Write a network unit file, and queue it to be removed."""
125 self.write_config(os.path.join(NETWORK_UNITDIR, unit_name), contents)
126
127 def write_network_dropin(self, unit_name, dropin_name, contents):
128 """Write a network unit drop-in, and queue it to be removed."""
129 dropin_dir = os.path.join(NETWORK_UNITDIR, "{}.d".format(unit_name))
130 dropin_path = os.path.join(dropin_dir, "{}.conf".format(dropin_name))
131
132 os.makedirs(dropin_dir, exist_ok=True)
133 self.addCleanup(os.rmdir, dropin_dir)
134 with open(dropin_path, 'w') as dropin:
135 dropin.write(contents)
136 self.addCleanup(os.remove, dropin_path)
137
138 def read_attr(self, link, attribute):
139 """Read a link attributed from the sysfs."""
140 # Note we we don't want to check if interface `link' is managed, we
141 # want to evaluate link variable and pass the value of the link to
142 # assert_link_states e.g. eth0=managed.
143 self.assert_link_states(**{link:'managed'})
144 with open(os.path.join('/sys/class/net', link, attribute)) as f:
145 return f.readline().strip()
146
147 def assert_link_states(self, **kwargs):
148 """Match networkctl link states to the given ones.
149
150 Each keyword argument should be the name of a network interface
151 with its expected value of the "SETUP" column in output from
152 networkctl. The interfaces have five seconds to come online
153 before the check is performed. Every specified interface must
154 be present in the output, and any other interfaces found in the
155 output are ignored.
156
157 A special interface state "managed" is supported, which matches
158 any value in the "SETUP" column other than "unmanaged".
159 """
160 if not kwargs:
161 return
162 interfaces = set(kwargs)
163
164 # Wait for the requested interfaces, but don't fail for them.
165 subprocess.call([NETWORKD_WAIT_ONLINE, '--timeout=5'] +
166 ['--interface={}'.format(iface) for iface in kwargs])
167
168 # Validate each link state found in the networkctl output.
169 out = subprocess.check_output(['networkctl', '--no-legend']).rstrip()
170 for line in out.decode('utf-8').split('\n'):
171 fields = line.split()
172 if len(fields) >= 5 and fields[1] in kwargs:
173 iface = fields[1]
174 expected = kwargs[iface]
175 actual = fields[-1]
176 if (actual != expected and
177 not (expected == 'managed' and actual != 'unmanaged')):
178 self.fail("Link {} expects state {}, found {}".format(iface, expected, actual))
179 interfaces.remove(iface)
180
181 # Ensure that all requested interfaces have been covered.
182 if interfaces:
183 self.fail("Missing links in status output: {}".format(interfaces))
184
185
186 class BridgeTest(NetworkdTestingUtilities, unittest.TestCase):
187 """Provide common methods for testing networkd against servers."""
188
189 def setUp(self):
190 self.write_network('port1.netdev', '''\
191 [NetDev]
192 Name=port1
193 Kind=dummy
194 MACAddress=12:34:56:78:9a:bc''')
195 self.write_network('port2.netdev', '''\
196 [NetDev]
197 Name=port2
198 Kind=dummy
199 MACAddress=12:34:56:78:9a:bd''')
200 self.write_network('mybridge.netdev', '''\
201 [NetDev]
202 Name=mybridge
203 Kind=bridge''')
204 self.write_network('port1.network', '''\
205 [Match]
206 Name=port1
207 [Network]
208 Bridge=mybridge''')
209 self.write_network('port2.network', '''\
210 [Match]
211 Name=port2
212 [Network]
213 Bridge=mybridge''')
214 self.write_network('mybridge.network', '''\
215 [Match]
216 Name=mybridge
217 [Network]
218 DNS=192.168.250.1
219 Address=192.168.250.33/24
220 Gateway=192.168.250.1''')
221 subprocess.call(['systemctl', 'reset-failed', 'systemd-networkd', 'systemd-resolved'])
222 subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])
223
224 def tearDown(self):
225 subprocess.check_call(['systemctl', 'stop', 'systemd-networkd'])
226 subprocess.check_call(['ip', 'link', 'del', 'mybridge'])
227 subprocess.check_call(['ip', 'link', 'del', 'port1'])
228 subprocess.check_call(['ip', 'link', 'del', 'port2'])
229
230 def test_bridge_init(self):
231 self.assert_link_states(
232 port1='managed',
233 port2='managed',
234 mybridge='managed')
235
236 def test_bridge_port_priority(self):
237 self.assertEqual(self.read_attr('port1', 'brport/priority'), '32')
238 self.write_network_dropin('port1.network', 'priority', '''\
239 [Bridge]
240 Priority=28
241 ''')
242 subprocess.check_call(['systemctl', 'restart', 'systemd-networkd'])
243 self.assertEqual(self.read_attr('port1', 'brport/priority'), '28')
244
245 def test_bridge_port_priority_set_zero(self):
246 """It should be possible to set the bridge port priority to 0"""
247 self.assertEqual(self.read_attr('port2', 'brport/priority'), '32')
248 self.write_network_dropin('port2.network', 'priority', '''\
249 [Bridge]
250 Priority=0
251 ''')
252 subprocess.check_call(['systemctl', 'restart', 'systemd-networkd'])
253 self.assertEqual(self.read_attr('port2', 'brport/priority'), '0')
254
255 def test_bridge_port_property(self):
256 """Test the "[Bridge]" section keys"""
257 self.assertEqual(self.read_attr('port2', 'brport/priority'), '32')
258 self.write_network_dropin('port2.network', 'property', '''\
259 [Bridge]
260 UnicastFlood=true
261 HairPin=true
262 UseBPDU=true
263 FastLeave=true
264 AllowPortToBeRoot=true
265 Cost=555
266 Priority=23
267 ''')
268 subprocess.check_call(['systemctl', 'restart', 'systemd-networkd'])
269
270 self.assertEqual(self.read_attr('port2', 'brport/priority'), '23')
271 self.assertEqual(self.read_attr('port2', 'brport/hairpin_mode'), '1')
272 self.assertEqual(self.read_attr('port2', 'brport/path_cost'), '555')
273 self.assertEqual(self.read_attr('port2', 'brport/multicast_fast_leave'), '1')
274 self.assertEqual(self.read_attr('port2', 'brport/unicast_flood'), '1')
275 self.assertEqual(self.read_attr('port2', 'brport/bpdu_guard'), '1')
276 self.assertEqual(self.read_attr('port2', 'brport/root_block'), '1')
277
278 class ClientTestBase(NetworkdTestingUtilities):
279 """Provide common methods for testing networkd against servers."""
280
281 @classmethod
282 def setUpClass(klass):
283 klass.orig_log_level = subprocess.check_output(
284 ['systemctl', 'show', '--value', '--property', 'LogLevel'],
285 universal_newlines=True).strip()
286 subprocess.check_call(['systemd-analyze', 'log-level', 'debug'])
287
288 @classmethod
289 def tearDownClass(klass):
290 subprocess.check_call(['systemd-analyze', 'log-level', klass.orig_log_level])
291
292 def setUp(self):
293 self.iface = 'test_eth42'
294 self.if_router = 'router_eth42'
295 self.workdir_obj = tempfile.TemporaryDirectory()
296 self.workdir = self.workdir_obj.name
297 self.config = 'test_eth42.network'
298
299 # get current journal cursor
300 subprocess.check_output(['journalctl', '--sync'])
301 out = subprocess.check_output(['journalctl', '-b', '--quiet',
302 '--no-pager', '-n0', '--show-cursor'],
303 universal_newlines=True)
304 self.assertTrue(out.startswith('-- cursor:'))
305 self.journal_cursor = out.split()[-1]
306
307 subprocess.call(['systemctl', 'reset-failed', 'systemd-networkd', 'systemd-resolved'])
308
309 def tearDown(self):
310 self.shutdown_iface()
311 subprocess.call(['systemctl', 'stop', 'systemd-networkd'])
312 subprocess.call(['ip', 'link', 'del', 'dummy0'],
313 stderr=subprocess.DEVNULL)
314
315 def show_journal(self, unit):
316 '''Show journal of given unit since start of the test'''
317
318 print('---- {} ----'.format(unit))
319 subprocess.check_output(['journalctl', '--sync'])
320 sys.stdout.flush()
321 subprocess.call(['journalctl', '-b', '--no-pager', '--quiet',
322 '--cursor', self.journal_cursor, '-u', unit])
323
324 def create_iface(self, ipv6=False):
325 '''Create test interface with DHCP server behind it'''
326
327 raise NotImplementedError('must be implemented by a subclass')
328
329 def shutdown_iface(self):
330 '''Remove test interface and stop DHCP server'''
331
332 raise NotImplementedError('must be implemented by a subclass')
333
334 def print_server_log(self):
335 '''Print DHCP server log for debugging failures'''
336
337 raise NotImplementedError('must be implemented by a subclass')
338
339 def start_unit(self, unit):
340 try:
341 subprocess.check_call(['systemctl', 'start', unit])
342 except subprocess.CalledProcessError:
343 self.show_journal(unit)
344 raise
345
346 def do_test(self, coldplug=True, ipv6=False, extra_opts='',
347 online_timeout=10, dhcp_mode='yes'):
348 self.start_unit('systemd-resolved')
349 self.write_network(self.config, '''\
350 [Match]
351 Name={}
352 [Network]
353 DHCP={}
354 {}'''.format(self.iface, dhcp_mode, extra_opts))
355
356 if coldplug:
357 # create interface first, then start networkd
358 self.create_iface(ipv6=ipv6)
359 self.start_unit('systemd-networkd')
360 elif coldplug is not None:
361 # start networkd first, then create interface
362 self.start_unit('systemd-networkd')
363 self.create_iface(ipv6=ipv6)
364 else:
365 # "None" means test sets up interface by itself
366 self.start_unit('systemd-networkd')
367
368 try:
369 subprocess.check_call([NETWORKD_WAIT_ONLINE, '--interface',
370 self.iface, '--timeout=%i' % online_timeout])
371
372 if ipv6:
373 # check iface state and IP 6 address; FIXME: we need to wait a bit
374 # longer, as the iface is "configured" already with IPv4 *or*
375 # IPv6, but we want to wait for both
376 for _ in range(10):
377 out = subprocess.check_output(['ip', 'a', 'show', 'dev', self.iface])
378 if b'state UP' in out and b'inet6 2600' in out and b'inet 192.168' in out:
379 break
380 time.sleep(1)
381 else:
382 self.fail('timed out waiting for IPv6 configuration')
383
384 self.assertRegex(out, b'inet6 2600::.* scope global .*dynamic')
385 self.assertRegex(out, b'inet6 fe80::.* scope link')
386 else:
387 # should have link-local address on IPv6 only
388 out = subprocess.check_output(['ip', '-6', 'a', 'show', 'dev', self.iface])
389 self.assertRegex(out, br'inet6 fe80::.* scope link')
390 self.assertNotIn(b'scope global', out)
391
392 # should have IPv4 address
393 out = subprocess.check_output(['ip', '-4', 'a', 'show', 'dev', self.iface])
394 self.assertIn(b'state UP', out)
395 self.assertRegex(out, br'inet 192.168.5.\d+/.* scope global dynamic')
396
397 # check networkctl state
398 out = subprocess.check_output(['networkctl'])
399 self.assertRegex(out, (r'{}\s+ether\s+[a-z-]+\s+unmanaged'.format(self.if_router)).encode())
400 self.assertRegex(out, (r'{}\s+ether\s+routable\s+configured'.format(self.iface)).encode())
401
402 out = subprocess.check_output(['networkctl', 'status', self.iface])
403 self.assertRegex(out, br'Type:\s+ether')
404 self.assertRegex(out, br'State:\s+routable.*configured')
405 self.assertRegex(out, br'Address:\s+192.168.5.\d+')
406 if ipv6:
407 self.assertRegex(out, br'2600::')
408 else:
409 self.assertNotIn(br'2600::', out)
410 self.assertRegex(out, br'fe80::')
411 self.assertRegex(out, br'Gateway:\s+192.168.5.1')
412 self.assertRegex(out, br'DNS:\s+192.168.5.1')
413 except (AssertionError, subprocess.CalledProcessError):
414 # show networkd status, journal, and DHCP server log on failure
415 with open(os.path.join(NETWORK_UNITDIR, self.config)) as f:
416 print('\n---- {} ----\n{}'.format(self.config, f.read()))
417 print('---- interface status ----')
418 sys.stdout.flush()
419 subprocess.call(['ip', 'a', 'show', 'dev', self.iface])
420 print('---- networkctl status {} ----'.format(self.iface))
421 sys.stdout.flush()
422 subprocess.call(['networkctl', 'status', self.iface])
423 self.show_journal('systemd-networkd.service')
424 self.print_server_log()
425 raise
426
427 for timeout in range(50):
428 with open(RESOLV_CONF) as f:
429 contents = f.read()
430 if 'nameserver 192.168.5.1\n' in contents:
431 break
432 time.sleep(0.1)
433 else:
434 self.fail('nameserver 192.168.5.1 not found in ' + RESOLV_CONF)
435
436 if coldplug is False:
437 # check post-down.d hook
438 self.shutdown_iface()
439
440 def test_coldplug_dhcp_yes_ip4(self):
441 # we have a 12s timeout on RA, so we need to wait longer
442 self.do_test(coldplug=True, ipv6=False, online_timeout=15)
443
444 def test_coldplug_dhcp_yes_ip4_no_ra(self):
445 # with disabling RA explicitly things should be fast
446 self.do_test(coldplug=True, ipv6=False,
447 extra_opts='IPv6AcceptRA=False')
448
449 def test_coldplug_dhcp_ip4_only(self):
450 # we have a 12s timeout on RA, so we need to wait longer
451 self.do_test(coldplug=True, ipv6=False, dhcp_mode='ipv4',
452 online_timeout=15)
453
454 def test_coldplug_dhcp_ip4_only_no_ra(self):
455 # with disabling RA explicitly things should be fast
456 self.do_test(coldplug=True, ipv6=False, dhcp_mode='ipv4',
457 extra_opts='IPv6AcceptRA=False')
458
459 def test_coldplug_dhcp_ip6(self):
460 self.do_test(coldplug=True, ipv6=True)
461
462 def test_hotplug_dhcp_ip4(self):
463 # With IPv4 only we have a 12s timeout on RA, so we need to wait longer
464 self.do_test(coldplug=False, ipv6=False, online_timeout=15)
465
466 def test_hotplug_dhcp_ip6(self):
467 self.do_test(coldplug=False, ipv6=True)
468
469 def test_route_only_dns(self):
470 self.write_network('myvpn.netdev', '''\
471 [NetDev]
472 Name=dummy0
473 Kind=dummy
474 MACAddress=12:34:56:78:9a:bc''')
475 self.write_network('myvpn.network', '''\
476 [Match]
477 Name=dummy0
478 [Network]
479 Address=192.168.42.100/24
480 DNS=192.168.42.1
481 Domains= ~company''')
482
483 try:
484 self.do_test(coldplug=True, ipv6=False,
485 extra_opts='IPv6AcceptRouterAdvertisements=False')
486 except subprocess.CalledProcessError as e:
487 # networkd often fails to start in LXC: https://github.com/systemd/systemd/issues/11848
488 if IS_CONTAINER and e.cmd == ['systemctl', 'start', 'systemd-networkd']:
489 raise unittest.SkipTest('https://github.com/systemd/systemd/issues/11848')
490 else:
491 raise
492
493 with open(RESOLV_CONF) as f:
494 contents = f.read()
495 # ~company is not a search domain, only a routing domain
496 self.assertNotRegex(contents, 'search.*company')
497 # our global server should appear
498 self.assertIn('nameserver 192.168.5.1\n', contents)
499 # should not have domain-restricted server as global server
500 self.assertNotIn('nameserver 192.168.42.1\n', contents)
501
502 def test_route_only_dns_all_domains(self):
503 self.write_network('myvpn.netdev', '''[NetDev]
504 Name=dummy0
505 Kind=dummy
506 MACAddress=12:34:56:78:9a:bc''')
507 self.write_network('myvpn.network', '''[Match]
508 Name=dummy0
509 [Network]
510 Address=192.168.42.100/24
511 DNS=192.168.42.1
512 Domains= ~company ~.''')
513
514 try:
515 self.do_test(coldplug=True, ipv6=False,
516 extra_opts='IPv6AcceptRouterAdvertisements=False')
517 except subprocess.CalledProcessError as e:
518 # networkd often fails to start in LXC: https://github.com/systemd/systemd/issues/11848
519 if IS_CONTAINER and e.cmd == ['systemctl', 'start', 'systemd-networkd']:
520 raise unittest.SkipTest('https://github.com/systemd/systemd/issues/11848')
521 else:
522 raise
523
524 with open(RESOLV_CONF) as f:
525 contents = f.read()
526
527 # ~company is not a search domain, only a routing domain
528 self.assertNotRegex(contents, 'search.*company')
529
530 # our global server should appear
531 self.assertIn('nameserver 192.168.5.1\n', contents)
532 # should have company server as global server due to ~.
533 self.assertIn('nameserver 192.168.42.1\n', contents)
534
535
536 @unittest.skipUnless(HAVE_DNSMASQ, 'dnsmasq not installed')
537 class DnsmasqClientTest(ClientTestBase, unittest.TestCase):
538 '''Test networkd client against dnsmasq'''
539
540 def setUp(self):
541 super().setUp()
542 self.dnsmasq = None
543 self.iface_mac = 'de:ad:be:ef:47:11'
544
545 def create_iface(self, ipv6=False, dnsmasq_opts=None):
546 '''Create test interface with DHCP server behind it'''
547
548 # add veth pair
549 subprocess.check_call(['ip', 'link', 'add', 'name', self.iface,
550 'address', self.iface_mac,
551 'type', 'veth', 'peer', 'name', self.if_router])
552
553 # give our router an IP
554 subprocess.check_call(['ip', 'a', 'flush', 'dev', self.if_router])
555 subprocess.check_call(['ip', 'a', 'add', '192.168.5.1/24', 'dev', self.if_router])
556 if ipv6:
557 subprocess.check_call(['ip', 'a', 'add', '2600::1/64', 'dev', self.if_router])
558 subprocess.check_call(['ip', 'link', 'set', self.if_router, 'up'])
559
560 # add DHCP server
561 self.dnsmasq_log = os.path.join(self.workdir, 'dnsmasq.log')
562 lease_file = os.path.join(self.workdir, 'dnsmasq.leases')
563 if ipv6:
564 extra_opts = ['--enable-ra', '--dhcp-range=2600::10,2600::20']
565 else:
566 extra_opts = []
567 if dnsmasq_opts:
568 extra_opts += dnsmasq_opts
569 self.dnsmasq = subprocess.Popen(
570 ['dnsmasq', '--keep-in-foreground', '--log-queries',
571 '--log-facility=' + self.dnsmasq_log, '--conf-file=/dev/null',
572 '--dhcp-leasefile=' + lease_file, '--bind-interfaces',
573 '--interface=' + self.if_router, '--except-interface=lo',
574 '--dhcp-range=192.168.5.10,192.168.5.200'] + extra_opts)
575
576 def shutdown_iface(self):
577 '''Remove test interface and stop DHCP server'''
578
579 if self.if_router:
580 subprocess.check_call(['ip', 'link', 'del', 'dev', self.if_router])
581 self.if_router = None
582 if self.dnsmasq:
583 self.dnsmasq.kill()
584 self.dnsmasq.wait()
585 self.dnsmasq = None
586
587 def print_server_log(self):
588 '''Print DHCP server log for debugging failures'''
589
590 with open(self.dnsmasq_log) as f:
591 sys.stdout.write('\n\n---- dnsmasq log ----\n{}\n------\n\n'.format(f.read()))
592
593 def test_resolved_domain_restricted_dns(self):
594 '''resolved: domain-restricted DNS servers'''
595
596 # FIXME: resolvectl query fails with enabled DNSSEC against our dnsmasq
597 conf = '/run/systemd/resolved.conf.d/test-disable-dnssec.conf'
598 os.makedirs(os.path.dirname(conf), exist_ok=True)
599 with open(conf, 'w') as f:
600 f.write('[Resolve]\nDNSSEC=no\n')
601 self.addCleanup(os.remove, conf)
602
603 # create interface for generic connections; this will map all DNS names
604 # to 192.168.42.1
605 self.create_iface(dnsmasq_opts=['--address=/#/192.168.42.1'])
606 self.write_network('general.network', '''\
607 [Match]
608 Name={}
609 [Network]
610 DHCP=ipv4
611 IPv6AcceptRA=False'''.format(self.iface))
612
613 # create second device/dnsmasq for a .company/.lab VPN interface
614 # static IPs for simplicity
615 self.add_veth_pair('testvpnclient', 'testvpnrouter')
616 subprocess.check_call(['ip', 'a', 'flush', 'dev', 'testvpnrouter'])
617 subprocess.check_call(['ip', 'a', 'add', '10.241.3.1/24', 'dev', 'testvpnrouter'])
618 subprocess.check_call(['ip', 'link', 'set', 'testvpnrouter', 'up'])
619
620 vpn_dnsmasq_log = os.path.join(self.workdir, 'dnsmasq-vpn.log')
621 vpn_dnsmasq = subprocess.Popen(
622 ['dnsmasq', '--keep-in-foreground', '--log-queries',
623 '--log-facility=' + vpn_dnsmasq_log, '--conf-file=/dev/null',
624 '--dhcp-leasefile=/dev/null', '--bind-interfaces',
625 '--interface=testvpnrouter', '--except-interface=lo',
626 '--address=/math.lab/10.241.3.3', '--address=/cantina.company/10.241.4.4'])
627 self.addCleanup(vpn_dnsmasq.wait)
628 self.addCleanup(vpn_dnsmasq.kill)
629
630 self.write_network('vpn.network', '''\
631 [Match]
632 Name=testvpnclient
633 [Network]
634 IPv6AcceptRA=False
635 Address=10.241.3.2/24
636 DNS=10.241.3.1
637 Domains= ~company ~lab''')
638
639 self.start_unit('systemd-networkd')
640 subprocess.check_call([NETWORKD_WAIT_ONLINE, '--interface', self.iface,
641 '--interface=testvpnclient', '--timeout=20'])
642
643 # ensure we start fresh with every test
644 subprocess.check_call(['systemctl', 'restart', 'systemd-resolved'])
645
646 # test vpnclient specific domains; these should *not* be answered by
647 # the general DNS
648 out = subprocess.check_output(['resolvectl', 'query', 'math.lab'])
649 self.assertIn(b'math.lab: 10.241.3.3', out)
650 out = subprocess.check_output(['resolvectl', 'query', 'kettle.cantina.company'])
651 self.assertIn(b'kettle.cantina.company: 10.241.4.4', out)
652
653 # test general domains
654 out = subprocess.check_output(['resolvectl', 'query', 'megasearch.net'])
655 self.assertIn(b'megasearch.net: 192.168.42.1', out)
656
657 with open(self.dnsmasq_log) as f:
658 general_log = f.read()
659 with open(vpn_dnsmasq_log) as f:
660 vpn_log = f.read()
661
662 # VPN domains should only be sent to VPN DNS
663 self.assertRegex(vpn_log, 'query.*math.lab')
664 self.assertRegex(vpn_log, 'query.*cantina.company')
665 self.assertNotIn('.lab', general_log)
666 self.assertNotIn('.company', general_log)
667
668 # general domains should not be sent to the VPN DNS
669 self.assertRegex(general_log, 'query.*megasearch.net')
670 self.assertNotIn('megasearch.net', vpn_log)
671
672 def test_resolved_etc_hosts(self):
673 '''resolved queries to /etc/hosts'''
674
675 # FIXME: -t MX query fails with enabled DNSSEC (even when using
676 # the known negative trust anchor .internal instead of .example.com)
677 conf = '/run/systemd/resolved.conf.d/test-disable-dnssec.conf'
678 os.makedirs(os.path.dirname(conf), exist_ok=True)
679 with open(conf, 'w') as f:
680 f.write('[Resolve]\nDNSSEC=no\nLLMNR=no\nMulticastDNS=no\n')
681 self.addCleanup(os.remove, conf)
682
683 # create /etc/hosts bind mount which resolves my.example.com for IPv4
684 hosts = os.path.join(self.workdir, 'hosts')
685 with open(hosts, 'w') as f:
686 f.write('172.16.99.99 my.example.com\n')
687 subprocess.check_call(['mount', '--bind', hosts, '/etc/hosts'])
688 self.addCleanup(subprocess.call, ['umount', '/etc/hosts'])
689 subprocess.check_call(['systemctl', 'stop', 'systemd-resolved.service'])
690
691 # note: different IPv4 address here, so that it's easy to tell apart
692 # what resolved the query
693 self.create_iface(dnsmasq_opts=['--host-record=my.example.com,172.16.99.1,2600::99:99',
694 '--host-record=other.example.com,172.16.0.42,2600::42',
695 '--mx-host=example.com,mail.example.com'],
696 ipv6=True)
697 self.do_test(coldplug=None, ipv6=True)
698
699 try:
700 # family specific queries
701 out = subprocess.check_output(['resolvectl', 'query', '-4', 'my.example.com'])
702 self.assertIn(b'my.example.com: 172.16.99.99', out)
703 # we don't expect an IPv6 answer; if /etc/hosts has any IP address,
704 # it's considered a sufficient source
705 self.assertNotEqual(subprocess.call(['resolvectl', 'query', '-6', 'my.example.com']), 0)
706 # "any family" query; IPv4 should come from /etc/hosts
707 out = subprocess.check_output(['resolvectl', 'query', 'my.example.com'])
708 self.assertIn(b'my.example.com: 172.16.99.99', out)
709 # IP → name lookup; again, takes the /etc/hosts one
710 out = subprocess.check_output(['resolvectl', 'query', '172.16.99.99'])
711 self.assertIn(b'172.16.99.99: my.example.com', out)
712
713 # non-address RRs should fall back to DNS
714 out = subprocess.check_output(['resolvectl', 'query', '--type=MX', 'example.com'])
715 self.assertIn(b'example.com IN MX 1 mail.example.com', out)
716
717 # other domains query DNS
718 out = subprocess.check_output(['resolvectl', 'query', 'other.example.com'])
719 self.assertIn(b'172.16.0.42', out)
720 out = subprocess.check_output(['resolvectl', 'query', '172.16.0.42'])
721 self.assertIn(b'172.16.0.42: other.example.com', out)
722 except (AssertionError, subprocess.CalledProcessError):
723 self.show_journal('systemd-resolved.service')
724 self.print_server_log()
725 raise
726
727 def test_transient_hostname(self):
728 '''networkd sets transient hostname from DHCP'''
729
730 orig_hostname = socket.gethostname()
731 self.addCleanup(socket.sethostname, orig_hostname)
732 # temporarily move /etc/hostname away; restart hostnamed to pick it up
733 if os.path.exists('/etc/hostname'):
734 subprocess.check_call(['mount', '--bind', '/dev/null', '/etc/hostname'])
735 self.addCleanup(subprocess.call, ['umount', '/etc/hostname'])
736 subprocess.check_call(['systemctl', 'stop', 'systemd-hostnamed.service'])
737 self.addCleanup(subprocess.call, ['systemctl', 'stop', 'systemd-hostnamed.service'])
738
739 self.create_iface(dnsmasq_opts=['--dhcp-host={},192.168.5.210,testgreen'.format(self.iface_mac)])
740 self.do_test(coldplug=None, extra_opts='IPv6AcceptRA=False', dhcp_mode='ipv4')
741
742 try:
743 # should have received the fixed IP above
744 out = subprocess.check_output(['ip', '-4', 'a', 'show', 'dev', self.iface])
745 self.assertRegex(out, b'inet 192.168.5.210/24 .* scope global dynamic')
746 # should have set transient hostname in hostnamed; this is
747 # sometimes a bit lagging (issue #4753), so retry a few times
748 for retry in range(1, 6):
749 out = subprocess.check_output(['hostnamectl'])
750 if b'testgreen' in out:
751 break
752 time.sleep(5)
753 sys.stdout.write('[retry %i] ' % retry)
754 sys.stdout.flush()
755 else:
756 self.fail('Transient hostname not found in hostnamectl:\n{}'.format(out.decode()))
757 # and also applied to the system
758 self.assertEqual(socket.gethostname(), 'testgreen')
759 except AssertionError:
760 self.show_journal('systemd-networkd.service')
761 self.show_journal('systemd-hostnamed.service')
762 self.print_server_log()
763 raise
764
765 def test_transient_hostname_with_static(self):
766 '''transient hostname is not applied if static hostname exists'''
767
768 orig_hostname = socket.gethostname()
769 self.addCleanup(socket.sethostname, orig_hostname)
770
771 if not os.path.exists('/etc/hostname'):
772 self.write_config('/etc/hostname', "foobarqux")
773 else:
774 self.write_config('/run/hostname.tmp', "foobarqux")
775 subprocess.check_call(['mount', '--bind', '/run/hostname.tmp', '/etc/hostname'])
776 self.addCleanup(subprocess.call, ['umount', '/etc/hostname'])
777
778 socket.sethostname("foobarqux");
779
780 subprocess.check_call(['systemctl', 'stop', 'systemd-hostnamed.service'])
781 self.addCleanup(subprocess.call, ['systemctl', 'stop', 'systemd-hostnamed.service'])
782
783 self.create_iface(dnsmasq_opts=['--dhcp-host={},192.168.5.210,testgreen'.format(self.iface_mac)])
784 self.do_test(coldplug=None, extra_opts='IPv6AcceptRA=False', dhcp_mode='ipv4')
785
786 try:
787 # should have received the fixed IP above
788 out = subprocess.check_output(['ip', '-4', 'a', 'show', 'dev', self.iface])
789 self.assertRegex(out, b'inet 192.168.5.210/24 .* scope global dynamic')
790 # static hostname wins over transient one, thus *not* applied
791 self.assertEqual(socket.gethostname(), "foobarqux")
792 except AssertionError:
793 self.show_journal('systemd-networkd.service')
794 self.show_journal('systemd-hostnamed.service')
795 self.print_server_log()
796 raise
797
798
799 class NetworkdClientTest(ClientTestBase, unittest.TestCase):
800 '''Test networkd client against networkd server'''
801
802 def setUp(self):
803 super().setUp()
804 self.dnsmasq = None
805
806 def create_iface(self, ipv6=False, dhcpserver_opts=None):
807 '''Create test interface with DHCP server behind it'''
808
809 # run "router-side" networkd in own mount namespace to shield it from
810 # "client-side" configuration and networkd
811 (fd, script) = tempfile.mkstemp(prefix='networkd-router.sh')
812 self.addCleanup(os.remove, script)
813 with os.fdopen(fd, 'w+') as f:
814 f.write('''\
815 #!/bin/sh
816 set -eu
817 mkdir -p /run/systemd/network
818 mkdir -p /run/systemd/netif
819 mount -t tmpfs none /run/systemd/network
820 mount -t tmpfs none /run/systemd/netif
821 [ ! -e /run/dbus ] || mount -t tmpfs none /run/dbus
822 # create router/client veth pair
823 cat << EOF > /run/systemd/network/test.netdev
824 [NetDev]
825 Name=%(ifr)s
826 Kind=veth
827
828 [Peer]
829 Name=%(ifc)s
830 EOF
831
832 cat << EOF > /run/systemd/network/test.network
833 [Match]
834 Name=%(ifr)s
835
836 [Network]
837 Address=192.168.5.1/24
838 %(addr6)s
839 DHCPServer=yes
840
841 [DHCPServer]
842 PoolOffset=10
843 PoolSize=50
844 DNS=192.168.5.1
845 %(dhopts)s
846 EOF
847
848 # run networkd as in systemd-networkd.service
849 exec $(systemctl cat systemd-networkd.service | sed -n '/^ExecStart=/ { s/^.*=//; s/^[@+-]//; s/^!*//; p}')
850 ''' % {'ifr': self.if_router, 'ifc': self.iface, 'addr6': ipv6 and 'Address=2600::1/64' or '',
851 'dhopts': dhcpserver_opts or ''})
852
853 os.fchmod(fd, 0o755)
854
855 subprocess.check_call(['systemd-run', '--unit=networkd-test-router.service',
856 '-p', 'InaccessibleDirectories=-/etc/systemd/network',
857 '-p', 'InaccessibleDirectories=-/run/systemd/network',
858 '-p', 'InaccessibleDirectories=-/run/systemd/netif',
859 '--service-type=notify', script])
860
861 # wait until devices got created
862 for _ in range(50):
863 out = subprocess.check_output(['ip', 'a', 'show', 'dev', self.if_router])
864 if b'state UP' in out and b'scope global' in out:
865 break
866 time.sleep(0.1)
867
868 def shutdown_iface(self):
869 '''Remove test interface and stop DHCP server'''
870
871 if self.if_router:
872 subprocess.check_call(['systemctl', 'stop', 'networkd-test-router.service'])
873 # ensure failed transient unit does not stay around
874 subprocess.call(['systemctl', 'reset-failed', 'networkd-test-router.service'])
875 subprocess.call(['ip', 'link', 'del', 'dev', self.if_router])
876 self.if_router = None
877
878 def print_server_log(self):
879 '''Print DHCP server log for debugging failures'''
880
881 self.show_journal('networkd-test-router.service')
882
883 @unittest.skip('networkd does not have DHCPv6 server support')
884 def test_hotplug_dhcp_ip6(self):
885 pass
886
887 @unittest.skip('networkd does not have DHCPv6 server support')
888 def test_coldplug_dhcp_ip6(self):
889 pass
890
891 def test_search_domains(self):
892
893 # we don't use this interface for this test
894 self.if_router = None
895
896 self.write_network('test.netdev', '''\
897 [NetDev]
898 Name=dummy0
899 Kind=dummy
900 MACAddress=12:34:56:78:9a:bc''')
901 self.write_network('test.network', '''\
902 [Match]
903 Name=dummy0
904 [Network]
905 Address=192.168.42.100/24
906 DNS=192.168.42.1
907 Domains= one two three four five six seven eight nine ten''')
908
909 self.start_unit('systemd-networkd')
910
911 for timeout in range(50):
912 with open(RESOLV_CONF) as f:
913 contents = f.read()
914 if ' one' in contents:
915 break
916 time.sleep(0.1)
917 self.assertRegex(contents, 'search .*one two three four')
918 self.assertNotIn('seven\n', contents)
919 self.assertIn('# Too many search domains configured, remaining ones ignored.\n', contents)
920
921 def test_search_domains_too_long(self):
922
923 # we don't use this interface for this test
924 self.if_router = None
925
926 name_prefix = 'a' * 60
927
928 self.write_network('test.netdev', '''\
929 [NetDev]
930 Name=dummy0
931 Kind=dummy
932 MACAddress=12:34:56:78:9a:bc''')
933 self.write_network('test.network', '''\
934 [Match]
935 Name=dummy0
936 [Network]
937 Address=192.168.42.100/24
938 DNS=192.168.42.1
939 Domains={p}0 {p}1 {p}2 {p}3 {p}4'''.format(p=name_prefix))
940
941 self.start_unit('systemd-networkd')
942
943 for timeout in range(50):
944 with open(RESOLV_CONF) as f:
945 contents = f.read()
946 if ' one' in contents:
947 break
948 time.sleep(0.1)
949 self.assertRegex(contents, 'search .*{p}0 {p}1 {p}2'.format(p=name_prefix))
950 self.assertIn('# Total length of all search domains is too long, remaining ones ignored.', contents)
951
952 def test_dropin(self):
953 # we don't use this interface for this test
954 self.if_router = None
955
956 self.write_network('test.netdev', '''\
957 [NetDev]
958 Name=dummy0
959 Kind=dummy
960 MACAddress=12:34:56:78:9a:bc''')
961 self.write_network('test.network', '''\
962 [Match]
963 Name=dummy0
964 [Network]
965 Address=192.168.42.100/24
966 DNS=192.168.42.1''')
967 self.write_network_dropin('test.network', 'dns', '''\
968 [Network]
969 DNS=127.0.0.1''')
970
971 self.start_unit('systemd-resolved')
972 self.start_unit('systemd-networkd')
973
974 for timeout in range(50):
975 with open(RESOLV_CONF) as f:
976 contents = f.read()
977 if ' 127.0.0.1' in contents and '192.168.42.1' in contents:
978 break
979 time.sleep(0.1)
980 self.assertIn('nameserver 192.168.42.1\n', contents)
981 self.assertIn('nameserver 127.0.0.1\n', contents)
982
983 def test_dhcp_timezone(self):
984 '''networkd sets time zone from DHCP'''
985
986 def get_tz():
987 out = subprocess.check_output(['busctl', 'get-property', 'org.freedesktop.timedate1',
988 '/org/freedesktop/timedate1', 'org.freedesktop.timedate1', 'Timezone'])
989 assert out.startswith(b's "')
990 out = out.strip()
991 assert out.endswith(b'"')
992 return out[3:-1].decode()
993
994 orig_timezone = get_tz()
995 self.addCleanup(subprocess.call, ['timedatectl', 'set-timezone', orig_timezone])
996
997 self.create_iface(dhcpserver_opts='EmitTimezone=yes\nTimezone=Pacific/Honolulu')
998 self.do_test(coldplug=None, extra_opts='IPv6AcceptRA=false\n[DHCP]\nUseTimezone=true', dhcp_mode='ipv4')
999
1000 # should have applied the received timezone
1001 try:
1002 self.assertEqual(get_tz(), 'Pacific/Honolulu')
1003 except AssertionError:
1004 self.show_journal('systemd-networkd.service')
1005 self.show_journal('systemd-hostnamed.service')
1006 raise
1007
1008
1009 class MatchClientTest(unittest.TestCase, NetworkdTestingUtilities):
1010 """Test [Match] sections in .network files.
1011
1012 Be aware that matching the test host's interfaces will wipe their
1013 configuration, so as a precaution, all network files should have a
1014 restrictive [Match] section to only ever interfere with the
1015 temporary veth interfaces created here.
1016 """
1017
1018 def tearDown(self):
1019 """Stop networkd."""
1020 subprocess.call(['systemctl', 'stop', 'systemd-networkd'])
1021
1022 def test_basic_matching(self):
1023 """Verify the Name= line works throughout this class."""
1024 self.add_veth_pair('test_if1', 'fake_if2')
1025 self.write_network('test.network', "[Match]\nName=test_*\n[Network]")
1026 subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])
1027 self.assert_link_states(test_if1='managed', fake_if2='unmanaged')
1028
1029 def test_inverted_matching(self):
1030 """Verify that a '!'-prefixed value inverts the match."""
1031 # Use a MAC address as the interfaces' common matching attribute
1032 # to avoid depending on udev, to support testing in containers.
1033 mac = '00:01:02:03:98:99'
1034 self.add_veth_pair('test_veth', 'test_peer',
1035 ['addr', mac], ['addr', mac])
1036 self.write_network('no-veth.network', """\
1037 [Match]
1038 MACAddress={}
1039 Name=!nonexistent *peer*
1040 [Network]""".format(mac))
1041 subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])
1042 self.assert_link_states(test_veth='managed', test_peer='unmanaged')
1043
1044
1045 class UnmanagedClientTest(unittest.TestCase, NetworkdTestingUtilities):
1046 """Test if networkd manages the correct interfaces."""
1047
1048 def setUp(self):
1049 """Write .network files to match the named veth devices."""
1050 # Define the veth+peer pairs to be created.
1051 # Their pairing doesn't actually matter, only their names do.
1052 self.veths = {
1053 'm1def': 'm0unm',
1054 'm1man': 'm1unm',
1055 }
1056
1057 # Define the contents of .network files to be read in order.
1058 self.configs = (
1059 "[Match]\nName=m1def\n",
1060 "[Match]\nName=m1unm\n[Link]\nUnmanaged=yes\n",
1061 "[Match]\nName=m1*\n[Link]\nUnmanaged=no\n",
1062 )
1063
1064 # Write out the .network files to be cleaned up automatically.
1065 for i, config in enumerate(self.configs):
1066 self.write_network("%02d-test.network" % i, config)
1067
1068 def tearDown(self):
1069 """Stop networkd."""
1070 subprocess.call(['systemctl', 'stop', 'systemd-networkd'])
1071
1072 def create_iface(self):
1073 """Create temporary veth pairs for interface matching."""
1074 for veth, peer in self.veths.items():
1075 self.add_veth_pair(veth, peer)
1076
1077 def test_unmanaged_setting(self):
1078 """Verify link states with Unmanaged= settings, hot-plug."""
1079 subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])
1080 self.create_iface()
1081 self.assert_link_states(m1def='managed',
1082 m1man='managed',
1083 m1unm='unmanaged',
1084 m0unm='unmanaged')
1085
1086 def test_unmanaged_setting_coldplug(self):
1087 """Verify link states with Unmanaged= settings, cold-plug."""
1088 self.create_iface()
1089 subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])
1090 self.assert_link_states(m1def='managed',
1091 m1man='managed',
1092 m1unm='unmanaged',
1093 m0unm='unmanaged')
1094
1095 def test_catchall_config(self):
1096 """Verify link states with a catch-all config, hot-plug."""
1097 # Don't actually catch ALL interfaces. It messes up the host.
1098 self.write_network('all.network', "[Match]\nName=m[01]???\n")
1099 subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])
1100 self.create_iface()
1101 self.assert_link_states(m1def='managed',
1102 m1man='managed',
1103 m1unm='unmanaged',
1104 m0unm='managed')
1105
1106 def test_catchall_config_coldplug(self):
1107 """Verify link states with a catch-all config, cold-plug."""
1108 # Don't actually catch ALL interfaces. It messes up the host.
1109 self.write_network('all.network', "[Match]\nName=m[01]???\n")
1110 self.create_iface()
1111 subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])
1112 self.assert_link_states(m1def='managed',
1113 m1man='managed',
1114 m1unm='unmanaged',
1115 m0unm='managed')
1116
1117
1118 if __name__ == '__main__':
1119 unittest.main(testRunner=unittest.TextTestRunner(stream=sys.stdout,
1120 verbosity=2))