]> git.ipfire.org Git - thirdparty/systemd.git/blob - test/networkd-test.py
networkd-test: fix resolved_domain_restricted_dns
[thirdparty/systemd.git] / test / networkd-test.py
1 #!/usr/bin/env python3
2 # SPDX-License-Identifier: LGPL-2.1-or-later
3 #
4 # networkd integration test
5 # This uses temporary configuration in /run and temporary veth devices, and
6 # does not write anything on disk or change any system configuration;
7 # but it assumes (and checks at the beginning) that networkd is not currently
8 # running.
9 #
10 # This can be run on a normal installation, in QEMU, nspawn (with
11 # --private-network), LXD (with "--config raw.lxc=lxc.aa_profile=unconfined"),
12 # or LXC system containers. You need at least the "ip" tool from the iproute
13 # package; it is recommended to install dnsmasq too to get full test coverage.
14 #
15 # ATTENTION: This uses the *installed* networkd, not the one from the built
16 # source tree.
17 #
18 # © 2015 Canonical Ltd.
19 # Author: Martin Pitt <martin.pitt@ubuntu.com>
20
21 import errno
22 import os
23 import shutil
24 import socket
25 import subprocess
26 import sys
27 import tempfile
28 import time
29 import unittest
30
# dnsmasq is optional; the dnsmasq-based test class is skipped without it.
HAVE_DNSMASQ = bool(shutil.which('dnsmasq'))

# systemd-detect-virt exits with 0 when running inside a container.
IS_CONTAINER = not subprocess.call(['systemd-detect-virt', '--quiet', '--container'])

# Directory the tests drop their temporary networkd unit files into.
NETWORK_UNITDIR = '/run/systemd/network'

# Path of the wait-online helper, or None when it is not found in either
# systemd library directory (setUpModule() refuses to run in that case).
NETWORKD_WAIT_ONLINE = shutil.which(
    'systemd-networkd-wait-online',
    path='/usr/lib/systemd:/lib/systemd',
)

# resolv.conf file the tests poll for the DHCP-provided name server.
RESOLV_CONF = '/run/systemd/resolve/resolv.conf'

# Bookkeeping shared between setUpModule() and tearDownModule(): directories
# covered by a tmpfs, units that were active, and units that were inactive.
tmpmounts, running_units, stopped_units = [], [], []
44
45
def setUpModule():
    """Initialize the environment, and perform sanity checks on it.

    Stops the networkd/resolved units that are currently active (recording
    which ones were running so that tearDownModule() can restore them),
    makes sure the systemd-network user exists, covers the networkd/resolved
    configuration and state directories with fresh tmpfs mounts, and creates
    the unit directory the tests write into.

    Raises:
        OSError: systemd-networkd-wait-online could not be found.
        unittest.SkipTest: networkd is already active and the system is not
            virtualized.
        subprocess.CalledProcessError: mounting one of the tmpfs instances
            failed.
    """
    if NETWORKD_WAIT_ONLINE is None:
        raise OSError(errno.ENOENT, 'systemd-networkd-wait-online not found')

    # Do not run any tests if the system is using networkd already and it's not virtualized
    networkd_active = subprocess.call(
        ['systemctl', 'is-active', '--quiet', 'systemd-networkd.service']) == 0
    if networkd_active and subprocess.call(['systemd-detect-virt', '--quiet']) != 0:
        raise unittest.SkipTest('not virtualized and networkd is already active')

    # Ensure we don't mess with an existing networkd config
    for u in ['systemd-networkd.socket', 'systemd-networkd', 'systemd-resolved']:
        if subprocess.call(['systemctl', 'is-active', '--quiet', u]) == 0:
            subprocess.call(['systemctl', 'stop', u])
            running_units.append(u)
        else:
            stopped_units.append(u)

    # create static systemd-network user for networkd-test-router.service (it
    # needs to do some stuff as root and can't start as user; but networkd
    # still insists on the user)
    if subprocess.call(['getent', 'passwd', 'systemd-network']) != 0:
        subprocess.call(['useradd', '--system', '--no-create-home', 'systemd-network'])

    # Shadow existing configuration/state with empty tmpfs instances; the
    # mounted directories are remembered so tearDownModule() can unmount them.
    for d in ['/etc/systemd/network', '/run/systemd/network',
              '/run/systemd/netif', '/run/systemd/resolve']:
        if os.path.isdir(d):
            subprocess.check_call(["mount", "-t", "tmpfs", "none", d])
            tmpmounts.append(d)

    # Give the state directories the mode and owner of their service users.
    for d, owner in (('/run/systemd/resolve', 'systemd-resolve'),
                     ('/run/systemd/netif', 'systemd-network')):
        if os.path.isdir(d):
            os.chmod(d, 0o755)
            shutil.chown(d, owner, owner)

    # Avoid "Failed to open /dev/tty" errors in containers.
    os.environ['SYSTEMD_LOG_TARGET'] = 'journal'

    # Ensure the unit directory exists so tests can dump files into it.
    os.makedirs(NETWORK_UNITDIR, exist_ok=True)
89
90
def tearDownModule():
    """Undo the changes made by setUpModule().

    Unmounts the tmpfs instances recorded in ``tmpmounts``, stops the units
    that were inactive before the tests and restarts the ones that were
    active.

    Raises:
        subprocess.CalledProcessError: an unmount failed. The unit state is
            still restored before the error propagates.
    """
    try:
        for d in tmpmounts:
            subprocess.check_call(["umount", d])
    finally:
        # Restore the service state even when an unmount failed; otherwise
        # units that were running before the tests would stay stopped.
        for u in stopped_units:
            subprocess.call(["systemctl", "stop", u])
        for u in running_units:
            subprocess.call(["systemctl", "restart", u])
