]> git.ipfire.org Git - thirdparty/systemd.git/blob - test/networkd-test.py
pkgconfig: define variables relative to ${prefix}/${rootprefix}/${sysconfdir}
[thirdparty/systemd.git] / test / networkd-test.py
1 #!/usr/bin/env python3
2 # SPDX-License-Identifier: LGPL-2.1+
3 #
4 # networkd integration test
5 # This uses temporary configuration in /run and temporary veth devices, and
6 # does not write anything on disk or change any system configuration;
7 # but it assumes (and checks at the beginning) that networkd is not currently
8 # running.
9 #
10 # This can be run on a normal installation, in QEMU, nspawn (with
11 # --private-network), LXD (with "--config raw.lxc=lxc.aa_profile=unconfined"),
12 # or LXC system containers. You need at least the "ip" tool from the iproute
13 # package; it is recommended to install dnsmasq too to get full test coverage.
14 #
15 # ATTENTION: This uses the *installed* networkd, not the one from the built
16 # source tree.
17 #
18 # © 2015 Canonical Ltd.
19 # Author: Martin Pitt <martin.pitt@ubuntu.com>
20
21 import errno
22 import os
23 import shutil
24 import socket
25 import subprocess
26 import sys
27 import tempfile
28 import time
29 import unittest
30
31 HAVE_DNSMASQ = shutil.which('dnsmasq') is not None
32
33 NETWORK_UNITDIR = '/run/systemd/network'
34
35 NETWORKD_WAIT_ONLINE = shutil.which('systemd-networkd-wait-online',
36 path='/usr/lib/systemd:/lib/systemd')
37
38 RESOLV_CONF = '/run/systemd/resolve/resolv.conf'
39
40 tmpmounts = []
41 running_units = []
42 stopped_units = []
43
44
45 def setUpModule():
46 global tmpmounts
47
48 """Initialize the environment, and perform sanity checks on it."""
49 if NETWORKD_WAIT_ONLINE is None:
50 raise OSError(errno.ENOENT, 'systemd-networkd-wait-online not found')
51
52 # Do not run any tests if the system is using networkd already and it's not virtualized
53 if (subprocess.call(['systemctl', 'is-active', '--quiet', 'systemd-networkd.service']) == 0 and
54 subprocess.call(['systemd-detect-virt', '--quiet']) != 0):
55 raise unittest.SkipTest('not virtualized and networkd is already active')
56 # Ensure we don't mess with an existing networkd config
57 for u in ['systemd-networkd.socket', 'systemd-networkd', 'systemd-resolved']:
58 if subprocess.call(['systemctl', 'is-active', '--quiet', u]) == 0:
59 subprocess.call(['systemctl', 'stop', u])
60 running_units.append(u)
61 else:
62 stopped_units.append(u)
63 for d in ['/etc/systemd/network', '/run/systemd/network',
64 '/run/systemd/netif', '/run/systemd/resolve']:
65 if os.path.isdir(d):
66 subprocess.check_call(["mount", "-t", "tmpfs", "none", d])
67 tmpmounts.append(d)
68 if os.path.isdir('/run/systemd/resolve'):
69 os.chmod('/run/systemd/resolve', 0o755)
70 shutil.chown('/run/systemd/resolve', 'systemd-resolve', 'systemd-resolve')
71
72 # Avoid "Failed to open /dev/tty" errors in containers.
73 os.environ['SYSTEMD_LOG_TARGET'] = 'journal'
74
75 # Ensure the unit directory exists so tests can dump files into it.
76 os.makedirs(NETWORK_UNITDIR, exist_ok=True)
77
78 # create static systemd-network user for networkd-test-router.service (it
79 # needs to do some stuff as root and can't start as user; but networkd
80 # still insists on the user)
81 subprocess.check_call(['adduser', '--system', '--no-create-home', 'systemd-network'])
82
83
84 def tearDownModule():
85 global tmpmounts
86 for d in tmpmounts:
87 subprocess.check_call(["umount", d])
88 for u in stopped_units:
89 subprocess.call(["systemctl", "stop", u])
90 for u in running_units:
91 subprocess.call(["systemctl", "restart", u])
92
93
94 class NetworkdTestingUtilities:
95 """Provide a set of utility functions to facilitate networkd tests.
96
97 This class must be inherited along with unittest.TestCase to define
98 some required methods.
99 """
100
101 def add_veth_pair(self, veth, peer, veth_options=(), peer_options=()):
102 """Add a veth interface pair, and queue them to be removed."""
103 subprocess.check_call(['ip', 'link', 'add', 'name', veth] +
104 list(veth_options) +
105 ['type', 'veth', 'peer', 'name', peer] +
106 list(peer_options))
107 self.addCleanup(subprocess.call, ['ip', 'link', 'del', 'dev', peer])
108
109 def write_network(self, unit_name, contents):
110 """Write a network unit file, and queue it to be removed."""
111 unit_path = os.path.join(NETWORK_UNITDIR, unit_name)
112
113 with open(unit_path, 'w') as unit:
114 unit.write(contents)
115 self.addCleanup(os.remove, unit_path)
116
117 def write_network_dropin(self, unit_name, dropin_name, contents):
118 """Write a network unit drop-in, and queue it to be removed."""
119 dropin_dir = os.path.join(NETWORK_UNITDIR, "{}.d".format(unit_name))
120 dropin_path = os.path.join(dropin_dir, "{}.conf".format(dropin_name))
121
122 os.makedirs(dropin_dir, exist_ok=True)
123 self.addCleanup(os.rmdir, dropin_dir)
124 with open(dropin_path, 'w') as dropin:
125 dropin.write(contents)
126 self.addCleanup(os.remove, dropin_path)
127
128 def read_attr(self, link, attribute):
129 """Read a link attributed from the sysfs."""
130 # Note we we don't want to check if interface `link' is managed, we
131 # want to evaluate link variable and pass the value of the link to
132 # assert_link_states e.g. eth0=managed.
133 self.assert_link_states(**{link:'managed'})
134 with open(os.path.join('/sys/class/net', link, attribute)) as f:
135 return f.readline().strip()
136
137 def assert_link_states(self, **kwargs):
138 """Match networkctl link states to the given ones.
139
140 Each keyword argument should be the name of a network interface
141 with its expected value of the "SETUP" column in output from
142 networkctl. The interfaces have five seconds to come online
143 before the check is performed. Every specified interface must
144 be present in the output, and any other interfaces found in the
145 output are ignored.
146
147 A special interface state "managed" is supported, which matches
148 any value in the "SETUP" column other than "unmanaged".
149 """
150 if not kwargs:
151 return
152 interfaces = set(kwargs)
153
154 # Wait for the requested interfaces, but don't fail for them.
155 subprocess.call([NETWORKD_WAIT_ONLINE, '--timeout=5'] +
156 ['--interface={}'.format(iface) for iface in kwargs])
157
158 # Validate each link state found in the networkctl output.
159 out = subprocess.check_output(['networkctl', '--no-legend']).rstrip()
160 for line in out.decode('utf-8').split('\n'):
161 fields = line.split()
162 if len(fields) >= 5 and fields[1] in kwargs:
163 iface = fields[1]
164 expected = kwargs[iface]
165 actual = fields[-1]
166 if (actual != expected and
167 not (expected == 'managed' and actual != 'unmanaged')):
168 self.fail("Link {} expects state {}, found {}".format(iface, expected, actual))
169 interfaces.remove(iface)
170
171 # Ensure that all requested interfaces have been covered.
172 if interfaces:
173 self.fail("Missing links in status output: {}".format(interfaces))
174
175
176 class BridgeTest(NetworkdTestingUtilities, unittest.TestCase):
177 """Provide common methods for testing networkd against servers."""
178
179 def setUp(self):
180 self.write_network('port1.netdev', '''\
181 [NetDev]
182 Name=port1
183 Kind=dummy
184 MACAddress=12:34:56:78:9a:bc''')
185 self.write_network('port2.netdev', '''\
186 [NetDev]
187 Name=port2
188 Kind=dummy
189 MACAddress=12:34:56:78:9a:bd''')
190 self.write_network('mybridge.netdev', '''\
191 [NetDev]
192 Name=mybridge
193 Kind=bridge''')
194 self.write_network('port1.network', '''\
195 [Match]
196 Name=port1
197 [Network]
198 Bridge=mybridge''')
199 self.write_network('port2.network', '''\
200 [Match]
201 Name=port2
202 [Network]
203 Bridge=mybridge''')
204 self.write_network('mybridge.network', '''\
205 [Match]
206 Name=mybridge
207 [Network]
208 DNS=192.168.250.1
209 Address=192.168.250.33/24
210 Gateway=192.168.250.1''')
211 subprocess.call(['systemctl', 'reset-failed', 'systemd-networkd', 'systemd-resolved'])
212 subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])
213
214 def tearDown(self):
215 subprocess.check_call(['systemctl', 'stop', 'systemd-networkd'])
216 subprocess.check_call(['ip', 'link', 'del', 'mybridge'])
217 subprocess.check_call(['ip', 'link', 'del', 'port1'])
218 subprocess.check_call(['ip', 'link', 'del', 'port2'])
219
220 def test_bridge_init(self):
221 self.assert_link_states(
222 port1='managed',
223 port2='managed',
224 mybridge='managed')
225
226 def test_bridge_port_priority(self):
227 self.assertEqual(self.read_attr('port1', 'brport/priority'), '32')
228 self.write_network_dropin('port1.network', 'priority', '''\
229 [Bridge]
230 Priority=28
231 ''')
232 subprocess.check_call(['systemctl', 'restart', 'systemd-networkd'])
233 self.assertEqual(self.read_attr('port1', 'brport/priority'), '28')
234
235 def test_bridge_port_priority_set_zero(self):
236 """It should be possible to set the bridge port priority to 0"""
237 self.assertEqual(self.read_attr('port2', 'brport/priority'), '32')
238 self.write_network_dropin('port2.network', 'priority', '''\
239 [Bridge]
240 Priority=0
241 ''')
242 subprocess.check_call(['systemctl', 'restart', 'systemd-networkd'])
243 self.assertEqual(self.read_attr('port2', 'brport/priority'), '0')
244
245 def test_bridge_port_property(self):
246 """Test the "[Bridge]" section keys"""
247 self.assertEqual(self.read_attr('port2', 'brport/priority'), '32')
248 self.write_network_dropin('port2.network', 'property', '''\
249 [Bridge]
250 UnicastFlood=true
251 HairPin=true
252 UseBPDU=true
253 FastLeave=true
254 AllowPortToBeRoot=true
255 Cost=555
256 Priority=23
257 ''')
258 subprocess.check_call(['systemctl', 'restart', 'systemd-networkd'])
259
260 self.assertEqual(self.read_attr('port2', 'brport/priority'), '23')
261 self.assertEqual(self.read_attr('port2', 'brport/hairpin_mode'), '1')
262 self.assertEqual(self.read_attr('port2', 'brport/path_cost'), '555')
263 self.assertEqual(self.read_attr('port2', 'brport/multicast_fast_leave'), '1')
264 self.assertEqual(self.read_attr('port2', 'brport/unicast_flood'), '1')
265 self.assertEqual(self.read_attr('port2', 'brport/bpdu_guard'), '1')
266 self.assertEqual(self.read_attr('port2', 'brport/root_block'), '1')
267
268 class ClientTestBase(NetworkdTestingUtilities):
269 """Provide common methods for testing networkd against servers."""
270
271 @classmethod
272 def setUpClass(klass):
273 klass.orig_log_level = subprocess.check_output(
274 ['systemctl', 'show', '--value', '--property', 'LogLevel'],
275 universal_newlines=True).strip()
276 subprocess.check_call(['systemd-analyze', 'set-log-level', 'debug'])
277
278 @classmethod
279 def tearDownClass(klass):
280 subprocess.check_call(['systemd-analyze', 'set-log-level', klass.orig_log_level])
281
282 def setUp(self):
283 self.iface = 'test_eth42'
284 self.if_router = 'router_eth42'
285 self.workdir_obj = tempfile.TemporaryDirectory()
286 self.workdir = self.workdir_obj.name
287 self.config = 'test_eth42.network'
288
289 # get current journal cursor
290 subprocess.check_output(['journalctl', '--sync'])
291 out = subprocess.check_output(['journalctl', '-b', '--quiet',
292 '--no-pager', '-n0', '--show-cursor'],
293 universal_newlines=True)
294 self.assertTrue(out.startswith('-- cursor:'))
295 self.journal_cursor = out.split()[-1]
296
297 subprocess.call(['systemctl', 'reset-failed', 'systemd-networkd', 'systemd-resolved'])
298
299 def tearDown(self):
300 self.shutdown_iface()
301 subprocess.call(['systemctl', 'stop', 'systemd-networkd'])
302 subprocess.call(['ip', 'link', 'del', 'dummy0'],
303 stderr=subprocess.DEVNULL)
304
305 def show_journal(self, unit):
306 '''Show journal of given unit since start of the test'''
307
308 print('---- {} ----'.format(unit))
309 subprocess.check_output(['journalctl', '--sync'])
310 sys.stdout.flush()
311 subprocess.call(['journalctl', '-b', '--no-pager', '--quiet',
312 '--cursor', self.journal_cursor, '-u', unit])
313
314 def create_iface(self, ipv6=False):
315 '''Create test interface with DHCP server behind it'''
316
317 raise NotImplementedError('must be implemented by a subclass')
318
319 def shutdown_iface(self):
320 '''Remove test interface and stop DHCP server'''
321
322 raise NotImplementedError('must be implemented by a subclass')
323
324 def print_server_log(self):
325 '''Print DHCP server log for debugging failures'''
326
327 raise NotImplementedError('must be implemented by a subclass')
328
329 def do_test(self, coldplug=True, ipv6=False, extra_opts='',
330 online_timeout=10, dhcp_mode='yes'):
331 try:
332 subprocess.check_call(['systemctl', 'start', 'systemd-resolved'])
333 except subprocess.CalledProcessError:
334 self.show_journal('systemd-resolved.service')
335 raise
336 self.write_network(self.config, '''\
337 [Match]
338 Name={}
339 [Network]
340 DHCP={}
341 {}'''.format(self.iface, dhcp_mode, extra_opts))
342
343 if coldplug:
344 # create interface first, then start networkd
345 self.create_iface(ipv6=ipv6)
346 subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])
347 elif coldplug is not None:
348 # start networkd first, then create interface
349 subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])
350 self.create_iface(ipv6=ipv6)
351 else:
352 # "None" means test sets up interface by itself
353 subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])
354
355 try:
356 subprocess.check_call([NETWORKD_WAIT_ONLINE, '--interface',
357 self.iface, '--timeout=%i' % online_timeout])
358
359 if ipv6:
360 # check iface state and IP 6 address; FIXME: we need to wait a bit
361 # longer, as the iface is "configured" already with IPv4 *or*
362 # IPv6, but we want to wait for both
363 for _ in range(10):
364 out = subprocess.check_output(['ip', 'a', 'show', 'dev', self.iface])
365 if b'state UP' in out and b'inet6 2600' in out and b'inet 192.168' in out:
366 break
367 time.sleep(1)
368 else:
369 self.fail('timed out waiting for IPv6 configuration')
370
371 self.assertRegex(out, b'inet6 2600::.* scope global .*dynamic')
372 self.assertRegex(out, b'inet6 fe80::.* scope link')
373 else:
374 # should have link-local address on IPv6 only
375 out = subprocess.check_output(['ip', '-6', 'a', 'show', 'dev', self.iface])
376 self.assertRegex(out, br'inet6 fe80::.* scope link')
377 self.assertNotIn(b'scope global', out)
378
379 # should have IPv4 address
380 out = subprocess.check_output(['ip', '-4', 'a', 'show', 'dev', self.iface])
381 self.assertIn(b'state UP', out)
382 self.assertRegex(out, br'inet 192.168.5.\d+/.* scope global dynamic')
383
384 # check networkctl state
385 out = subprocess.check_output(['networkctl'])
386 self.assertRegex(out, (r'{}\s+ether\s+[a-z-]+\s+unmanaged'.format(self.if_router)).encode())
387 self.assertRegex(out, (r'{}\s+ether\s+routable\s+configured'.format(self.iface)).encode())
388
389 out = subprocess.check_output(['networkctl', 'status', self.iface])
390 self.assertRegex(out, br'Type:\s+ether')
391 self.assertRegex(out, br'State:\s+routable.*configured')
392 self.assertRegex(out, br'Address:\s+192.168.5.\d+')
393 if ipv6:
394 self.assertRegex(out, br'2600::')
395 else:
396 self.assertNotIn(br'2600::', out)
397 self.assertRegex(out, br'fe80::')
398 self.assertRegex(out, br'Gateway:\s+192.168.5.1')
399 self.assertRegex(out, br'DNS:\s+192.168.5.1')
400 except (AssertionError, subprocess.CalledProcessError):
401 # show networkd status, journal, and DHCP server log on failure
402 with open(os.path.join(NETWORK_UNITDIR, self.config)) as f:
403 print('\n---- {} ----\n{}'.format(self.config, f.read()))
404 print('---- interface status ----')
405 sys.stdout.flush()
406 subprocess.call(['ip', 'a', 'show', 'dev', self.iface])
407 print('---- networkctl status {} ----'.format(self.iface))
408 sys.stdout.flush()
409 subprocess.call(['networkctl', 'status', self.iface])
410 self.show_journal('systemd-networkd.service')
411 self.print_server_log()
412 raise
413
414 for timeout in range(50):
415 with open(RESOLV_CONF) as f:
416 contents = f.read()
417 if 'nameserver 192.168.5.1\n' in contents:
418 break
419 time.sleep(0.1)
420 else:
421 self.fail('nameserver 192.168.5.1 not found in ' + RESOLV_CONF)
422
423 if coldplug is False:
424 # check post-down.d hook
425 self.shutdown_iface()
426
427 def test_coldplug_dhcp_yes_ip4(self):
428 # we have a 12s timeout on RA, so we need to wait longer
429 self.do_test(coldplug=True, ipv6=False, online_timeout=15)
430
431 def test_coldplug_dhcp_yes_ip4_no_ra(self):
432 # with disabling RA explicitly things should be fast
433 self.do_test(coldplug=True, ipv6=False,
434 extra_opts='IPv6AcceptRA=False')
435
436 def test_coldplug_dhcp_ip4_only(self):
437 # we have a 12s timeout on RA, so we need to wait longer
438 self.do_test(coldplug=True, ipv6=False, dhcp_mode='ipv4',
439 online_timeout=15)
440
441 def test_coldplug_dhcp_ip4_only_no_ra(self):
442 # with disabling RA explicitly things should be fast
443 self.do_test(coldplug=True, ipv6=False, dhcp_mode='ipv4',
444 extra_opts='IPv6AcceptRA=False')
445
446 def test_coldplug_dhcp_ip6(self):
447 self.do_test(coldplug=True, ipv6=True)
448
449 def test_hotplug_dhcp_ip4(self):
450 # With IPv4 only we have a 12s timeout on RA, so we need to wait longer
451 self.do_test(coldplug=False, ipv6=False, online_timeout=15)
452
453 def test_hotplug_dhcp_ip6(self):
454 self.do_test(coldplug=False, ipv6=True)
455
456 def test_route_only_dns(self):
457 self.write_network('myvpn.netdev', '''\
458 [NetDev]
459 Name=dummy0
460 Kind=dummy
461 MACAddress=12:34:56:78:9a:bc''')
462 self.write_network('myvpn.network', '''\
463 [Match]
464 Name=dummy0
465 [Network]
466 Address=192.168.42.100
467 DNS=192.168.42.1
468 Domains= ~company''')
469
470 self.do_test(coldplug=True, ipv6=False,
471 extra_opts='IPv6AcceptRouterAdvertisements=False')
472
473 with open(RESOLV_CONF) as f:
474 contents = f.read()
475 # ~company is not a search domain, only a routing domain
476 self.assertNotRegex(contents, 'search.*company')
477 # our global server should appear
478 self.assertIn('nameserver 192.168.5.1\n', contents)
479 # should not have domain-restricted server as global server
480 self.assertNotIn('nameserver 192.168.42.1\n', contents)
481
482 def test_route_only_dns_all_domains(self):
483 self.write_network('myvpn.netdev', '''[NetDev]
484 Name=dummy0
485 Kind=dummy
486 MACAddress=12:34:56:78:9a:bc''')
487 self.write_network('myvpn.network', '''[Match]
488 Name=dummy0
489 [Network]
490 Address=192.168.42.100
491 DNS=192.168.42.1
492 Domains= ~company ~.''')
493
494 self.do_test(coldplug=True, ipv6=False,
495 extra_opts='IPv6AcceptRouterAdvertisements=False')
496
497 with open(RESOLV_CONF) as f:
498 contents = f.read()
499
500 # ~company is not a search domain, only a routing domain
501 self.assertNotRegex(contents, 'search.*company')
502
503 # our global server should appear
504 self.assertIn('nameserver 192.168.5.1\n', contents)
505 # should have company server as global server due to ~.
506 self.assertIn('nameserver 192.168.42.1\n', contents)
507
508
509 @unittest.skipUnless(HAVE_DNSMASQ, 'dnsmasq not installed')
510 class DnsmasqClientTest(ClientTestBase, unittest.TestCase):
511 '''Test networkd client against dnsmasq'''
512
513 def setUp(self):
514 super().setUp()
515 self.dnsmasq = None
516 self.iface_mac = 'de:ad:be:ef:47:11'
517
518 def create_iface(self, ipv6=False, dnsmasq_opts=None):
519 '''Create test interface with DHCP server behind it'''
520
521 # add veth pair
522 subprocess.check_call(['ip', 'link', 'add', 'name', self.iface,
523 'address', self.iface_mac,
524 'type', 'veth', 'peer', 'name', self.if_router])
525
526 # give our router an IP
527 subprocess.check_call(['ip', 'a', 'flush', 'dev', self.if_router])
528 subprocess.check_call(['ip', 'a', 'add', '192.168.5.1/24', 'dev', self.if_router])
529 if ipv6:
530 subprocess.check_call(['ip', 'a', 'add', '2600::1/64', 'dev', self.if_router])
531 subprocess.check_call(['ip', 'link', 'set', self.if_router, 'up'])
532
533 # add DHCP server
534 self.dnsmasq_log = os.path.join(self.workdir, 'dnsmasq.log')
535 lease_file = os.path.join(self.workdir, 'dnsmasq.leases')
536 if ipv6:
537 extra_opts = ['--enable-ra', '--dhcp-range=2600::10,2600::20']
538 else:
539 extra_opts = []
540 if dnsmasq_opts:
541 extra_opts += dnsmasq_opts
542 self.dnsmasq = subprocess.Popen(
543 ['dnsmasq', '--keep-in-foreground', '--log-queries',
544 '--log-facility=' + self.dnsmasq_log, '--conf-file=/dev/null',
545 '--dhcp-leasefile=' + lease_file, '--bind-interfaces',
546 '--interface=' + self.if_router, '--except-interface=lo',
547 '--dhcp-range=192.168.5.10,192.168.5.200'] + extra_opts)
548
549 def shutdown_iface(self):
550 '''Remove test interface and stop DHCP server'''
551
552 if self.if_router:
553 subprocess.check_call(['ip', 'link', 'del', 'dev', self.if_router])
554 self.if_router = None
555 if self.dnsmasq:
556 self.dnsmasq.kill()
557 self.dnsmasq.wait()
558 self.dnsmasq = None
559
560 def print_server_log(self):
561 '''Print DHCP server log for debugging failures'''
562
563 with open(self.dnsmasq_log) as f:
564 sys.stdout.write('\n\n---- dnsmasq log ----\n{}\n------\n\n'.format(f.read()))
565
566 def test_resolved_domain_restricted_dns(self):
567 '''resolved: domain-restricted DNS servers'''
568
569 # create interface for generic connections; this will map all DNS names
570 # to 192.168.42.1
571 self.create_iface(dnsmasq_opts=['--address=/#/192.168.42.1'])
572 self.write_network('general.network', '''\
573 [Match]
574 Name={}
575 [Network]
576 DHCP=ipv4
577 IPv6AcceptRA=False'''.format(self.iface))
578
579 # create second device/dnsmasq for a .company/.lab VPN interface
580 # static IPs for simplicity
581 self.add_veth_pair('testvpnclient', 'testvpnrouter')
582 subprocess.check_call(['ip', 'a', 'flush', 'dev', 'testvpnrouter'])
583 subprocess.check_call(['ip', 'a', 'add', '10.241.3.1/24', 'dev', 'testvpnrouter'])
584 subprocess.check_call(['ip', 'link', 'set', 'testvpnrouter', 'up'])
585
586 vpn_dnsmasq_log = os.path.join(self.workdir, 'dnsmasq-vpn.log')
587 vpn_dnsmasq = subprocess.Popen(
588 ['dnsmasq', '--keep-in-foreground', '--log-queries',
589 '--log-facility=' + vpn_dnsmasq_log, '--conf-file=/dev/null',
590 '--dhcp-leasefile=/dev/null', '--bind-interfaces',
591 '--interface=testvpnrouter', '--except-interface=lo',
592 '--address=/math.lab/10.241.3.3', '--address=/cantina.company/10.241.4.4'])
593 self.addCleanup(vpn_dnsmasq.wait)
594 self.addCleanup(vpn_dnsmasq.kill)
595
596 self.write_network('vpn.network', '''\
597 [Match]
598 Name=testvpnclient
599 [Network]
600 IPv6AcceptRA=False
601 Address=10.241.3.2/24
602 DNS=10.241.3.1
603 Domains= ~company ~lab''')
604
605 subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])
606 subprocess.check_call([NETWORKD_WAIT_ONLINE, '--interface', self.iface,
607 '--interface=testvpnclient', '--timeout=20'])
608
609 # ensure we start fresh with every test
610 subprocess.check_call(['systemctl', 'restart', 'systemd-resolved'])
611
612 # test vpnclient specific domains; these should *not* be answered by
613 # the general DNS
614 out = subprocess.check_output(['resolvectl', 'query', 'math.lab'])
615 self.assertIn(b'math.lab: 10.241.3.3', out)
616 out = subprocess.check_output(['resolvectl', 'query', 'kettle.cantina.company'])
617 self.assertIn(b'kettle.cantina.company: 10.241.4.4', out)
618
619 # test general domains
620 out = subprocess.check_output(['resolvectl', 'query', 'megasearch.net'])
621 self.assertIn(b'megasearch.net: 192.168.42.1', out)
622
623 with open(self.dnsmasq_log) as f:
624 general_log = f.read()
625 with open(vpn_dnsmasq_log) as f:
626 vpn_log = f.read()
627
628 # VPN domains should only be sent to VPN DNS
629 self.assertRegex(vpn_log, 'query.*math.lab')
630 self.assertRegex(vpn_log, 'query.*cantina.company')
631 self.assertNotIn('.lab', general_log)
632 self.assertNotIn('.company', general_log)
633
634 # general domains should not be sent to the VPN DNS
635 self.assertRegex(general_log, 'query.*megasearch.net')
636 self.assertNotIn('megasearch.net', vpn_log)
637
638 def test_resolved_etc_hosts(self):
639 '''resolved queries to /etc/hosts'''
640
641 # FIXME: -t MX query fails with enabled DNSSEC (even when using
642 # the known negative trust anchor .internal instead of .example)
643 conf = '/run/systemd/resolved.conf.d/test-disable-dnssec.conf'
644 os.makedirs(os.path.dirname(conf), exist_ok=True)
645 with open(conf, 'w') as f:
646 f.write('[Resolve]\nDNSSEC=no')
647 self.addCleanup(os.remove, conf)
648
649 # create /etc/hosts bind mount which resolves my.example for IPv4
650 hosts = os.path.join(self.workdir, 'hosts')
651 with open(hosts, 'w') as f:
652 f.write('172.16.99.99 my.example\n')
653 subprocess.check_call(['mount', '--bind', hosts, '/etc/hosts'])
654 self.addCleanup(subprocess.call, ['umount', '/etc/hosts'])
655 subprocess.check_call(['systemctl', 'stop', 'systemd-resolved.service'])
656
657 # note: different IPv4 address here, so that it's easy to tell apart
658 # what resolved the query
659 self.create_iface(dnsmasq_opts=['--host-record=my.example,172.16.99.1,2600::99:99',
660 '--host-record=other.example,172.16.0.42,2600::42',
661 '--mx-host=example,mail.example'],
662 ipv6=True)
663 self.do_test(coldplug=None, ipv6=True)
664
665 try:
666 # family specific queries
667 out = subprocess.check_output(['resolvectl', 'query', '-4', 'my.example'])
668 self.assertIn(b'my.example: 172.16.99.99', out)
669 # we don't expect an IPv6 answer; if /etc/hosts has any IP address,
670 # it's considered a sufficient source
671 self.assertNotEqual(subprocess.call(['resolvectl', 'query', '-6', 'my.example']), 0)
672 # "any family" query; IPv4 should come from /etc/hosts
673 out = subprocess.check_output(['resolvectl', 'query', 'my.example'])
674 self.assertIn(b'my.example: 172.16.99.99', out)
675 # IP → name lookup; again, takes the /etc/hosts one
676 out = subprocess.check_output(['resolvectl', 'query', '172.16.99.99'])
677 self.assertIn(b'172.16.99.99: my.example', out)
678
679 # non-address RRs should fall back to DNS
680 out = subprocess.check_output(['resolvectl', 'query', '--type=MX', 'example'])
681 self.assertIn(b'example IN MX 1 mail.example', out)
682
683 # other domains query DNS
684 out = subprocess.check_output(['resolvectl', 'query', 'other.example'])
685 self.assertIn(b'172.16.0.42', out)
686 out = subprocess.check_output(['resolvectl', 'query', '172.16.0.42'])
687 self.assertIn(b'172.16.0.42: other.example', out)
688 except (AssertionError, subprocess.CalledProcessError):
689 self.show_journal('systemd-resolved.service')
690 self.print_server_log()
691 raise
692
693 def test_transient_hostname(self):
694 '''networkd sets transient hostname from DHCP'''
695
696 orig_hostname = socket.gethostname()
697 self.addCleanup(socket.sethostname, orig_hostname)
698 # temporarily move /etc/hostname away; restart hostnamed to pick it up
699 if os.path.exists('/etc/hostname'):
700 subprocess.check_call(['mount', '--bind', '/dev/null', '/etc/hostname'])
701 self.addCleanup(subprocess.call, ['umount', '/etc/hostname'])
702 subprocess.check_call(['systemctl', 'stop', 'systemd-hostnamed.service'])
703
704 self.create_iface(dnsmasq_opts=['--dhcp-host={},192.168.5.210,testgreen'.format(self.iface_mac)])
705 self.do_test(coldplug=None, extra_opts='IPv6AcceptRA=False', dhcp_mode='ipv4')
706
707 try:
708 # should have received the fixed IP above
709 out = subprocess.check_output(['ip', '-4', 'a', 'show', 'dev', self.iface])
710 self.assertRegex(out, b'inet 192.168.5.210/24 .* scope global dynamic')
711 # should have set transient hostname in hostnamed; this is
712 # sometimes a bit lagging (issue #4753), so retry a few times
713 for retry in range(1, 6):
714 out = subprocess.check_output(['hostnamectl'])
715 if b'testgreen' in out:
716 break
717 time.sleep(5)
718 sys.stdout.write('[retry %i] ' % retry)
719 sys.stdout.flush()
720 else:
721 self.fail('Transient hostname not found in hostnamectl:\n{}'.format(out.decode()))
722 # and also applied to the system
723 self.assertEqual(socket.gethostname(), 'testgreen')
724 except AssertionError:
725 self.show_journal('systemd-networkd.service')
726 self.show_journal('systemd-hostnamed.service')
727 self.print_server_log()
728 raise
729
730 def test_transient_hostname_with_static(self):
731 '''transient hostname is not applied if static hostname exists'''
732
733 orig_hostname = socket.gethostname()
734 self.addCleanup(socket.sethostname, orig_hostname)
735 if not os.path.exists('/etc/hostname'):
736 self.writeConfig('/etc/hostname', orig_hostname)
737 subprocess.check_call(['systemctl', 'stop', 'systemd-hostnamed.service'])
738
739 self.create_iface(dnsmasq_opts=['--dhcp-host={},192.168.5.210,testgreen'.format(self.iface_mac)])
740 self.do_test(coldplug=None, extra_opts='IPv6AcceptRA=False', dhcp_mode='ipv4')
741
742 try:
743 # should have received the fixed IP above
744 out = subprocess.check_output(['ip', '-4', 'a', 'show', 'dev', self.iface])
745 self.assertRegex(out, b'inet 192.168.5.210/24 .* scope global dynamic')
746 # static hostname wins over transient one, thus *not* applied
747 self.assertEqual(socket.gethostname(), orig_hostname)
748 except AssertionError:
749 self.show_journal('systemd-networkd.service')
750 self.show_journal('systemd-hostnamed.service')
751 self.print_server_log()
752 raise
753
754
755 class NetworkdClientTest(ClientTestBase, unittest.TestCase):
756 '''Test networkd client against networkd server'''
757
758 def setUp(self):
759 super().setUp()
760 self.dnsmasq = None
761
762 def create_iface(self, ipv6=False, dhcpserver_opts=None):
763 '''Create test interface with DHCP server behind it'''
764
765 # run "router-side" networkd in own mount namespace to shield it from
766 # "client-side" configuration and networkd
767 (fd, script) = tempfile.mkstemp(prefix='networkd-router.sh')
768 self.addCleanup(os.remove, script)
769 with os.fdopen(fd, 'w+') as f:
770 f.write('''\
771 #!/bin/sh
772 set -eu
773 mkdir -p /run/systemd/network
774 mkdir -p /run/systemd/netif
775 mount -t tmpfs none /run/systemd/network
776 mount -t tmpfs none /run/systemd/netif
777 [ ! -e /run/dbus ] || mount -t tmpfs none /run/dbus
778 # create router/client veth pair
779 cat << EOF > /run/systemd/network/test.netdev
780 [NetDev]
781 Name=%(ifr)s
782 Kind=veth
783
784 [Peer]
785 Name=%(ifc)s
786 EOF
787
788 cat << EOF > /run/systemd/network/test.network
789 [Match]
790 Name=%(ifr)s
791
792 [Network]
793 Address=192.168.5.1/24
794 %(addr6)s
795 DHCPServer=yes
796
797 [DHCPServer]
798 PoolOffset=10
799 PoolSize=50
800 DNS=192.168.5.1
801 %(dhopts)s
802 EOF
803
804 # run networkd as in systemd-networkd.service
805 exec $(systemctl cat systemd-networkd.service | sed -n '/^ExecStart=/ { s/^.*=//; s/^[@+-]//; s/^!*//; p}')
806 ''' % {'ifr': self.if_router, 'ifc': self.iface, 'addr6': ipv6 and 'Address=2600::1/64' or '',
807 'dhopts': dhcpserver_opts or ''})
808
809 os.fchmod(fd, 0o755)
810
811 subprocess.check_call(['systemd-run', '--unit=networkd-test-router.service',
812 '-p', 'InaccessibleDirectories=-/etc/systemd/network',
813 '-p', 'InaccessibleDirectories=-/run/systemd/network',
814 '-p', 'InaccessibleDirectories=-/run/systemd/netif',
815 '--service-type=notify', script])
816
817 # wait until devices got created
818 for _ in range(50):
819 out = subprocess.check_output(['ip', 'a', 'show', 'dev', self.if_router])
820 if b'state UP' in out and b'scope global' in out:
821 break
822 time.sleep(0.1)
823
824 def shutdown_iface(self):
825 '''Remove test interface and stop DHCP server'''
826
827 if self.if_router:
828 subprocess.check_call(['systemctl', 'stop', 'networkd-test-router.service'])
829 # ensure failed transient unit does not stay around
830 subprocess.call(['systemctl', 'reset-failed', 'networkd-test-router.service'])
831 subprocess.call(['ip', 'link', 'del', 'dev', self.if_router])
832 self.if_router = None
833
834 def print_server_log(self):
835 '''Print DHCP server log for debugging failures'''
836
837 self.show_journal('networkd-test-router.service')
838
839 @unittest.skip('networkd does not have DHCPv6 server support')
840 def test_hotplug_dhcp_ip6(self):
841 pass
842
843 @unittest.skip('networkd does not have DHCPv6 server support')
844 def test_coldplug_dhcp_ip6(self):
845 pass
846
847 def test_search_domains(self):
848
849 # we don't use this interface for this test
850 self.if_router = None
851
852 self.write_network('test.netdev', '''\
853 [NetDev]
854 Name=dummy0
855 Kind=dummy
856 MACAddress=12:34:56:78:9a:bc''')
857 self.write_network('test.network', '''\
858 [Match]
859 Name=dummy0
860 [Network]
861 Address=192.168.42.100
862 DNS=192.168.42.1
863 Domains= one two three four five six seven eight nine ten''')
864
865 subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])
866
867 for timeout in range(50):
868 with open(RESOLV_CONF) as f:
869 contents = f.read()
870 if ' one' in contents:
871 break
872 time.sleep(0.1)
873 self.assertRegex(contents, 'search .*one two three four')
874 self.assertNotIn('seven\n', contents)
875 self.assertIn('# Too many search domains configured, remaining ones ignored.\n', contents)
876
877 def test_search_domains_too_long(self):
878
879 # we don't use this interface for this test
880 self.if_router = None
881
882 name_prefix = 'a' * 60
883
884 self.write_network('test.netdev', '''\
885 [NetDev]
886 Name=dummy0
887 Kind=dummy
888 MACAddress=12:34:56:78:9a:bc''')
889 self.write_network('test.network', '''\
890 [Match]
891 Name=dummy0
892 [Network]
893 Address=192.168.42.100
894 DNS=192.168.42.1
895 Domains={p}0 {p}1 {p}2 {p}3 {p}4'''.format(p=name_prefix))
896
897 subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])
898
899 for timeout in range(50):
900 with open(RESOLV_CONF) as f:
901 contents = f.read()
902 if ' one' in contents:
903 break
904 time.sleep(0.1)
905 self.assertRegex(contents, 'search .*{p}0 {p}1 {p}2'.format(p=name_prefix))
906 self.assertIn('# Total length of all search domains is too long, remaining ones ignored.', contents)
907
908 def test_dropin(self):
909 # we don't use this interface for this test
910 self.if_router = None
911
912 self.write_network('test.netdev', '''\
913 [NetDev]
914 Name=dummy0
915 Kind=dummy
916 MACAddress=12:34:56:78:9a:bc''')
917 self.write_network('test.network', '''\
918 [Match]
919 Name=dummy0
920 [Network]
921 Address=192.168.42.100
922 DNS=192.168.42.1''')
923 self.write_network_dropin('test.network', 'dns', '''\
924 [Network]
925 DNS=127.0.0.1''')
926
927 subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])
928
929 for timeout in range(50):
930 with open(RESOLV_CONF) as f:
931 contents = f.read()
932 if ' 127.0.0.1' in contents:
933 break
934 time.sleep(0.1)
935 self.assertIn('nameserver 192.168.42.1\n', contents)
936 self.assertIn('nameserver 127.0.0.1\n', contents)
937
938 def test_dhcp_timezone(self):
939 '''networkd sets time zone from DHCP'''
940
941 def get_tz():
942 out = subprocess.check_output(['busctl', 'get-property', 'org.freedesktop.timedate1',
943 '/org/freedesktop/timedate1', 'org.freedesktop.timedate1', 'Timezone'])
944 assert out.startswith(b's "')
945 out = out.strip()
946 assert out.endswith(b'"')
947 return out[3:-1].decode()
948
949 orig_timezone = get_tz()
950 self.addCleanup(subprocess.call, ['timedatectl', 'set-timezone', orig_timezone])
951
952 self.create_iface(dhcpserver_opts='EmitTimezone=yes\nTimezone=Pacific/Honolulu')
953 self.do_test(coldplug=None, extra_opts='IPv6AcceptRA=false\n[DHCP]\nUseTimezone=true', dhcp_mode='ipv4')
954
955 # should have applied the received timezone
956 try:
957 self.assertEqual(get_tz(), 'Pacific/Honolulu')
958 except AssertionError:
959 self.show_journal('systemd-networkd.service')
960 self.show_journal('systemd-hostnamed.service')
961 raise
962
963
964 class MatchClientTest(unittest.TestCase, NetworkdTestingUtilities):
965 """Test [Match] sections in .network files.
966
967 Be aware that matching the test host's interfaces will wipe their
968 configuration, so as a precaution, all network files should have a
969 restrictive [Match] section to only ever interfere with the
970 temporary veth interfaces created here.
971 """
972
973 def tearDown(self):
974 """Stop networkd."""
975 subprocess.call(['systemctl', 'stop', 'systemd-networkd'])
976
977 def test_basic_matching(self):
978 """Verify the Name= line works throughout this class."""
979 self.add_veth_pair('test_if1', 'fake_if2')
980 self.write_network('test.network', "[Match]\nName=test_*\n[Network]")
981 subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])
982 self.assert_link_states(test_if1='managed', fake_if2='unmanaged')
983
984 def test_inverted_matching(self):
985 """Verify that a '!'-prefixed value inverts the match."""
986 # Use a MAC address as the interfaces' common matching attribute
987 # to avoid depending on udev, to support testing in containers.
988 mac = '00:01:02:03:98:99'
989 self.add_veth_pair('test_veth', 'test_peer',
990 ['addr', mac], ['addr', mac])
991 self.write_network('no-veth.network', """\
992 [Match]
993 MACAddress={}
994 Name=!nonexistent *peer*
995 [Network]""".format(mac))
996 subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])
997 self.assert_link_states(test_veth='managed', test_peer='unmanaged')
998
999
1000 class UnmanagedClientTest(unittest.TestCase, NetworkdTestingUtilities):
1001 """Test if networkd manages the correct interfaces."""
1002
1003 def setUp(self):
1004 """Write .network files to match the named veth devices."""
1005 # Define the veth+peer pairs to be created.
1006 # Their pairing doesn't actually matter, only their names do.
1007 self.veths = {
1008 'm1def': 'm0unm',
1009 'm1man': 'm1unm',
1010 }
1011
1012 # Define the contents of .network files to be read in order.
1013 self.configs = (
1014 "[Match]\nName=m1def\n",
1015 "[Match]\nName=m1unm\n[Link]\nUnmanaged=yes\n",
1016 "[Match]\nName=m1*\n[Link]\nUnmanaged=no\n",
1017 )
1018
1019 # Write out the .network files to be cleaned up automatically.
1020 for i, config in enumerate(self.configs):
1021 self.write_network("%02d-test.network" % i, config)
1022
1023 def tearDown(self):
1024 """Stop networkd."""
1025 subprocess.call(['systemctl', 'stop', 'systemd-networkd'])
1026
1027 def create_iface(self):
1028 """Create temporary veth pairs for interface matching."""
1029 for veth, peer in self.veths.items():
1030 self.add_veth_pair(veth, peer)
1031
1032 def test_unmanaged_setting(self):
1033 """Verify link states with Unmanaged= settings, hot-plug."""
1034 subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])
1035 self.create_iface()
1036 self.assert_link_states(m1def='managed',
1037 m1man='managed',
1038 m1unm='unmanaged',
1039 m0unm='unmanaged')
1040
1041 def test_unmanaged_setting_coldplug(self):
1042 """Verify link states with Unmanaged= settings, cold-plug."""
1043 self.create_iface()
1044 subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])
1045 self.assert_link_states(m1def='managed',
1046 m1man='managed',
1047 m1unm='unmanaged',
1048 m0unm='unmanaged')
1049
1050 def test_catchall_config(self):
1051 """Verify link states with a catch-all config, hot-plug."""
1052 # Don't actually catch ALL interfaces. It messes up the host.
1053 self.write_network('all.network', "[Match]\nName=m[01]???\n")
1054 subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])
1055 self.create_iface()
1056 self.assert_link_states(m1def='managed',
1057 m1man='managed',
1058 m1unm='unmanaged',
1059 m0unm='managed')
1060
1061 def test_catchall_config_coldplug(self):
1062 """Verify link states with a catch-all config, cold-plug."""
1063 # Don't actually catch ALL interfaces. It messes up the host.
1064 self.write_network('all.network', "[Match]\nName=m[01]???\n")
1065 self.create_iface()
1066 subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])
1067 self.assert_link_states(m1def='managed',
1068 m1man='managed',
1069 m1unm='unmanaged',
1070 m0unm='managed')
1071
1072
1073 if __name__ == '__main__':
1074 unittest.main(testRunner=unittest.TextTestRunner(stream=sys.stdout,
1075 verbosity=2))