]> git.ipfire.org Git - thirdparty/systemd.git/blob - test/networkd-test.py
networkd-test: set right access modes for /run/systemd/netif
[thirdparty/systemd.git] / test / networkd-test.py
1 #!/usr/bin/env python3
2 # SPDX-License-Identifier: LGPL-2.1+
3 #
4 # networkd integration test
5 # This uses temporary configuration in /run and temporary veth devices, and
6 # does not write anything on disk or change any system configuration;
7 # but it assumes (and checks at the beginning) that networkd is not currently
8 # running.
9 #
10 # This can be run on a normal installation, in QEMU, nspawn (with
11 # --private-network), LXD (with "--config raw.lxc=lxc.aa_profile=unconfined"),
12 # or LXC system containers. You need at least the "ip" tool from the iproute
13 # package; it is recommended to install dnsmasq too to get full test coverage.
14 #
15 # ATTENTION: This uses the *installed* networkd, not the one from the built
16 # source tree.
17 #
18 # © 2015 Canonical Ltd.
19 # Author: Martin Pitt <martin.pitt@ubuntu.com>
20
21 import errno
22 import os
23 import shutil
24 import socket
25 import subprocess
26 import sys
27 import tempfile
28 import time
29 import unittest
30
31 HAVE_DNSMASQ = shutil.which('dnsmasq') is not None
32
33 NETWORK_UNITDIR = '/run/systemd/network'
34
35 NETWORKD_WAIT_ONLINE = shutil.which('systemd-networkd-wait-online',
36 path='/usr/lib/systemd:/lib/systemd')
37
38 RESOLV_CONF = '/run/systemd/resolve/resolv.conf'
39
40 tmpmounts = []
41 running_units = []
42 stopped_units = []
43
44
45 def setUpModule():
46 global tmpmounts
47
48 """Initialize the environment, and perform sanity checks on it."""
49 if NETWORKD_WAIT_ONLINE is None:
50 raise OSError(errno.ENOENT, 'systemd-networkd-wait-online not found')
51
52 # Do not run any tests if the system is using networkd already and it's not virtualized
53 if (subprocess.call(['systemctl', 'is-active', '--quiet', 'systemd-networkd.service']) == 0 and
54 subprocess.call(['systemd-detect-virt', '--quiet']) != 0):
55 raise unittest.SkipTest('not virtualized and networkd is already active')
56
57 # Ensure we don't mess with an existing networkd config
58 for u in ['systemd-networkd.socket', 'systemd-networkd', 'systemd-resolved']:
59 if subprocess.call(['systemctl', 'is-active', '--quiet', u]) == 0:
60 subprocess.call(['systemctl', 'stop', u])
61 running_units.append(u)
62 else:
63 stopped_units.append(u)
64
65 # create static systemd-network user for networkd-test-router.service (it
66 # needs to do some stuff as root and can't start as user; but networkd
67 # still insists on the user)
68 subprocess.call(['adduser', '--system', '--no-create-home', 'systemd-network'])
69
70 for d in ['/etc/systemd/network', '/run/systemd/network',
71 '/run/systemd/netif', '/run/systemd/resolve']:
72 if os.path.isdir(d):
73 subprocess.check_call(["mount", "-t", "tmpfs", "none", d])
74 tmpmounts.append(d)
75 if os.path.isdir('/run/systemd/resolve'):
76 os.chmod('/run/systemd/resolve', 0o755)
77 shutil.chown('/run/systemd/resolve', 'systemd-resolve', 'systemd-resolve')
78 if os.path.isdir('/run/systemd/netif'):
79 os.chmod('/run/systemd/netif', 0o755)
80 shutil.chown('/run/systemd/netif', 'systemd-network', 'systemd-network')
81
82 # Avoid "Failed to open /dev/tty" errors in containers.
83 os.environ['SYSTEMD_LOG_TARGET'] = 'journal'
84
85 # Ensure the unit directory exists so tests can dump files into it.
86 os.makedirs(NETWORK_UNITDIR, exist_ok=True)
87
88
89 def tearDownModule():
90 global tmpmounts
91 for d in tmpmounts:
92 subprocess.check_call(["umount", d])
93 for u in stopped_units:
94 subprocess.call(["systemctl", "stop", u])
95 for u in running_units:
96 subprocess.call(["systemctl", "restart", u])
97
98
99 class NetworkdTestingUtilities:
100 """Provide a set of utility functions to facilitate networkd tests.
101
102 This class must be inherited along with unittest.TestCase to define
103 some required methods.
104 """
105
106 def add_veth_pair(self, veth, peer, veth_options=(), peer_options=()):
107 """Add a veth interface pair, and queue them to be removed."""
108 subprocess.check_call(['ip', 'link', 'add', 'name', veth] +
109 list(veth_options) +
110 ['type', 'veth', 'peer', 'name', peer] +
111 list(peer_options))
112 self.addCleanup(subprocess.call, ['ip', 'link', 'del', 'dev', peer])
113
114 def write_network(self, unit_name, contents):
115 """Write a network unit file, and queue it to be removed."""
116 unit_path = os.path.join(NETWORK_UNITDIR, unit_name)
117
118 with open(unit_path, 'w') as unit:
119 unit.write(contents)
120 self.addCleanup(os.remove, unit_path)
121
122 def write_network_dropin(self, unit_name, dropin_name, contents):
123 """Write a network unit drop-in, and queue it to be removed."""
124 dropin_dir = os.path.join(NETWORK_UNITDIR, "{}.d".format(unit_name))
125 dropin_path = os.path.join(dropin_dir, "{}.conf".format(dropin_name))
126
127 os.makedirs(dropin_dir, exist_ok=True)
128 self.addCleanup(os.rmdir, dropin_dir)
129 with open(dropin_path, 'w') as dropin:
130 dropin.write(contents)
131 self.addCleanup(os.remove, dropin_path)
132
133 def read_attr(self, link, attribute):
134 """Read a link attributed from the sysfs."""
135 # Note we we don't want to check if interface `link' is managed, we
136 # want to evaluate link variable and pass the value of the link to
137 # assert_link_states e.g. eth0=managed.
138 self.assert_link_states(**{link:'managed'})
139 with open(os.path.join('/sys/class/net', link, attribute)) as f:
140 return f.readline().strip()
141
142 def assert_link_states(self, **kwargs):
143 """Match networkctl link states to the given ones.
144
145 Each keyword argument should be the name of a network interface
146 with its expected value of the "SETUP" column in output from
147 networkctl. The interfaces have five seconds to come online
148 before the check is performed. Every specified interface must
149 be present in the output, and any other interfaces found in the
150 output are ignored.
151
152 A special interface state "managed" is supported, which matches
153 any value in the "SETUP" column other than "unmanaged".
154 """
155 if not kwargs:
156 return
157 interfaces = set(kwargs)
158
159 # Wait for the requested interfaces, but don't fail for them.
160 subprocess.call([NETWORKD_WAIT_ONLINE, '--timeout=5'] +
161 ['--interface={}'.format(iface) for iface in kwargs])
162
163 # Validate each link state found in the networkctl output.
164 out = subprocess.check_output(['networkctl', '--no-legend']).rstrip()
165 for line in out.decode('utf-8').split('\n'):
166 fields = line.split()
167 if len(fields) >= 5 and fields[1] in kwargs:
168 iface = fields[1]
169 expected = kwargs[iface]
170 actual = fields[-1]
171 if (actual != expected and
172 not (expected == 'managed' and actual != 'unmanaged')):
173 self.fail("Link {} expects state {}, found {}".format(iface, expected, actual))
174 interfaces.remove(iface)
175
176 # Ensure that all requested interfaces have been covered.
177 if interfaces:
178 self.fail("Missing links in status output: {}".format(interfaces))
179
180
181 class BridgeTest(NetworkdTestingUtilities, unittest.TestCase):
182 """Provide common methods for testing networkd against servers."""
183
184 def setUp(self):
185 self.write_network('port1.netdev', '''\
186 [NetDev]
187 Name=port1
188 Kind=dummy
189 MACAddress=12:34:56:78:9a:bc''')
190 self.write_network('port2.netdev', '''\
191 [NetDev]
192 Name=port2
193 Kind=dummy
194 MACAddress=12:34:56:78:9a:bd''')
195 self.write_network('mybridge.netdev', '''\
196 [NetDev]
197 Name=mybridge
198 Kind=bridge''')
199 self.write_network('port1.network', '''\
200 [Match]
201 Name=port1
202 [Network]
203 Bridge=mybridge''')
204 self.write_network('port2.network', '''\
205 [Match]
206 Name=port2
207 [Network]
208 Bridge=mybridge''')
209 self.write_network('mybridge.network', '''\
210 [Match]
211 Name=mybridge
212 [Network]
213 DNS=192.168.250.1
214 Address=192.168.250.33/24
215 Gateway=192.168.250.1''')
216 subprocess.call(['systemctl', 'reset-failed', 'systemd-networkd', 'systemd-resolved'])
217 subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])
218
219 def tearDown(self):
220 subprocess.check_call(['systemctl', 'stop', 'systemd-networkd'])
221 subprocess.check_call(['ip', 'link', 'del', 'mybridge'])
222 subprocess.check_call(['ip', 'link', 'del', 'port1'])
223 subprocess.check_call(['ip', 'link', 'del', 'port2'])
224
225 def test_bridge_init(self):
226 self.assert_link_states(
227 port1='managed',
228 port2='managed',
229 mybridge='managed')
230
231 def test_bridge_port_priority(self):
232 self.assertEqual(self.read_attr('port1', 'brport/priority'), '32')
233 self.write_network_dropin('port1.network', 'priority', '''\
234 [Bridge]
235 Priority=28
236 ''')
237 subprocess.check_call(['systemctl', 'restart', 'systemd-networkd'])
238 self.assertEqual(self.read_attr('port1', 'brport/priority'), '28')
239
240 def test_bridge_port_priority_set_zero(self):
241 """It should be possible to set the bridge port priority to 0"""
242 self.assertEqual(self.read_attr('port2', 'brport/priority'), '32')
243 self.write_network_dropin('port2.network', 'priority', '''\
244 [Bridge]
245 Priority=0
246 ''')
247 subprocess.check_call(['systemctl', 'restart', 'systemd-networkd'])
248 self.assertEqual(self.read_attr('port2', 'brport/priority'), '0')
249
250 def test_bridge_port_property(self):
251 """Test the "[Bridge]" section keys"""
252 self.assertEqual(self.read_attr('port2', 'brport/priority'), '32')
253 self.write_network_dropin('port2.network', 'property', '''\
254 [Bridge]
255 UnicastFlood=true
256 HairPin=true
257 UseBPDU=true
258 FastLeave=true
259 AllowPortToBeRoot=true
260 Cost=555
261 Priority=23
262 ''')
263 subprocess.check_call(['systemctl', 'restart', 'systemd-networkd'])
264
265 self.assertEqual(self.read_attr('port2', 'brport/priority'), '23')
266 self.assertEqual(self.read_attr('port2', 'brport/hairpin_mode'), '1')
267 self.assertEqual(self.read_attr('port2', 'brport/path_cost'), '555')
268 self.assertEqual(self.read_attr('port2', 'brport/multicast_fast_leave'), '1')
269 self.assertEqual(self.read_attr('port2', 'brport/unicast_flood'), '1')
270 self.assertEqual(self.read_attr('port2', 'brport/bpdu_guard'), '1')
271 self.assertEqual(self.read_attr('port2', 'brport/root_block'), '1')
272
273 class ClientTestBase(NetworkdTestingUtilities):
274 """Provide common methods for testing networkd against servers."""
275
276 @classmethod
277 def setUpClass(klass):
278 klass.orig_log_level = subprocess.check_output(
279 ['systemctl', 'show', '--value', '--property', 'LogLevel'],
280 universal_newlines=True).strip()
281 subprocess.check_call(['systemd-analyze', 'set-log-level', 'debug'])
282
283 @classmethod
284 def tearDownClass(klass):
285 subprocess.check_call(['systemd-analyze', 'set-log-level', klass.orig_log_level])
286
287 def setUp(self):
288 self.iface = 'test_eth42'
289 self.if_router = 'router_eth42'
290 self.workdir_obj = tempfile.TemporaryDirectory()
291 self.workdir = self.workdir_obj.name
292 self.config = 'test_eth42.network'
293
294 # get current journal cursor
295 subprocess.check_output(['journalctl', '--sync'])
296 out = subprocess.check_output(['journalctl', '-b', '--quiet',
297 '--no-pager', '-n0', '--show-cursor'],
298 universal_newlines=True)
299 self.assertTrue(out.startswith('-- cursor:'))
300 self.journal_cursor = out.split()[-1]
301
302 subprocess.call(['systemctl', 'reset-failed', 'systemd-networkd', 'systemd-resolved'])
303
304 def tearDown(self):
305 self.shutdown_iface()
306 subprocess.call(['systemctl', 'stop', 'systemd-networkd'])
307 subprocess.call(['ip', 'link', 'del', 'dummy0'],
308 stderr=subprocess.DEVNULL)
309
310 def show_journal(self, unit):
311 '''Show journal of given unit since start of the test'''
312
313 print('---- {} ----'.format(unit))
314 subprocess.check_output(['journalctl', '--sync'])
315 sys.stdout.flush()
316 subprocess.call(['journalctl', '-b', '--no-pager', '--quiet',
317 '--cursor', self.journal_cursor, '-u', unit])
318
319 def create_iface(self, ipv6=False):
320 '''Create test interface with DHCP server behind it'''
321
322 raise NotImplementedError('must be implemented by a subclass')
323
324 def shutdown_iface(self):
325 '''Remove test interface and stop DHCP server'''
326
327 raise NotImplementedError('must be implemented by a subclass')
328
329 def print_server_log(self):
330 '''Print DHCP server log for debugging failures'''
331
332 raise NotImplementedError('must be implemented by a subclass')
333
334 def do_test(self, coldplug=True, ipv6=False, extra_opts='',
335 online_timeout=10, dhcp_mode='yes'):
336 try:
337 subprocess.check_call(['systemctl', 'start', 'systemd-resolved'])
338 except subprocess.CalledProcessError:
339 self.show_journal('systemd-resolved.service')
340 raise
341 self.write_network(self.config, '''\
342 [Match]
343 Name={}
344 [Network]
345 DHCP={}
346 {}'''.format(self.iface, dhcp_mode, extra_opts))
347
348 if coldplug:
349 # create interface first, then start networkd
350 self.create_iface(ipv6=ipv6)
351 subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])
352 elif coldplug is not None:
353 # start networkd first, then create interface
354 subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])
355 self.create_iface(ipv6=ipv6)
356 else:
357 # "None" means test sets up interface by itself
358 subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])
359
360 try:
361 subprocess.check_call([NETWORKD_WAIT_ONLINE, '--interface',
362 self.iface, '--timeout=%i' % online_timeout])
363
364 if ipv6:
365 # check iface state and IP 6 address; FIXME: we need to wait a bit
366 # longer, as the iface is "configured" already with IPv4 *or*
367 # IPv6, but we want to wait for both
368 for _ in range(10):
369 out = subprocess.check_output(['ip', 'a', 'show', 'dev', self.iface])
370 if b'state UP' in out and b'inet6 2600' in out and b'inet 192.168' in out:
371 break
372 time.sleep(1)
373 else:
374 self.fail('timed out waiting for IPv6 configuration')
375
376 self.assertRegex(out, b'inet6 2600::.* scope global .*dynamic')
377 self.assertRegex(out, b'inet6 fe80::.* scope link')
378 else:
379 # should have link-local address on IPv6 only
380 out = subprocess.check_output(['ip', '-6', 'a', 'show', 'dev', self.iface])
381 self.assertRegex(out, br'inet6 fe80::.* scope link')
382 self.assertNotIn(b'scope global', out)
383
384 # should have IPv4 address
385 out = subprocess.check_output(['ip', '-4', 'a', 'show', 'dev', self.iface])
386 self.assertIn(b'state UP', out)
387 self.assertRegex(out, br'inet 192.168.5.\d+/.* scope global dynamic')
388
389 # check networkctl state
390 out = subprocess.check_output(['networkctl'])
391 self.assertRegex(out, (r'{}\s+ether\s+[a-z-]+\s+unmanaged'.format(self.if_router)).encode())
392 self.assertRegex(out, (r'{}\s+ether\s+routable\s+configured'.format(self.iface)).encode())
393
394 out = subprocess.check_output(['networkctl', 'status', self.iface])
395 self.assertRegex(out, br'Type:\s+ether')
396 self.assertRegex(out, br'State:\s+routable.*configured')
397 self.assertRegex(out, br'Address:\s+192.168.5.\d+')
398 if ipv6:
399 self.assertRegex(out, br'2600::')
400 else:
401 self.assertNotIn(br'2600::', out)
402 self.assertRegex(out, br'fe80::')
403 self.assertRegex(out, br'Gateway:\s+192.168.5.1')
404 self.assertRegex(out, br'DNS:\s+192.168.5.1')
405 except (AssertionError, subprocess.CalledProcessError):
406 # show networkd status, journal, and DHCP server log on failure
407 with open(os.path.join(NETWORK_UNITDIR, self.config)) as f:
408 print('\n---- {} ----\n{}'.format(self.config, f.read()))
409 print('---- interface status ----')
410 sys.stdout.flush()
411 subprocess.call(['ip', 'a', 'show', 'dev', self.iface])
412 print('---- networkctl status {} ----'.format(self.iface))
413 sys.stdout.flush()
414 subprocess.call(['networkctl', 'status', self.iface])
415 self.show_journal('systemd-networkd.service')
416 self.print_server_log()
417 raise
418
419 for timeout in range(50):
420 with open(RESOLV_CONF) as f:
421 contents = f.read()
422 if 'nameserver 192.168.5.1\n' in contents:
423 break
424 time.sleep(0.1)
425 else:
426 self.fail('nameserver 192.168.5.1 not found in ' + RESOLV_CONF)
427
428 if coldplug is False:
429 # check post-down.d hook
430 self.shutdown_iface()
431
432 def test_coldplug_dhcp_yes_ip4(self):
433 # we have a 12s timeout on RA, so we need to wait longer
434 self.do_test(coldplug=True, ipv6=False, online_timeout=15)
435
436 def test_coldplug_dhcp_yes_ip4_no_ra(self):
437 # with disabling RA explicitly things should be fast
438 self.do_test(coldplug=True, ipv6=False,
439 extra_opts='IPv6AcceptRA=False')
440
441 def test_coldplug_dhcp_ip4_only(self):
442 # we have a 12s timeout on RA, so we need to wait longer
443 self.do_test(coldplug=True, ipv6=False, dhcp_mode='ipv4',
444 online_timeout=15)
445
446 def test_coldplug_dhcp_ip4_only_no_ra(self):
447 # with disabling RA explicitly things should be fast
448 self.do_test(coldplug=True, ipv6=False, dhcp_mode='ipv4',
449 extra_opts='IPv6AcceptRA=False')
450
451 def test_coldplug_dhcp_ip6(self):
452 self.do_test(coldplug=True, ipv6=True)
453
454 def test_hotplug_dhcp_ip4(self):
455 # With IPv4 only we have a 12s timeout on RA, so we need to wait longer
456 self.do_test(coldplug=False, ipv6=False, online_timeout=15)
457
458 def test_hotplug_dhcp_ip6(self):
459 self.do_test(coldplug=False, ipv6=True)
460
461 def test_route_only_dns(self):
462 self.write_network('myvpn.netdev', '''\
463 [NetDev]
464 Name=dummy0
465 Kind=dummy
466 MACAddress=12:34:56:78:9a:bc''')
467 self.write_network('myvpn.network', '''\
468 [Match]
469 Name=dummy0
470 [Network]
471 Address=192.168.42.100
472 DNS=192.168.42.1
473 Domains= ~company''')
474
475 self.do_test(coldplug=True, ipv6=False,
476 extra_opts='IPv6AcceptRouterAdvertisements=False')
477
478 with open(RESOLV_CONF) as f:
479 contents = f.read()
480 # ~company is not a search domain, only a routing domain
481 self.assertNotRegex(contents, 'search.*company')
482 # our global server should appear
483 self.assertIn('nameserver 192.168.5.1\n', contents)
484 # should not have domain-restricted server as global server
485 self.assertNotIn('nameserver 192.168.42.1\n', contents)
486
487 def test_route_only_dns_all_domains(self):
488 self.write_network('myvpn.netdev', '''[NetDev]
489 Name=dummy0
490 Kind=dummy
491 MACAddress=12:34:56:78:9a:bc''')
492 self.write_network('myvpn.network', '''[Match]
493 Name=dummy0
494 [Network]
495 Address=192.168.42.100
496 DNS=192.168.42.1
497 Domains= ~company ~.''')
498
499 self.do_test(coldplug=True, ipv6=False,
500 extra_opts='IPv6AcceptRouterAdvertisements=False')
501
502 with open(RESOLV_CONF) as f:
503 contents = f.read()
504
505 # ~company is not a search domain, only a routing domain
506 self.assertNotRegex(contents, 'search.*company')
507
508 # our global server should appear
509 self.assertIn('nameserver 192.168.5.1\n', contents)
510 # should have company server as global server due to ~.
511 self.assertIn('nameserver 192.168.42.1\n', contents)
512
513
514 @unittest.skipUnless(HAVE_DNSMASQ, 'dnsmasq not installed')
515 class DnsmasqClientTest(ClientTestBase, unittest.TestCase):
516 '''Test networkd client against dnsmasq'''
517
518 def setUp(self):
519 super().setUp()
520 self.dnsmasq = None
521 self.iface_mac = 'de:ad:be:ef:47:11'
522
523 def create_iface(self, ipv6=False, dnsmasq_opts=None):
524 '''Create test interface with DHCP server behind it'''
525
526 # add veth pair
527 subprocess.check_call(['ip', 'link', 'add', 'name', self.iface,
528 'address', self.iface_mac,
529 'type', 'veth', 'peer', 'name', self.if_router])
530
531 # give our router an IP
532 subprocess.check_call(['ip', 'a', 'flush', 'dev', self.if_router])
533 subprocess.check_call(['ip', 'a', 'add', '192.168.5.1/24', 'dev', self.if_router])
534 if ipv6:
535 subprocess.check_call(['ip', 'a', 'add', '2600::1/64', 'dev', self.if_router])
536 subprocess.check_call(['ip', 'link', 'set', self.if_router, 'up'])
537
538 # add DHCP server
539 self.dnsmasq_log = os.path.join(self.workdir, 'dnsmasq.log')
540 lease_file = os.path.join(self.workdir, 'dnsmasq.leases')
541 if ipv6:
542 extra_opts = ['--enable-ra', '--dhcp-range=2600::10,2600::20']
543 else:
544 extra_opts = []
545 if dnsmasq_opts:
546 extra_opts += dnsmasq_opts
547 self.dnsmasq = subprocess.Popen(
548 ['dnsmasq', '--keep-in-foreground', '--log-queries',
549 '--log-facility=' + self.dnsmasq_log, '--conf-file=/dev/null',
550 '--dhcp-leasefile=' + lease_file, '--bind-interfaces',
551 '--interface=' + self.if_router, '--except-interface=lo',
552 '--dhcp-range=192.168.5.10,192.168.5.200'] + extra_opts)
553
554 def shutdown_iface(self):
555 '''Remove test interface and stop DHCP server'''
556
557 if self.if_router:
558 subprocess.check_call(['ip', 'link', 'del', 'dev', self.if_router])
559 self.if_router = None
560 if self.dnsmasq:
561 self.dnsmasq.kill()
562 self.dnsmasq.wait()
563 self.dnsmasq = None
564
565 def print_server_log(self):
566 '''Print DHCP server log for debugging failures'''
567
568 with open(self.dnsmasq_log) as f:
569 sys.stdout.write('\n\n---- dnsmasq log ----\n{}\n------\n\n'.format(f.read()))
570
571 def test_resolved_domain_restricted_dns(self):
572 '''resolved: domain-restricted DNS servers'''
573
574 # create interface for generic connections; this will map all DNS names
575 # to 192.168.42.1
576 self.create_iface(dnsmasq_opts=['--address=/#/192.168.42.1'])
577 self.write_network('general.network', '''\
578 [Match]
579 Name={}
580 [Network]
581 DHCP=ipv4
582 IPv6AcceptRA=False'''.format(self.iface))
583
584 # create second device/dnsmasq for a .company/.lab VPN interface
585 # static IPs for simplicity
586 self.add_veth_pair('testvpnclient', 'testvpnrouter')
587 subprocess.check_call(['ip', 'a', 'flush', 'dev', 'testvpnrouter'])
588 subprocess.check_call(['ip', 'a', 'add', '10.241.3.1/24', 'dev', 'testvpnrouter'])
589 subprocess.check_call(['ip', 'link', 'set', 'testvpnrouter', 'up'])
590
591 vpn_dnsmasq_log = os.path.join(self.workdir, 'dnsmasq-vpn.log')
592 vpn_dnsmasq = subprocess.Popen(
593 ['dnsmasq', '--keep-in-foreground', '--log-queries',
594 '--log-facility=' + vpn_dnsmasq_log, '--conf-file=/dev/null',
595 '--dhcp-leasefile=/dev/null', '--bind-interfaces',
596 '--interface=testvpnrouter', '--except-interface=lo',
597 '--address=/math.lab/10.241.3.3', '--address=/cantina.company/10.241.4.4'])
598 self.addCleanup(vpn_dnsmasq.wait)
599 self.addCleanup(vpn_dnsmasq.kill)
600
601 self.write_network('vpn.network', '''\
602 [Match]
603 Name=testvpnclient
604 [Network]
605 IPv6AcceptRA=False
606 Address=10.241.3.2/24
607 DNS=10.241.3.1
608 Domains= ~company ~lab''')
609
610 subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])
611 subprocess.check_call([NETWORKD_WAIT_ONLINE, '--interface', self.iface,
612 '--interface=testvpnclient', '--timeout=20'])
613
614 # ensure we start fresh with every test
615 subprocess.check_call(['systemctl', 'restart', 'systemd-resolved'])
616
617 # test vpnclient specific domains; these should *not* be answered by
618 # the general DNS
619 out = subprocess.check_output(['resolvectl', 'query', 'math.lab'])
620 self.assertIn(b'math.lab: 10.241.3.3', out)
621 out = subprocess.check_output(['resolvectl', 'query', 'kettle.cantina.company'])
622 self.assertIn(b'kettle.cantina.company: 10.241.4.4', out)
623
624 # test general domains
625 out = subprocess.check_output(['resolvectl', 'query', 'megasearch.net'])
626 self.assertIn(b'megasearch.net: 192.168.42.1', out)
627
628 with open(self.dnsmasq_log) as f:
629 general_log = f.read()
630 with open(vpn_dnsmasq_log) as f:
631 vpn_log = f.read()
632
633 # VPN domains should only be sent to VPN DNS
634 self.assertRegex(vpn_log, 'query.*math.lab')
635 self.assertRegex(vpn_log, 'query.*cantina.company')
636 self.assertNotIn('.lab', general_log)
637 self.assertNotIn('.company', general_log)
638
639 # general domains should not be sent to the VPN DNS
640 self.assertRegex(general_log, 'query.*megasearch.net')
641 self.assertNotIn('megasearch.net', vpn_log)
642
643 def test_resolved_etc_hosts(self):
644 '''resolved queries to /etc/hosts'''
645
646 # FIXME: -t MX query fails with enabled DNSSEC (even when using
647 # the known negative trust anchor .internal instead of .example)
648 conf = '/run/systemd/resolved.conf.d/test-disable-dnssec.conf'
649 os.makedirs(os.path.dirname(conf), exist_ok=True)
650 with open(conf, 'w') as f:
651 f.write('[Resolve]\nDNSSEC=no')
652 self.addCleanup(os.remove, conf)
653
654 # create /etc/hosts bind mount which resolves my.example for IPv4
655 hosts = os.path.join(self.workdir, 'hosts')
656 with open(hosts, 'w') as f:
657 f.write('172.16.99.99 my.example\n')
658 subprocess.check_call(['mount', '--bind', hosts, '/etc/hosts'])
659 self.addCleanup(subprocess.call, ['umount', '/etc/hosts'])
660 subprocess.check_call(['systemctl', 'stop', 'systemd-resolved.service'])
661
662 # note: different IPv4 address here, so that it's easy to tell apart
663 # what resolved the query
664 self.create_iface(dnsmasq_opts=['--host-record=my.example,172.16.99.1,2600::99:99',
665 '--host-record=other.example,172.16.0.42,2600::42',
666 '--mx-host=example,mail.example'],
667 ipv6=True)
668 self.do_test(coldplug=None, ipv6=True)
669
670 try:
671 # family specific queries
672 out = subprocess.check_output(['resolvectl', 'query', '-4', 'my.example'])
673 self.assertIn(b'my.example: 172.16.99.99', out)
674 # we don't expect an IPv6 answer; if /etc/hosts has any IP address,
675 # it's considered a sufficient source
676 self.assertNotEqual(subprocess.call(['resolvectl', 'query', '-6', 'my.example']), 0)
677 # "any family" query; IPv4 should come from /etc/hosts
678 out = subprocess.check_output(['resolvectl', 'query', 'my.example'])
679 self.assertIn(b'my.example: 172.16.99.99', out)
680 # IP → name lookup; again, takes the /etc/hosts one
681 out = subprocess.check_output(['resolvectl', 'query', '172.16.99.99'])
682 self.assertIn(b'172.16.99.99: my.example', out)
683
684 # non-address RRs should fall back to DNS
685 out = subprocess.check_output(['resolvectl', 'query', '--type=MX', 'example'])
686 self.assertIn(b'example IN MX 1 mail.example', out)
687
688 # other domains query DNS
689 out = subprocess.check_output(['resolvectl', 'query', 'other.example'])
690 self.assertIn(b'172.16.0.42', out)
691 out = subprocess.check_output(['resolvectl', 'query', '172.16.0.42'])
692 self.assertIn(b'172.16.0.42: other.example', out)
693 except (AssertionError, subprocess.CalledProcessError):
694 self.show_journal('systemd-resolved.service')
695 self.print_server_log()
696 raise
697
698 def test_transient_hostname(self):
699 '''networkd sets transient hostname from DHCP'''
700
701 orig_hostname = socket.gethostname()
702 self.addCleanup(socket.sethostname, orig_hostname)
703 # temporarily move /etc/hostname away; restart hostnamed to pick it up
704 if os.path.exists('/etc/hostname'):
705 subprocess.check_call(['mount', '--bind', '/dev/null', '/etc/hostname'])
706 self.addCleanup(subprocess.call, ['umount', '/etc/hostname'])
707 subprocess.check_call(['systemctl', 'stop', 'systemd-hostnamed.service'])
708
709 self.create_iface(dnsmasq_opts=['--dhcp-host={},192.168.5.210,testgreen'.format(self.iface_mac)])
710 self.do_test(coldplug=None, extra_opts='IPv6AcceptRA=False', dhcp_mode='ipv4')
711
712 try:
713 # should have received the fixed IP above
714 out = subprocess.check_output(['ip', '-4', 'a', 'show', 'dev', self.iface])
715 self.assertRegex(out, b'inet 192.168.5.210/24 .* scope global dynamic')
716 # should have set transient hostname in hostnamed; this is
717 # sometimes a bit lagging (issue #4753), so retry a few times
718 for retry in range(1, 6):
719 out = subprocess.check_output(['hostnamectl'])
720 if b'testgreen' in out:
721 break
722 time.sleep(5)
723 sys.stdout.write('[retry %i] ' % retry)
724 sys.stdout.flush()
725 else:
726 self.fail('Transient hostname not found in hostnamectl:\n{}'.format(out.decode()))
727 # and also applied to the system
728 self.assertEqual(socket.gethostname(), 'testgreen')
729 except AssertionError:
730 self.show_journal('systemd-networkd.service')
731 self.show_journal('systemd-hostnamed.service')
732 self.print_server_log()
733 raise
734
735 def test_transient_hostname_with_static(self):
736 '''transient hostname is not applied if static hostname exists'''
737
738 orig_hostname = socket.gethostname()
739 self.addCleanup(socket.sethostname, orig_hostname)
740 if not os.path.exists('/etc/hostname'):
741 self.writeConfig('/etc/hostname', orig_hostname)
742 subprocess.check_call(['systemctl', 'stop', 'systemd-hostnamed.service'])
743
744 self.create_iface(dnsmasq_opts=['--dhcp-host={},192.168.5.210,testgreen'.format(self.iface_mac)])
745 self.do_test(coldplug=None, extra_opts='IPv6AcceptRA=False', dhcp_mode='ipv4')
746
747 try:
748 # should have received the fixed IP above
749 out = subprocess.check_output(['ip', '-4', 'a', 'show', 'dev', self.iface])
750 self.assertRegex(out, b'inet 192.168.5.210/24 .* scope global dynamic')
751 # static hostname wins over transient one, thus *not* applied
752 self.assertEqual(socket.gethostname(), orig_hostname)
753 except AssertionError:
754 self.show_journal('systemd-networkd.service')
755 self.show_journal('systemd-hostnamed.service')
756 self.print_server_log()
757 raise
758
759
760 class NetworkdClientTest(ClientTestBase, unittest.TestCase):
761 '''Test networkd client against networkd server'''
762
763 def setUp(self):
764 super().setUp()
765 self.dnsmasq = None
766
767 def create_iface(self, ipv6=False, dhcpserver_opts=None):
768 '''Create test interface with DHCP server behind it'''
769
770 # run "router-side" networkd in own mount namespace to shield it from
771 # "client-side" configuration and networkd
772 (fd, script) = tempfile.mkstemp(prefix='networkd-router.sh')
773 self.addCleanup(os.remove, script)
774 with os.fdopen(fd, 'w+') as f:
775 f.write('''\
776 #!/bin/sh
777 set -eu
778 mkdir -p /run/systemd/network
779 mkdir -p /run/systemd/netif
780 mount -t tmpfs none /run/systemd/network
781 mount -t tmpfs none /run/systemd/netif
782 [ ! -e /run/dbus ] || mount -t tmpfs none /run/dbus
783 # create router/client veth pair
784 cat << EOF > /run/systemd/network/test.netdev
785 [NetDev]
786 Name=%(ifr)s
787 Kind=veth
788
789 [Peer]
790 Name=%(ifc)s
791 EOF
792
793 cat << EOF > /run/systemd/network/test.network
794 [Match]
795 Name=%(ifr)s
796
797 [Network]
798 Address=192.168.5.1/24
799 %(addr6)s
800 DHCPServer=yes
801
802 [DHCPServer]
803 PoolOffset=10
804 PoolSize=50
805 DNS=192.168.5.1
806 %(dhopts)s
807 EOF
808
809 # run networkd as in systemd-networkd.service
810 exec $(systemctl cat systemd-networkd.service | sed -n '/^ExecStart=/ { s/^.*=//; s/^[@+-]//; s/^!*//; p}')
811 ''' % {'ifr': self.if_router, 'ifc': self.iface, 'addr6': ipv6 and 'Address=2600::1/64' or '',
812 'dhopts': dhcpserver_opts or ''})
813
814 os.fchmod(fd, 0o755)
815
816 subprocess.check_call(['systemd-run', '--unit=networkd-test-router.service',
817 '-p', 'InaccessibleDirectories=-/etc/systemd/network',
818 '-p', 'InaccessibleDirectories=-/run/systemd/network',
819 '-p', 'InaccessibleDirectories=-/run/systemd/netif',
820 '--service-type=notify', script])
821
822 # wait until devices got created
823 for _ in range(50):
824 out = subprocess.check_output(['ip', 'a', 'show', 'dev', self.if_router])
825 if b'state UP' in out and b'scope global' in out:
826 break
827 time.sleep(0.1)
828
829 def shutdown_iface(self):
830 '''Remove test interface and stop DHCP server'''
831
832 if self.if_router:
833 subprocess.check_call(['systemctl', 'stop', 'networkd-test-router.service'])
834 # ensure failed transient unit does not stay around
835 subprocess.call(['systemctl', 'reset-failed', 'networkd-test-router.service'])
836 subprocess.call(['ip', 'link', 'del', 'dev', self.if_router])
837 self.if_router = None
838
839 def print_server_log(self):
840 '''Print DHCP server log for debugging failures'''
841
842 self.show_journal('networkd-test-router.service')
843
844 @unittest.skip('networkd does not have DHCPv6 server support')
845 def test_hotplug_dhcp_ip6(self):
846 pass
847
848 @unittest.skip('networkd does not have DHCPv6 server support')
849 def test_coldplug_dhcp_ip6(self):
850 pass
851
852 def test_search_domains(self):
853
854 # we don't use this interface for this test
855 self.if_router = None
856
857 self.write_network('test.netdev', '''\
858 [NetDev]
859 Name=dummy0
860 Kind=dummy
861 MACAddress=12:34:56:78:9a:bc''')
862 self.write_network('test.network', '''\
863 [Match]
864 Name=dummy0
865 [Network]
866 Address=192.168.42.100
867 DNS=192.168.42.1
868 Domains= one two three four five six seven eight nine ten''')
869
870 subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])
871
872 for timeout in range(50):
873 with open(RESOLV_CONF) as f:
874 contents = f.read()
875 if ' one' in contents:
876 break
877 time.sleep(0.1)
878 self.assertRegex(contents, 'search .*one two three four')
879 self.assertNotIn('seven\n', contents)
880 self.assertIn('# Too many search domains configured, remaining ones ignored.\n', contents)
881
882 def test_search_domains_too_long(self):
883
884 # we don't use this interface for this test
885 self.if_router = None
886
887 name_prefix = 'a' * 60
888
889 self.write_network('test.netdev', '''\
890 [NetDev]
891 Name=dummy0
892 Kind=dummy
893 MACAddress=12:34:56:78:9a:bc''')
894 self.write_network('test.network', '''\
895 [Match]
896 Name=dummy0
897 [Network]
898 Address=192.168.42.100
899 DNS=192.168.42.1
900 Domains={p}0 {p}1 {p}2 {p}3 {p}4'''.format(p=name_prefix))
901
902 subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])
903
904 for timeout in range(50):
905 with open(RESOLV_CONF) as f:
906 contents = f.read()
907 if ' one' in contents:
908 break
909 time.sleep(0.1)
910 self.assertRegex(contents, 'search .*{p}0 {p}1 {p}2'.format(p=name_prefix))
911 self.assertIn('# Total length of all search domains is too long, remaining ones ignored.', contents)
912
913 def test_dropin(self):
914 # we don't use this interface for this test
915 self.if_router = None
916
917 self.write_network('test.netdev', '''\
918 [NetDev]
919 Name=dummy0
920 Kind=dummy
921 MACAddress=12:34:56:78:9a:bc''')
922 self.write_network('test.network', '''\
923 [Match]
924 Name=dummy0
925 [Network]
926 Address=192.168.42.100
927 DNS=192.168.42.1''')
928 self.write_network_dropin('test.network', 'dns', '''\
929 [Network]
930 DNS=127.0.0.1''')
931
932 subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])
933
934 for timeout in range(50):
935 with open(RESOLV_CONF) as f:
936 contents = f.read()
937 if ' 127.0.0.1' in contents:
938 break
939 time.sleep(0.1)
940 self.assertIn('nameserver 192.168.42.1\n', contents)
941 self.assertIn('nameserver 127.0.0.1\n', contents)
942
943 def test_dhcp_timezone(self):
944 '''networkd sets time zone from DHCP'''
945
946 def get_tz():
947 out = subprocess.check_output(['busctl', 'get-property', 'org.freedesktop.timedate1',
948 '/org/freedesktop/timedate1', 'org.freedesktop.timedate1', 'Timezone'])
949 assert out.startswith(b's "')
950 out = out.strip()
951 assert out.endswith(b'"')
952 return out[3:-1].decode()
953
954 orig_timezone = get_tz()
955 self.addCleanup(subprocess.call, ['timedatectl', 'set-timezone', orig_timezone])
956
957 self.create_iface(dhcpserver_opts='EmitTimezone=yes\nTimezone=Pacific/Honolulu')
958 self.do_test(coldplug=None, extra_opts='IPv6AcceptRA=false\n[DHCP]\nUseTimezone=true', dhcp_mode='ipv4')
959
960 # should have applied the received timezone
961 try:
962 self.assertEqual(get_tz(), 'Pacific/Honolulu')
963 except AssertionError:
964 self.show_journal('systemd-networkd.service')
965 self.show_journal('systemd-hostnamed.service')
966 raise
967
968
969 class MatchClientTest(unittest.TestCase, NetworkdTestingUtilities):
970 """Test [Match] sections in .network files.
971
972 Be aware that matching the test host's interfaces will wipe their
973 configuration, so as a precaution, all network files should have a
974 restrictive [Match] section to only ever interfere with the
975 temporary veth interfaces created here.
976 """
977
978 def tearDown(self):
979 """Stop networkd."""
980 subprocess.call(['systemctl', 'stop', 'systemd-networkd'])
981
982 def test_basic_matching(self):
983 """Verify the Name= line works throughout this class."""
984 self.add_veth_pair('test_if1', 'fake_if2')
985 self.write_network('test.network', "[Match]\nName=test_*\n[Network]")
986 subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])
987 self.assert_link_states(test_if1='managed', fake_if2='unmanaged')
988
989 def test_inverted_matching(self):
990 """Verify that a '!'-prefixed value inverts the match."""
991 # Use a MAC address as the interfaces' common matching attribute
992 # to avoid depending on udev, to support testing in containers.
993 mac = '00:01:02:03:98:99'
994 self.add_veth_pair('test_veth', 'test_peer',
995 ['addr', mac], ['addr', mac])
996 self.write_network('no-veth.network', """\
997 [Match]
998 MACAddress={}
999 Name=!nonexistent *peer*
1000 [Network]""".format(mac))
1001 subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])
1002 self.assert_link_states(test_veth='managed', test_peer='unmanaged')
1003
1004
1005 class UnmanagedClientTest(unittest.TestCase, NetworkdTestingUtilities):
1006 """Test if networkd manages the correct interfaces."""
1007
1008 def setUp(self):
1009 """Write .network files to match the named veth devices."""
1010 # Define the veth+peer pairs to be created.
1011 # Their pairing doesn't actually matter, only their names do.
1012 self.veths = {
1013 'm1def': 'm0unm',
1014 'm1man': 'm1unm',
1015 }
1016
1017 # Define the contents of .network files to be read in order.
1018 self.configs = (
1019 "[Match]\nName=m1def\n",
1020 "[Match]\nName=m1unm\n[Link]\nUnmanaged=yes\n",
1021 "[Match]\nName=m1*\n[Link]\nUnmanaged=no\n",
1022 )
1023
1024 # Write out the .network files to be cleaned up automatically.
1025 for i, config in enumerate(self.configs):
1026 self.write_network("%02d-test.network" % i, config)
1027
1028 def tearDown(self):
1029 """Stop networkd."""
1030 subprocess.call(['systemctl', 'stop', 'systemd-networkd'])
1031
1032 def create_iface(self):
1033 """Create temporary veth pairs for interface matching."""
1034 for veth, peer in self.veths.items():
1035 self.add_veth_pair(veth, peer)
1036
1037 def test_unmanaged_setting(self):
1038 """Verify link states with Unmanaged= settings, hot-plug."""
1039 subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])
1040 self.create_iface()
1041 self.assert_link_states(m1def='managed',
1042 m1man='managed',
1043 m1unm='unmanaged',
1044 m0unm='unmanaged')
1045
1046 def test_unmanaged_setting_coldplug(self):
1047 """Verify link states with Unmanaged= settings, cold-plug."""
1048 self.create_iface()
1049 subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])
1050 self.assert_link_states(m1def='managed',
1051 m1man='managed',
1052 m1unm='unmanaged',
1053 m0unm='unmanaged')
1054
1055 def test_catchall_config(self):
1056 """Verify link states with a catch-all config, hot-plug."""
1057 # Don't actually catch ALL interfaces. It messes up the host.
1058 self.write_network('all.network', "[Match]\nName=m[01]???\n")
1059 subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])
1060 self.create_iface()
1061 self.assert_link_states(m1def='managed',
1062 m1man='managed',
1063 m1unm='unmanaged',
1064 m0unm='managed')
1065
1066 def test_catchall_config_coldplug(self):
1067 """Verify link states with a catch-all config, cold-plug."""
1068 # Don't actually catch ALL interfaces. It messes up the host.
1069 self.write_network('all.network', "[Match]\nName=m[01]???\n")
1070 self.create_iface()
1071 subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])
1072 self.assert_link_states(m1def='managed',
1073 m1man='managed',
1074 m1unm='unmanaged',
1075 m0unm='managed')
1076
1077
1078 if __name__ == '__main__':
1079 unittest.main(testRunner=unittest.TextTestRunner(stream=sys.stdout,
1080 verbosity=2))