99
100
class NetworkdTestingUtilities:
    """Provide a set of utility functions to facilitate networkd tests.

    This class must be inherited along with unittest.TestCase to define
    some required methods (addCleanup() and fail() are used here).
    """

    def add_veth_pair(self, veth, peer, veth_options=(), peer_options=()):
        """Add a veth interface pair, and queue them to be removed.

        veth_options and peer_options are extra "ip link" arguments that
        are inserted after the respective interface name.
        """
        subprocess.check_call(['ip', 'link', 'add', 'name', veth] +
                              list(veth_options) +
                              ['type', 'veth', 'peer', 'name', peer] +
                              list(peer_options))
        self.addCleanup(subprocess.call, ['ip', 'link', 'del', 'dev', peer])

    def write_config(self, path, contents):
        """Write a configuration file, and queue it to be removed."""
        with open(path, 'w') as f:
            f.write(contents)

        self.addCleanup(os.remove, path)

    def write_network(self, unit_name, contents):
        """Write a network unit file, and queue it to be removed."""
        self.write_config(os.path.join(NETWORK_UNITDIR, unit_name), contents)

    def write_network_dropin(self, unit_name, dropin_name, contents):
        """Write a network unit drop-in, and queue it to be removed.

        The drop-in is stored as <unit_name>.d/<dropin_name>.conf below the
        network unit directory.
        """
        dropin_dir = os.path.join(NETWORK_UNITDIR, "{}.d".format(unit_name))
        dropin_path = os.path.join(dropin_dir, "{}.conf".format(dropin_name))

        os.makedirs(dropin_dir, exist_ok=True)
        # Cleanups run in reverse order of registration, so the file queued
        # below is removed before this rmdir of its directory.
        self.addCleanup(os.rmdir, dropin_dir)
        with open(dropin_path, 'w') as dropin:
            dropin.write(contents)
        self.addCleanup(os.remove, dropin_path)

    def read_attr(self, link, attribute):
        """Read a link attribute from sysfs.

        Fails the test first if the link is not managed by networkd.
        Returns the first line of /sys/class/net/<link>/<attribute>, stripped.
        """
        # Note that we don't want to check an interface literally named
        # `link'; we want to evaluate the link variable and pass its value as
        # the keyword to assert_link_states, e.g. eth0='managed'.
        self.assert_link_states(**{link: 'managed'})
        with open(os.path.join('/sys/class/net', link, attribute)) as f:
            return f.readline().strip()

    def assert_link_states(self, **kwargs):
        """Match networkctl link states to the given ones.

        Each keyword argument should be the name of a network interface
        with its expected value of the "SETUP" column in output from
        networkctl.  The interfaces have five seconds to come online
        before the check is performed.  Every specified interface must
        be present in the output, and any other interfaces found in the
        output are ignored.

        A special interface state "managed" is supported, which matches
        any value in the "SETUP" column other than "unmanaged".
        """
        if not kwargs:
            return
        interfaces = set(kwargs)

        # Wait for the requested interfaces, but don't fail for them.
        subprocess.call([NETWORKD_WAIT_ONLINE, '--timeout=5'] +
                        ['--interface={}'.format(iface) for iface in kwargs])

        # Validate each link state found in the networkctl output.
        out = subprocess.check_output(['networkctl', '--no-legend']).rstrip()
        for line in out.decode('utf-8').split('\n'):
            fields = line.split()
            # The second column is the link name, the last one the setup state.
            if len(fields) >= 5 and fields[1] in kwargs:
                iface = fields[1]
                expected = kwargs[iface]
                actual = fields[-1]
                if (actual != expected and
                        not (expected == 'managed' and actual != 'unmanaged')):
                    self.fail("Link {} expects state {}, found {}".format(iface, expected, actual))
                interfaces.remove(iface)

        # Ensure that all requested interfaces have been covered.
        if interfaces:
            self.fail("Missing links in status output: {}".format(interfaces))
185
186
class BridgeTest(NetworkdTestingUtilities, unittest.TestCase):
    """Test networkd's handling of a bridge with two dummy ports."""

    def setUp(self):
        # Unit files, written in this order: two dummy ports, the bridge
        # device, and the .network files attaching the ports to the bridge.
        unit_files = (
            ('port1.netdev', '''\
[NetDev]
Name=port1
Kind=dummy
MACAddress=12:34:56:78:9a:bc
'''),
            ('port2.netdev', '''\
[NetDev]
Name=port2
Kind=dummy
MACAddress=12:34:56:78:9a:bd
'''),
            ('mybridge.netdev', '''\
[NetDev]
Name=mybridge
Kind=bridge
'''),
            ('port1.network', '''\
[Match]
Name=port1
[Network]
Bridge=mybridge
'''),
            ('port2.network', '''\
[Match]
Name=port2
[Network]
Bridge=mybridge
'''),
            ('mybridge.network', '''\
[Match]
Name=mybridge
[Network]
DNS=192.168.250.1
Address=192.168.250.33/24
Gateway=192.168.250.1
'''),
        )
        for unit_name, unit_contents in unit_files:
            self.write_network(unit_name, unit_contents)

        subprocess.call(['systemctl', 'reset-failed', 'systemd-networkd', 'systemd-resolved'])
        subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])

    def tearDown(self):
        # Stop networkd (socket first), then remove the links it created.
        for unit in ('systemd-networkd.socket', 'systemd-networkd.service'):
            subprocess.check_call(['systemctl', 'stop', unit])
        for link in ('mybridge', 'port1', 'port2'):
            subprocess.check_call(['ip', 'link', 'del', link])

    def _restart_networkd_with_port_down(self, port):
        # Take the port down, restart networkd and wait up to five seconds
        # for the port to come online again.
        subprocess.check_call(['ip', 'link', 'set', 'dev', port, 'down'])
        subprocess.check_call(['systemctl', 'restart', 'systemd-networkd'])
        subprocess.check_call([NETWORKD_WAIT_ONLINE, '--interface',
                               port, '--timeout=5'])

    def test_bridge_init(self):
        # All three links must be picked up by networkd.
        expected_states = {name: 'managed' for name in ('port1', 'port2', 'mybridge')}
        self.assert_link_states(**expected_states)

    def test_bridge_port_priority(self):
        before = self.read_attr('port1', 'brport/priority')
        self.assertEqual(before, '32')
        self.write_network_dropin('port1.network', 'priority', '''\
[Bridge]
Priority=28
''')
        self._restart_networkd_with_port_down('port1')
        after = self.read_attr('port1', 'brport/priority')
        self.assertEqual(after, '28')

    def test_bridge_port_priority_set_zero(self):
        """It should be possible to set the bridge port priority to 0"""
        before = self.read_attr('port2', 'brport/priority')
        self.assertEqual(before, '32')
        self.write_network_dropin('port2.network', 'priority', '''\
[Bridge]
Priority=0
''')
        self._restart_networkd_with_port_down('port2')
        after = self.read_attr('port2', 'brport/priority')
        self.assertEqual(after, '0')

    def test_bridge_port_property(self):
        """Test the "[Bridge]" section keys"""
        before = self.read_attr('port2', 'brport/priority')
        self.assertEqual(before, '32')
        self.write_network_dropin('port2.network', 'property', '''\
[Bridge]
UnicastFlood=true
HairPin=true
UseBPDU=true
FastLeave=true
AllowPortToBeRoot=true
Cost=555
Priority=23
''')
        self._restart_networkd_with_port_down('port2')

        # sysfs attribute below /sys/class/net/port2 and its expected value
        expected_attrs = (
            ('brport/priority', '23'),
            ('brport/hairpin_mode', '1'),
            ('brport/path_cost', '555'),
            ('brport/multicast_fast_leave', '1'),
            ('brport/unicast_flood', '1'),
            ('brport/bpdu_guard', '1'),
            ('brport/root_block', '1'),
        )
        for attribute, value in expected_attrs:
            self.assertEqual(self.read_attr('port2', attribute), value)
294
class ClientTestBase(NetworkdTestingUtilities):
    """Provide common methods for testing networkd against servers.

    Concrete test classes combine this with unittest.TestCase and implement
    create_iface(), shutdown_iface() and print_server_log() for the DHCP
    server they test against.
    """

    @classmethod
    def setUpClass(klass):
        """Remember the current systemd log level and switch to debug."""
        klass.orig_log_level = subprocess.check_output(
            ['systemctl', 'show', '--value', '--property', 'LogLevel'],
            universal_newlines=True).strip()
        subprocess.check_call(['systemd-analyze', 'log-level', 'debug'])

    @classmethod
    def tearDownClass(klass):
        """Restore the log level saved by setUpClass()."""
        subprocess.check_call(['systemd-analyze', 'log-level', klass.orig_log_level])

    def setUp(self):
        """Set interface names, a work directory and the journal cursor."""
        self.iface = 'test_eth42'
        self.if_router = 'router_eth42'
        self.workdir_obj = tempfile.TemporaryDirectory()
        # Remove the directory deterministically instead of relying on the
        # object's finalizer.  Registered first, so it runs after every
        # cleanup that a test queues later (cleanups run in reverse order).
        self.addCleanup(self.workdir_obj.cleanup)
        self.workdir = self.workdir_obj.name
        self.config = 'test_eth42.network'

        # get current journal cursor
        subprocess.check_output(['journalctl', '--sync'])
        out = subprocess.check_output(['journalctl', '-b', '--quiet',
                                       '--no-pager', '-n0', '--show-cursor'],
                                      universal_newlines=True)
        self.assertTrue(out.startswith('-- cursor:'))
        self.journal_cursor = out.split()[-1]

        subprocess.call(['systemctl', 'reset-failed', 'systemd-networkd', 'systemd-resolved'])

    def tearDown(self):
        """Remove the test interface, stop networkd and delete dummy0."""
        self.shutdown_iface()
        subprocess.call(['systemctl', 'stop', 'systemd-networkd.socket'])
        subprocess.call(['systemctl', 'stop', 'systemd-networkd.service'])
        # dummy0 only exists for some tests, so errors are silenced
        subprocess.call(['ip', 'link', 'del', 'dummy0'],
                        stderr=subprocess.DEVNULL)

    def show_journal(self, unit):
        '''Show journal of given unit since start of the test'''

        print('---- {} ----'.format(unit))
        subprocess.check_output(['journalctl', '--sync'])
        # flush our own output before the child process writes to stdout
        sys.stdout.flush()
        subprocess.call(['journalctl', '-b', '--no-pager', '--quiet',
                         '--cursor', self.journal_cursor, '-u', unit])

    def create_iface(self, ipv6=False):
        '''Create test interface with DHCP server behind it'''

        raise NotImplementedError('must be implemented by a subclass')

    def shutdown_iface(self):
        '''Remove test interface and stop DHCP server'''

        raise NotImplementedError('must be implemented by a subclass')

    def print_server_log(self):
        '''Print DHCP server log for debugging failures'''

        raise NotImplementedError('must be implemented by a subclass')

    def start_unit(self, unit):
        """Start a systemd unit; show its journal and re-raise on failure."""
        try:
            subprocess.check_call(['systemctl', 'start', unit])
        except subprocess.CalledProcessError:
            self.show_journal(unit)
            raise

    def do_test(self, coldplug=True, ipv6=False, extra_opts='',
                online_timeout=10, dhcp_mode='yes'):
        """Configure the test interface via DHCP and verify the result.

        coldplug: True creates the interface before starting networkd,
            False starts networkd first (and shuts the interface down again
            at the end), None means the caller already created the interface.
        ipv6: whether a global IPv6 address is expected in addition to IPv4.
        extra_opts: additional lines for the [Network] section.
        online_timeout: seconds to wait for the interface to become online.
        dhcp_mode: value of the DHCP= setting.

        On a failed check or command, diagnostics (unit file, interface
        status, networkd journal, server log) are printed before the error
        is re-raised.
        """
        self.start_unit('systemd-resolved')
        self.write_network(self.config, '''\
[Match]
Name={iface}
[Network]
DHCP={dhcp_mode}
{extra_opts}
'''.format(iface=self.iface, dhcp_mode=dhcp_mode, extra_opts=extra_opts))

        if coldplug:
            # create interface first, then start networkd
            self.create_iface(ipv6=ipv6)
            self.start_unit('systemd-networkd')
        elif coldplug is not None:
            # start networkd first, then create interface
            self.start_unit('systemd-networkd')
            self.create_iface(ipv6=ipv6)
        else:
            # "None" means test sets up interface by itself
            self.start_unit('systemd-networkd')

        try:
            subprocess.check_call([NETWORKD_WAIT_ONLINE, '--interface',
                                   self.iface, '--timeout=%i' % online_timeout])

            if ipv6:
                # check iface state and IPv6 address; FIXME: we need to wait a bit
                # longer, as the iface is "configured" already with IPv4 *or*
                # IPv6, but we want to wait for both
                for _ in range(10):
                    out = subprocess.check_output(['ip', 'a', 'show', 'dev', self.iface])
                    if b'state UP' in out and b'inet6 2600' in out and b'inet 192.168' in out and b'tentative' not in out:
                        break
                    time.sleep(1)
                else:
                    self.fail('timed out waiting for IPv6 configuration')

                self.assertRegex(out, b'inet6 2600::.* scope global .*dynamic')
                self.assertRegex(out, b'inet6 fe80::.* scope link')
            else:
                # should have link-local address on IPv6 only
                out = subprocess.check_output(['ip', '-6', 'a', 'show', 'dev', self.iface])
                self.assertRegex(out, br'inet6 fe80::.* scope link')
                self.assertNotIn(b'scope global', out)

            # should have IPv4 address
            out = subprocess.check_output(['ip', '-4', 'a', 'show', 'dev', self.iface])
            self.assertIn(b'state UP', out)
            self.assertRegex(out, br'inet 192.168.5.\d+/.* scope global dynamic')

            # check networkctl state
            out = subprocess.check_output(['networkctl'])
            self.assertRegex(out, (r'{}\s+ether\s+[a-z-]+\s+unmanaged'.format(self.if_router)).encode())
            self.assertRegex(out, (r'{}\s+ether\s+routable\s+configured'.format(self.iface)).encode())

            out = subprocess.check_output(['networkctl', '-n', '0', 'status', self.iface])
            self.assertRegex(out, br'Type:\s+ether')
            self.assertRegex(out, br'State:\s+routable.*configured')
            self.assertRegex(out, br'Online state:\s+online')
            self.assertRegex(out, br'Address:\s+192.168.5.\d+')
            if ipv6:
                self.assertRegex(out, br'2600::')
            else:
                self.assertNotIn(br'2600::', out)
            self.assertRegex(out, br'fe80::')
            self.assertRegex(out, br'Gateway:\s+192.168.5.1')
            self.assertRegex(out, br'DNS:\s+192.168.5.1')
        except (AssertionError, subprocess.CalledProcessError):
            # show networkd status, journal, and DHCP server log on failure
            with open(os.path.join(NETWORK_UNITDIR, self.config)) as f:
                print('\n---- {} ----\n{}'.format(self.config, f.read()))
            print('---- interface status ----')
            sys.stdout.flush()
            subprocess.call(['ip', 'a', 'show', 'dev', self.iface])
            print('---- networkctl status {} ----'.format(self.iface))
            sys.stdout.flush()
            rc = subprocess.call(['networkctl', '-n', '0', 'status', self.iface])
            if rc != 0:
                print("'networkctl status' exited with an unexpected code {}".format(rc))
            self.show_journal('systemd-networkd.service')
            self.print_server_log()
            raise

        # Poll up to 50 times, 0.1 s apart, for the name server to show up.
        for _ in range(50):
            with open(RESOLV_CONF) as f:
                contents = f.read()
            if 'nameserver 192.168.5.1\n' in contents:
                break
            time.sleep(0.1)
        else:
            self.fail('nameserver 192.168.5.1 not found in ' + RESOLV_CONF)

        if coldplug is False:
            # check post-down.d hook
            self.shutdown_iface()

    def _do_test_skipping_lxc_start_failure(self):
        # Run the IPv4 coldplug test; skip instead of failing when networkd
        # could not be started inside a container.
        try:
            self.do_test(coldplug=True, ipv6=False,
                         extra_opts='IPv6AcceptRouterAdvertisements=False')
        except subprocess.CalledProcessError as e:
            # networkd often fails to start in LXC: https://github.com/systemd/systemd/issues/11848
            if IS_CONTAINER and e.cmd == ['systemctl', 'start', 'systemd-networkd']:
                raise unittest.SkipTest('https://github.com/systemd/systemd/issues/11848')
            raise

    def test_coldplug_dhcp_yes_ip4(self):
        # we have a 12s timeout on RA, so we need to wait longer
        self.do_test(coldplug=True, ipv6=False, online_timeout=15)

    def test_coldplug_dhcp_yes_ip4_no_ra(self):
        # with disabling RA explicitly things should be fast
        self.do_test(coldplug=True, ipv6=False,
                     extra_opts='IPv6AcceptRA=False')

    def test_coldplug_dhcp_ip4_only(self):
        # we have a 12s timeout on RA, so we need to wait longer
        self.do_test(coldplug=True, ipv6=False, dhcp_mode='ipv4',
                     online_timeout=15)

    def test_coldplug_dhcp_ip4_only_no_ra(self):
        # with disabling RA explicitly things should be fast
        self.do_test(coldplug=True, ipv6=False, dhcp_mode='ipv4',
                     extra_opts='IPv6AcceptRA=False')

    def test_coldplug_dhcp_ip6(self):
        self.do_test(coldplug=True, ipv6=True)

    def test_hotplug_dhcp_ip4(self):
        # With IPv4 only we have a 12s timeout on RA, so we need to wait longer
        self.do_test(coldplug=False, ipv6=False, online_timeout=15)

    def test_hotplug_dhcp_ip6(self):
        self.do_test(coldplug=False, ipv6=True)

    def test_route_only_dns(self):
        self.write_network('myvpn.netdev', '''\
[NetDev]
Name=dummy0
Kind=dummy
MACAddress=12:34:56:78:9a:bc
''')
        self.write_network('myvpn.network', '''\
[Match]
Name=dummy0
[Network]
Address=192.168.42.100/24
DNS=192.168.42.1
Domains= ~company
''')

        self._do_test_skipping_lxc_start_failure()

        with open(RESOLV_CONF) as f:
            contents = f.read()
            # ~company is not a search domain, only a routing domain
            self.assertNotRegex(contents, 'search.*company')
            # our global server should appear
            self.assertIn('nameserver 192.168.5.1\n', contents)
            # should not have domain-restricted server as global server
            self.assertNotIn('nameserver 192.168.42.1\n', contents)

    def test_route_only_dns_all_domains(self):
        self.write_network('myvpn.netdev', '''[NetDev]
Name=dummy0
Kind=dummy
MACAddress=12:34:56:78:9a:bc
''')
        self.write_network('myvpn.network', '''[Match]
Name=dummy0
[Network]
Address=192.168.42.100/24
DNS=192.168.42.1
Domains= ~company ~.
''')

        self._do_test_skipping_lxc_start_failure()

        with open(RESOLV_CONF) as f:
            contents = f.read()

        # ~company is not a search domain, only a routing domain
        self.assertNotRegex(contents, 'search.*company')

        # our global server should appear
        self.assertIn('nameserver 192.168.5.1\n', contents)
        # should have company server as global server due to ~.
        self.assertIn('nameserver 192.168.42.1\n', contents)
560
561
@unittest.skipUnless(HAVE_DNSMASQ, 'dnsmasq not installed')
class DnsmasqClientTest(ClientTestBase, unittest.TestCase):
    '''Test networkd client against dnsmasq'''

    def setUp(self):
        """Run the base set-up; no dnsmasq is running yet."""
        super().setUp()
        self.dnsmasq = None
        self.iface_mac = 'de:ad:be:ef:47:11'

    def create_iface(self, ipv6=False, dnsmasq_opts=None):
        '''Create test interface with DHCP server behind it

        ipv6 additionally gives the router side an IPv6 address and enables
        router advertisements and an IPv6 range in dnsmasq.  dnsmasq_opts
        is an optional list of extra dnsmasq command line arguments.
        '''

        # add veth pair
        subprocess.check_call(['ip', 'link', 'add', 'name', self.iface,
                               'address', self.iface_mac,
                               'type', 'veth', 'peer', 'name', self.if_router])

        # give our router an IP
        subprocess.check_call(['ip', 'a', 'flush', 'dev', self.if_router])
        subprocess.check_call(['ip', 'a', 'add', '192.168.5.1/24', 'dev', self.if_router])
        if ipv6:
            subprocess.check_call(['ip', 'a', 'add', '2600::1/64', 'dev', self.if_router])
        subprocess.check_call(['ip', 'link', 'set', self.if_router, 'up'])

        # add DHCP server
        self.dnsmasq_log = os.path.join(self.workdir, 'dnsmasq.log')
        lease_file = os.path.join(self.workdir, 'dnsmasq.leases')
        if ipv6:
            extra_opts = ['--enable-ra', '--dhcp-range=2600::10,2600::20']
        else:
            extra_opts = []
        if dnsmasq_opts:
            extra_opts += dnsmasq_opts
        self.dnsmasq = subprocess.Popen(
            ['dnsmasq', '--keep-in-foreground', '--log-queries',
             '--log-facility=' + self.dnsmasq_log, '--conf-file=/dev/null',
             '--dhcp-leasefile=' + lease_file, '--bind-interfaces',
             '--interface=' + self.if_router, '--except-interface=lo',
             '--dhcp-range=192.168.5.10,192.168.5.200'] + extra_opts)

    def shutdown_iface(self):
        '''Remove test interface and stop DHCP server

        Safe to call more than once: the router interface name and the
        dnsmasq handle are reset to None once they have been dealt with.
        '''

        try:
            if self.if_router:
                subprocess.check_call(['ip', 'link', 'del', 'dev', self.if_router])
                self.if_router = None
        finally:
            # Reap dnsmasq even when deleting the link failed, so that no
            # server process is left behind for the following tests.
            if self.dnsmasq:
                self.dnsmasq.kill()
                self.dnsmasq.wait()
                self.dnsmasq = None

    def print_server_log(self):
        '''Print DHCP server log for debugging failures'''

        with open(self.dnsmasq_log) as f:
            sys.stdout.write('\n\n---- dnsmasq log ----\n{}\n------\n\n'.format(f.read()))

    def _enable_resolved_dnssec(self):
        # Write a resolved drop-in that enables DNSSEC in allow-downgrade
        # mode and turns off LLMNR, mDNS and DNS-over-TLS; queue its removal.
        conf = '/run/systemd/resolved.conf.d/test-enable-dnssec.conf'
        os.makedirs(os.path.dirname(conf), exist_ok=True)
        with open(conf, 'w') as f:
            f.write('[Resolve]\nDNSSEC=allow-downgrade\nLLMNR=no\nMulticastDNS=no\nDNSOverTLS=no\n')
        self.addCleanup(os.remove, conf)

    def test_resolved_domain_restricted_dns(self):
        '''resolved: domain-restricted DNS servers'''

        # enable DNSSEC in allow downgrade mode, and turn off stuff we don't want to test to make looking at logs easier
        self._enable_resolved_dnssec()

        # create interface for generic connections; this will map all DNS names
        # to 192.168.42.1
        self.create_iface(dnsmasq_opts=['--address=/#/192.168.42.1'])
        self.write_network('general.network', '''\
[Match]
Name={}
[Network]
DHCP=ipv4
IPv6AcceptRA=False
DNSSECNegativeTrustAnchors=search.example.com
'''.format(self.iface))

        # create second device/dnsmasq for a .company/.lab VPN interface
        # static IPs for simplicity
        self.add_veth_pair('testvpnclient', 'testvpnrouter')
        subprocess.check_call(['ip', 'a', 'flush', 'dev', 'testvpnrouter'])
        subprocess.check_call(['ip', 'a', 'add', '10.241.3.1/24', 'dev', 'testvpnrouter'])
        subprocess.check_call(['ip', 'link', 'set', 'testvpnrouter', 'up'])

        vpn_dnsmasq_log = os.path.join(self.workdir, 'dnsmasq-vpn.log')
        vpn_dnsmasq = subprocess.Popen(
            ['dnsmasq', '--keep-in-foreground', '--log-queries',
             '--log-facility=' + vpn_dnsmasq_log, '--conf-file=/dev/null',
             '--dhcp-leasefile=/dev/null', '--bind-interfaces',
             '--interface=testvpnrouter', '--except-interface=lo',
             '--address=/math.lab/10.241.3.3', '--address=/cantina.company/10.241.4.4'])
        # cleanups run in reverse order: kill first, then wait
        self.addCleanup(vpn_dnsmasq.wait)
        self.addCleanup(vpn_dnsmasq.kill)

        self.write_network('vpn.network', '''\
[Match]
Name=testvpnclient
[Network]
IPv6AcceptRA=False
Address=10.241.3.2/24
DNS=10.241.3.1
Domains=~company ~lab
DNSSECNegativeTrustAnchors=company lab
''')

        self.start_unit('systemd-networkd')
        subprocess.check_call([NETWORKD_WAIT_ONLINE, '--interface', self.iface,
                               '--interface=testvpnclient', '--timeout=20'])

        # ensure we start fresh with every test
        subprocess.check_call(['systemctl', 'restart', 'systemd-resolved'])

        # test vpnclient specific domains; these should *not* be answered by
        # the general DNS
        out = subprocess.check_output(['resolvectl', 'query', 'math.lab'])
        self.assertIn(b'math.lab: 10.241.3.3', out)
        out = subprocess.check_output(['resolvectl', 'query', 'kettle.cantina.company'])
        self.assertIn(b'kettle.cantina.company: 10.241.4.4', out)

        # test general domains
        out = subprocess.check_output(['resolvectl', 'query', 'search.example.com'])
        self.assertIn(b'search.example.com: 192.168.42.1', out)

        with open(self.dnsmasq_log) as f:
            general_log = f.read()
        with open(vpn_dnsmasq_log) as f:
            vpn_log = f.read()

        # VPN domains should only be sent to VPN DNS
        self.assertRegex(vpn_log, 'query.*math.lab')
        self.assertRegex(vpn_log, 'query.*cantina.company')
        self.assertNotIn('.lab', general_log)
        self.assertNotIn('.company', general_log)

        # general domains should not be sent to the VPN DNS
        self.assertRegex(general_log, 'query.*search.example.com')
        self.assertNotIn('search.example.com', vpn_log)

    def test_resolved_etc_hosts(self):
        '''resolved queries to /etc/hosts'''

        # enabled DNSSEC in allow-downgrade mode
        self._enable_resolved_dnssec()

        # Add example.com to NTA list for this test
        negative = '/run/dnssec-trust-anchors.d/example.com.negative'
        os.makedirs(os.path.dirname(negative), exist_ok=True)
        with open(negative, 'w') as f:
            f.write('example.com\n16.172.in-addr.arpa\n')
        self.addCleanup(os.remove, negative)

        # create /etc/hosts bind mount which resolves my.example.com for IPv4
        hosts = os.path.join(self.workdir, 'hosts')
        with open(hosts, 'w') as f:
            f.write('172.16.99.99 my.example.com\n')
        subprocess.check_call(['mount', '--bind', hosts, '/etc/hosts'])
        self.addCleanup(subprocess.call, ['umount', '/etc/hosts'])
        subprocess.check_call(['systemctl', 'restart', 'systemd-resolved.service'])

        # note: different IPv4 address here, so that it's easy to tell apart
        # what resolved the query
        self.create_iface(dnsmasq_opts=['--host-record=my.example.com,172.16.99.1,2600::99:99',
                                        '--host-record=other.example.com,172.16.0.42,2600::42',
                                        '--mx-host=example.com,mail.example.com'],
                          ipv6=True)
        self.do_test(coldplug=None, ipv6=True)

        try:
            # family specific queries
            out = subprocess.check_output(['resolvectl', 'query', '-4', 'my.example.com'])
            self.assertIn(b'my.example.com: 172.16.99.99', out)
            # we don't expect an IPv6 answer; if /etc/hosts has any IP address,
            # it's considered a sufficient source
            self.assertNotEqual(subprocess.call(['resolvectl', 'query', '-6', 'my.example.com']), 0)
            # "any family" query; IPv4 should come from /etc/hosts
            out = subprocess.check_output(['resolvectl', 'query', 'my.example.com'])
            self.assertIn(b'my.example.com: 172.16.99.99', out)
            # IP → name lookup; again, takes the /etc/hosts one
            out = subprocess.check_output(['resolvectl', 'query', '172.16.99.99'])
            self.assertIn(b'172.16.99.99: my.example.com', out)

            # non-address RRs should fall back to DNS
            out = subprocess.check_output(['resolvectl', 'query', '--type=MX', 'example.com'])
            self.assertIn(b'example.com IN MX 1 mail.example.com', out)

            # other domains query DNS
            out = subprocess.check_output(['resolvectl', 'query', 'other.example.com'])
            self.assertIn(b'172.16.0.42', out)
            out = subprocess.check_output(['resolvectl', 'query', '172.16.0.42'])
            self.assertIn(b'172.16.0.42: other.example.com', out)
        except (AssertionError, subprocess.CalledProcessError):
            self.show_journal('systemd-resolved.service')
            self.print_server_log()
            raise

    def test_transient_hostname(self):
        '''networkd sets transient hostname from DHCP'''

        orig_hostname = socket.gethostname()
        self.addCleanup(socket.sethostname, orig_hostname)
        # temporarily move /etc/hostname away; restart hostnamed to pick it up
        if os.path.exists('/etc/hostname'):
            subprocess.check_call(['mount', '--bind', '/dev/null', '/etc/hostname'])
            self.addCleanup(subprocess.call, ['umount', '/etc/hostname'])
        subprocess.check_call(['systemctl', 'stop', 'systemd-hostnamed.service'])
        self.addCleanup(subprocess.call, ['systemctl', 'stop', 'systemd-hostnamed.service'])

        self.create_iface(dnsmasq_opts=['--dhcp-host={},192.168.5.210,testgreen'.format(self.iface_mac)])
        self.do_test(coldplug=None, extra_opts='IPv6AcceptRA=False', dhcp_mode='ipv4')

        try:
            # should have received the fixed IP above
            out = subprocess.check_output(['ip', '-4', 'a', 'show', 'dev', self.iface])
            self.assertRegex(out, b'inet 192.168.5.210/24 .* scope global dynamic')
            # should have set transient hostname in hostnamed; this is
            # sometimes a bit lagging (issue #4753), so retry a few times
            for retry in range(1, 6):
                out = subprocess.check_output(['hostnamectl'])
                if b'testgreen' in out:
                    break
                time.sleep(5)
                sys.stdout.write('[retry %i] ' % retry)
                sys.stdout.flush()
            else:
                self.fail('Transient hostname not found in hostnamectl:\n{}'.format(out.decode()))
            # and also applied to the system
            self.assertEqual(socket.gethostname(), 'testgreen')
        except AssertionError:
            self.show_journal('systemd-networkd.service')
            self.show_journal('systemd-hostnamed.service')
            self.print_server_log()
            raise

    def test_transient_hostname_with_static(self):
        '''transient hostname is not applied if static hostname exists'''

        orig_hostname = socket.gethostname()
        self.addCleanup(socket.sethostname, orig_hostname)

        # Provide a static hostname: create /etc/hostname if it is missing,
        # otherwise bind-mount a temporary file over the existing one.
        if not os.path.exists('/etc/hostname'):
            self.write_config('/etc/hostname', "foobarqux")
        else:
            self.write_config('/run/hostname.tmp', "foobarqux")
            subprocess.check_call(['mount', '--bind', '/run/hostname.tmp', '/etc/hostname'])
            self.addCleanup(subprocess.call, ['umount', '/etc/hostname'])

        socket.sethostname("foobarqux")

        subprocess.check_call(['systemctl', 'stop', 'systemd-hostnamed.service'])
        self.addCleanup(subprocess.call, ['systemctl', 'stop', 'systemd-hostnamed.service'])

        self.create_iface(dnsmasq_opts=['--dhcp-host={},192.168.5.210,testgreen'.format(self.iface_mac)])
        self.do_test(coldplug=None, extra_opts='IPv6AcceptRA=False', dhcp_mode='ipv4')

        try:
            # should have received the fixed IP above
            out = subprocess.check_output(['ip', '-4', 'a', 'show', 'dev', self.iface])
            self.assertRegex(out, b'inet 192.168.5.210/24 .* scope global dynamic')
            # static hostname wins over transient one, thus *not* applied
            self.assertEqual(socket.gethostname(), "foobarqux")
        except AssertionError:
            self.show_journal('systemd-networkd.service')
            self.show_journal('systemd-hostnamed.service')
            self.print_server_log()
            raise
833
834
class NetworkdClientTest(ClientTestBase, unittest.TestCase):
    '''Test networkd client against networkd server'''

    def setUp(self):
        super().setUp()
        # This class never starts dnsmasq; the DHCP server is a second
        # networkd instance (see create_iface()).
        self.dnsmasq = None

    def create_iface(self, ipv6=False, dhcpserver_opts=None):
        '''Create test interface with DHCP server behind it

        ipv6: if true, the router side additionally gets the static address
              2600::1/64.
        dhcpserver_opts: optional text appended verbatim to the [DHCPServer]
              section of the router's .network file.
        '''

        # run "router-side" networkd in own mount namespace to shield it from
        # "client-side" configuration and networkd
        (fd, script) = tempfile.mkstemp(prefix='networkd-router.sh')
        self.addCleanup(os.remove, script)
        with os.fdopen(fd, 'w+') as f:
            f.write('''\
#!/bin/sh
set -eu
mkdir -p /run/systemd/network
mkdir -p /run/systemd/netif
mount -t tmpfs none /run/systemd/network
mount -t tmpfs none /run/systemd/netif
[ ! -e /run/dbus ] || mount -t tmpfs none /run/dbus
# create router/client veth pair
cat <<EOF >/run/systemd/network/test.netdev
[NetDev]
Name={ifr}
Kind=veth

[Peer]
Name={ifc}
EOF

cat <<EOF >/run/systemd/network/test.network
[Match]
Name={ifr}

[Network]
Address=192.168.5.1/24
{addr6}
DHCPServer=yes

[DHCPServer]
PoolOffset=10
PoolSize=50
DNS=192.168.5.1
{dhopts}
EOF

# run networkd as in systemd-networkd.service
exec $(systemctl cat systemd-networkd.service | sed -n '/^ExecStart=/ {{ s/^.*=//; s/^[@+-]//; s/^!*//; p}}')
'''.format(ifr=self.if_router,
           ifc=self.iface,
           addr6=('Address=2600::1/64' if ipv6 else ''),
           dhopts=(dhcpserver_opts or '')))

            # Must happen while the file object still owns the open
            # descriptor; after the "with" block fd is closed.
            os.fchmod(fd, 0o755)

        subprocess.check_call(['systemd-run', '--unit=networkd-test-router.service',
                               '-p', 'InaccessibleDirectories=-/etc/systemd/network',
                               '-p', 'InaccessibleDirectories=-/run/systemd/network',
                               '-p', 'InaccessibleDirectories=-/run/systemd/netif',
                               '--service-type=notify', script])

        # wait until devices got created
        # NOTE: this gives up silently after 50 * 0.1 s; nothing here fails
        # the test if the router link never reaches the expected state.
        for _ in range(50):
            out = subprocess.check_output(['ip', 'a', 'show', 'dev', self.if_router])
            if b'state UP' in out and b'scope global' in out:
                break
            time.sleep(0.1)

    def shutdown_iface(self):
        '''Remove test interface and stop DHCP server'''

        if self.if_router:
            subprocess.check_call(['systemctl', 'stop', 'networkd-test-router.service'])
            # ensure failed transient unit does not stay around
            subprocess.call(['systemctl', 'reset-failed', 'networkd-test-router.service'])
            subprocess.call(['ip', 'link', 'del', 'dev', self.if_router])
            self.if_router = None

    def print_server_log(self):
        '''Print DHCP server log for debugging failures'''

        self.show_journal('networkd-test-router.service')

    def _wait_for_resolv_conf(self, *needles, attempts=50, delay=0.1):
        '''Poll RESOLV_CONF until it contains every string in needles.

        The file is re-read up to "attempts" times, sleeping "delay" seconds
        between reads. A file that does not exist yet is treated like an
        empty one, so the poll keeps going instead of erroring out.

        Returns the contents of the last read ('' if the file never
        appeared); the caller is expected to assert on it.
        '''
        contents = ''
        for _ in range(attempts):
            try:
                with open(RESOLV_CONF) as f:
                    contents = f.read()
            except FileNotFoundError:
                # not written yet (presumably resolved is still starting up)
                contents = ''
            if all(needle in contents for needle in needles):
                break
            time.sleep(delay)
        return contents

    @unittest.skip('networkd does not have DHCPv6 server support')
    def test_hotplug_dhcp_ip6(self):
        pass

    @unittest.skip('networkd does not have DHCPv6 server support')
    def test_coldplug_dhcp_ip6(self):
        pass

    def test_search_domains(self):

        # we don't use this interface for this test
        self.if_router = None

        self.write_network('test.netdev', '''\
[NetDev]
Name=dummy0
Kind=dummy
MACAddress=12:34:56:78:9a:bc
''')
        self.write_network('test.network', '''\
[Match]
Name=dummy0
[Network]
Address=192.168.42.100/24
DNS=192.168.42.1
Domains= one two three four five six seven eight nine ten
''')

        self.start_unit('systemd-networkd')

        # all ten search domains must end up on one "search" line
        contents = self._wait_for_resolv_conf(' one')
        self.assertRegex(contents, 'search .*one two three four five six seven eight nine ten')

    def test_dropin(self):
        # we don't use this interface for this test
        self.if_router = None

        self.write_network('test.netdev', '''\
[NetDev]
Name=dummy0
Kind=dummy
MACAddress=12:34:56:78:9a:bc
''')
        self.write_network('test.network', '''\
[Match]
Name=dummy0
[Network]
Address=192.168.42.100/24
DNS=192.168.42.1
''')
        self.write_network_dropin('test.network', 'dns', '''\
[Network]
DNS=127.0.0.1
''')

        self.start_unit('systemd-resolved')
        self.start_unit('systemd-networkd')

        # both the DNS server from the main file and the one from the
        # drop-in must be listed
        contents = self._wait_for_resolv_conf(' 127.0.0.1', '192.168.42.1')
        self.assertIn('nameserver 192.168.42.1\n', contents)
        self.assertIn('nameserver 127.0.0.1\n', contents)

    def test_dhcp_timezone(self):
        '''networkd sets time zone from DHCP'''

        def get_tz():
            # busctl prints the string property as: s "<value>"
            out = subprocess.check_output(['busctl', 'get-property', 'org.freedesktop.timedate1',
                                           '/org/freedesktop/timedate1', 'org.freedesktop.timedate1', 'Timezone'])
            assert out.startswith(b's "')
            out = out.strip()
            assert out.endswith(b'"')
            return out[3:-1].decode()

        orig_timezone = get_tz()
        self.addCleanup(subprocess.call, ['timedatectl', 'set-timezone', orig_timezone])

        self.create_iface(dhcpserver_opts='EmitTimezone=yes\nTimezone=Pacific/Honolulu')
        self.do_test(coldplug=None, extra_opts='IPv6AcceptRA=false\n[DHCP]\nUseTimezone=true', dhcp_mode='ipv4')

        # should have applied the received timezone
        try:
            self.assertEqual(get_tz(), 'Pacific/Honolulu')
        except AssertionError:
            self.show_journal('systemd-networkd.service')
            # The time zone is read from org.freedesktop.timedate1 above, so
            # also dump the log of the unit assumed to provide that bus name.
            self.show_journal('systemd-timedated.service')
            self.show_journal('systemd-hostnamed.service')
            raise
1017
1018
class MatchClientTest(unittest.TestCase, NetworkdTestingUtilities):
    """Exercise the [Match] section of .network files.

    Caution: a .network file that matches one of the test host's own
    interfaces wipes that interface's configuration. Every file written
    here must therefore carry a [Match] section narrow enough to hit
    nothing but the temporary veth devices these tests create.
    """

    def tearDown(self):
        """Shut networkd down again (socket first, then the service)."""
        for unit in ('systemd-networkd.socket', 'systemd-networkd.service'):
            subprocess.call(['systemctl', 'stop', unit])

    def test_basic_matching(self):
        """Verify the Name= line works throughout this class."""
        self.add_veth_pair('test_if1', 'fake_if2')
        network_config = "[Match]\nName=test_*\n[Network]"
        self.write_network('test.network', network_config)
        subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])
        self.assert_link_states(test_if1='managed', fake_if2='unmanaged')

    def test_inverted_matching(self):
        """Verify that a '!'-prefixed value inverts the match."""
        # Both ends share one MAC address and are matched through it;
        # this avoids depending on udev, to support testing in containers.
        shared_mac = '00:01:02:03:98:99'
        self.add_veth_pair('test_veth', 'test_peer',
                           ['addr', shared_mac], ['addr', shared_mac])
        network_config = """\
[Match]
MACAddress={}
Name=!nonexistent *peer*
[Network]""".format(shared_mac)
        self.write_network('no-veth.network', network_config)
        subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])
        self.assert_link_states(test_veth='managed', test_peer='unmanaged')
1054
1055
class UnmanagedClientTest(unittest.TestCase, NetworkdTestingUtilities):
    """Check that networkd manages exactly the interfaces it should."""

    def setUp(self):
        """Prepare the veth names and write the numbered .network files."""
        # veth -> peer names of the pairs created by create_iface().
        # Which name is paired with which is irrelevant; only the names
        # themselves are matched.
        self.veths = {
            'm1def': 'm0unm',
            'm1man': 'm1unm',
        }

        # Contents of the .network files, in the order they are numbered.
        self.configs = (
            "[Match]\nName=m1def\n",
            "[Match]\nName=m1unm\n[Link]\nUnmanaged=yes\n",
            "[Match]\nName=m1*\n[Link]\nUnmanaged=no\n",
        )

        # The files are named 00-test.network, 01-test.network, ...
        for index, contents in enumerate(self.configs):
            self.write_network("%02d-test.network" % index, contents)

    def tearDown(self):
        """Shut networkd down again (socket first, then the service)."""
        for unit in ('systemd-networkd.socket', 'systemd-networkd.service'):
            subprocess.call(['systemctl', 'stop', unit])

    def create_iface(self):
        """Create temporary veth pairs for interface matching."""
        for name, peer_name in self.veths.items():
            self.add_veth_pair(name, peer_name)

    def test_unmanaged_setting(self):
        """Verify link states with Unmanaged= settings, hot-plug."""
        # hot-plug: networkd is already running when the links appear
        subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])
        self.create_iface()
        expected = dict(m1def='managed',
                        m1man='managed',
                        m1unm='unmanaged',
                        m0unm='unmanaged')
        self.assert_link_states(**expected)

    def test_unmanaged_setting_coldplug(self):
        """Verify link states with Unmanaged= settings, cold-plug."""
        # cold-plug: the links exist before networkd starts
        self.create_iface()
        subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])
        expected = dict(m1def='managed',
                        m1man='managed',
                        m1unm='unmanaged',
                        m0unm='unmanaged')
        self.assert_link_states(**expected)

    def test_catchall_config(self):
        """Verify link states with a catch-all config, hot-plug."""
        # Don't actually catch ALL interfaces. It messes up the host.
        catchall = "[Match]\nName=m[01]???\n"
        self.write_network('all.network', catchall)
        subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])
        self.create_iface()
        expected = dict(m1def='managed',
                        m1man='managed',
                        m1unm='unmanaged',
                        m0unm='managed')
        self.assert_link_states(**expected)

    def test_catchall_config_coldplug(self):
        """Verify link states with a catch-all config, cold-plug."""
        # Don't actually catch ALL interfaces. It messes up the host.
        catchall = "[Match]\nName=m[01]???\n"
        self.write_network('all.network', catchall)
        self.create_iface()
        subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])
        expected = dict(m1def='managed',
                        m1man='managed',
                        m1unm='unmanaged',
                        m0unm='managed')
        self.assert_link_states(**expected)
1128
1129
if __name__ == '__main__':
    # Report verbosely on stdout rather than TextTestRunner's default stream.
    runner = unittest.TextTestRunner(stream=sys.stdout, verbosity=2)
    unittest.main(testRunner=runner)