]> git.ipfire.org Git - thirdparty/systemd.git/blob - test/networkd-test.py
Merge pull request #11827 from keszybz/pkgconfig-variables
[thirdparty/systemd.git] / test / networkd-test.py
#!/usr/bin/env python3
# SPDX-License-Identifier: LGPL-2.1+
#
# networkd integration test
# This uses temporary configuration in /run and temporary veth devices, and
# does not write anything on disk or change any system configuration;
# but it assumes (and checks at the beginning) that networkd is not currently
# running.
#
# This can be run on a normal installation, in QEMU, nspawn (with
# --private-network), LXD (with "--config raw.lxc=lxc.aa_profile=unconfined"),
# or LXC system containers. You need at least the "ip" tool from the iproute
# package; it is recommended to install dnsmasq too to get full test coverage.
#
# ATTENTION: This uses the *installed* networkd, not the one from the built
# source tree.
#
# © 2015 Canonical Ltd.
# Author: Martin Pitt <martin.pitt@ubuntu.com>
20
21 import errno
22 import os
23 import shutil
24 import socket
25 import subprocess
26 import sys
27 import tempfile
28 import time
29 import unittest
30
# The dnsmasq-backed test class is skipped when the binary is not installed.
HAVE_DNSMASQ = shutil.which('dnsmasq') is not None

# Directory into which the tests write their .network/.netdev units.
NETWORK_UNITDIR = '/run/systemd/network'

# Only systemd's private library directories are searched for the helper;
# the result is None when it cannot be found there.
NETWORKD_WAIT_ONLINE = shutil.which(
    'systemd-networkd-wait-online',
    path='/usr/lib/systemd:/lib/systemd')

# resolv.conf file that the tests inspect for name servers and search domains.
RESOLV_CONF = '/run/systemd/resolve/resolv.conf'

# Module-wide bookkeeping: directories covered by a tmpfs, and units that
# were found active / inactive before the tests started.
tmpmounts = []
running_units = []
stopped_units = []
43
44
def setUpModule():
    """Initialize the environment, and perform sanity checks on it.

    Stops networkd/resolved if they are active (remembering which units were
    running so they can be restored later), covers the networkd and resolved
    configuration/state directories with fresh tmpfs mounts, and makes sure
    the unit directory used by the tests exists.

    Raises:
        OSError: if systemd-networkd-wait-online could not be located.
        unittest.SkipTest: if networkd is active on a non-virtualized system.
        subprocess.CalledProcessError: if mounting one of the tmpfs fails.
    """
    if NETWORKD_WAIT_ONLINE is None:
        raise OSError(errno.ENOENT, 'systemd-networkd-wait-online not found')

    # Do not run any tests if the system is using networkd already and it's not virtualized
    if (subprocess.call(['systemctl', 'is-active', '--quiet', 'systemd-networkd.service']) == 0 and
            subprocess.call(['systemd-detect-virt', '--quiet']) != 0):
        raise unittest.SkipTest('not virtualized and networkd is already active')

    # Ensure we don't mess with an existing networkd config
    for u in ['systemd-networkd.socket', 'systemd-networkd', 'systemd-resolved']:
        if subprocess.call(['systemctl', 'is-active', '--quiet', u]) == 0:
            subprocess.call(['systemctl', 'stop', u])
            running_units.append(u)
        else:
            stopped_units.append(u)

    # create static systemd-network user for networkd-test-router.service (it
    # needs to do some stuff as root and can't start as user; but networkd
    # still insists on the user)
    subprocess.call(['adduser', '--system', '--no-create-home', 'systemd-network'])

    # Shadow every existing configuration/state directory with an empty
    # tmpfs; the mounts are recorded so that they can be undone afterwards.
    for d in ['/etc/systemd/network', '/run/systemd/network',
              '/run/systemd/netif', '/run/systemd/resolve']:
        if os.path.isdir(d):
            subprocess.check_call(["mount", "-t", "tmpfs", "none", d])
            tmpmounts.append(d)
    if os.path.isdir('/run/systemd/resolve'):
        os.chmod('/run/systemd/resolve', 0o755)
        shutil.chown('/run/systemd/resolve', 'systemd-resolve', 'systemd-resolve')
    if os.path.isdir('/run/systemd/netif'):
        os.chmod('/run/systemd/netif', 0o755)
        shutil.chown('/run/systemd/netif', 'systemd-network', 'systemd-network')

    # Avoid "Failed to open /dev/tty" errors in containers.
    os.environ['SYSTEMD_LOG_TARGET'] = 'journal'

    # Ensure the unit directory exists so tests can dump files into it.
    os.makedirs(NETWORK_UNITDIR, exist_ok=True)
87
88
def tearDownModule():
    """Unmount the recorded tmpfs mounts and restore the prior unit states."""
    for mountpoint in tmpmounts:
        subprocess.check_call(["umount", mountpoint])
    # Units that were inactive before the tests are stopped again ...
    for unit in stopped_units:
        subprocess.call(["systemctl", "stop", unit])
    # ... and units that were active are brought back up.
    for unit in running_units:
        subprocess.call(["systemctl", "restart", unit])
97
98
class NetworkdTestingUtilities:
    """Provide a set of utility functions to facilitate networkd tests.

    This class must be inherited along with unittest.TestCase to define
    some required methods.
    """

    def add_veth_pair(self, veth, peer, veth_options=(), peer_options=()):
        """Add a veth interface pair, and queue them to be removed.

        veth_options and peer_options are extra "ip link" arguments inserted
        after the respective interface name.
        """
        subprocess.check_call(['ip', 'link', 'add', 'name', veth] +
                              list(veth_options) +
                              ['type', 'veth', 'peer', 'name', peer] +
                              list(peer_options))
        self.addCleanup(subprocess.call, ['ip', 'link', 'del', 'dev', peer])

    def write_config(self, path, contents):
        """Write a configuration file, and queue it to be removed."""

        with open(path, 'w') as f:
            f.write(contents)

        self.addCleanup(os.remove, path)

    def write_network(self, unit_name, contents):
        """Write a network unit file, and queue it to be removed."""
        self.write_config(os.path.join(NETWORK_UNITDIR, unit_name), contents)

    def write_network_dropin(self, unit_name, dropin_name, contents):
        """Write a network unit drop-in, and queue it to be removed."""
        dropin_dir = os.path.join(NETWORK_UNITDIR, "{}.d".format(unit_name))
        dropin_path = os.path.join(dropin_dir, "{}.conf".format(dropin_name))

        # Queue the directory removal only once, when it is actually created
        # here: cleanups run in reverse order, so a second rmdir queued for
        # the same directory would run while it still contains a drop-in.
        if not os.path.isdir(dropin_dir):
            os.makedirs(dropin_dir)
            self.addCleanup(os.rmdir, dropin_dir)
        with open(dropin_path, 'w') as dropin:
            dropin.write(contents)
        self.addCleanup(os.remove, dropin_path)

    def read_attr(self, link, attribute):
        """Read a link attribute from the sysfs.

        Returns the stripped first line of /sys/class/net/<link>/<attribute>.
        """
        # Note we don't want to check if interface `link' is managed, we
        # want to evaluate link variable and pass the value of the link to
        # assert_link_states e.g. eth0=managed.
        self.assert_link_states(**{link: 'managed'})
        with open(os.path.join('/sys/class/net', link, attribute)) as f:
            return f.readline().strip()

    def assert_link_states(self, **kwargs):
        """Match networkctl link states to the given ones.

        Each keyword argument should be the name of a network interface
        with its expected value of the "SETUP" column in output from
        networkctl.  The interfaces have five seconds to come online
        before the check is performed.  Every specified interface must
        be present in the output, and any other interfaces found in the
        output are ignored.

        A special interface state "managed" is supported, which matches
        any value in the "SETUP" column other than "unmanaged".
        """
        if not kwargs:
            return
        interfaces = set(kwargs)

        # Wait for the requested interfaces, but don't fail for them.
        subprocess.call([NETWORKD_WAIT_ONLINE, '--timeout=5'] +
                        ['--interface={}'.format(iface) for iface in kwargs])

        # Validate each link state found in the networkctl output.
        out = subprocess.check_output(['networkctl', '--no-legend']).rstrip()
        for line in out.decode('utf-8').split('\n'):
            fields = line.split()
            if len(fields) >= 5 and fields[1] in kwargs:
                iface = fields[1]
                expected = kwargs[iface]
                actual = fields[-1]
                if (actual != expected and
                        not (expected == 'managed' and actual != 'unmanaged')):
                    self.fail("Link {} expects state {}, found {}".format(iface, expected, actual))
                # discard() rather than remove(): an interface listed more
                # than once must not raise KeyError on its second line.
                interfaces.discard(iface)

        # Ensure that all requested interfaces have been covered.
        if interfaces:
            self.fail("Missing links in status output: {}".format(interfaces))
183
184
class BridgeTest(NetworkdTestingUtilities, unittest.TestCase):
    """Test networkd's handling of a bridge with two dummy ports.

    setUp() defines the dummy devices port1/port2 and the bridge mybridge,
    enslaves both ports to the bridge and starts networkd; the tests then
    inspect link states and brport attributes in sysfs.
    """

    def setUp(self):
        """Write the netdev/network units and start networkd."""
        self.write_network('port1.netdev', '''\
[NetDev]
Name=port1
Kind=dummy
MACAddress=12:34:56:78:9a:bc''')
        self.write_network('port2.netdev', '''\
[NetDev]
Name=port2
Kind=dummy
MACAddress=12:34:56:78:9a:bd''')
        self.write_network('mybridge.netdev', '''\
[NetDev]
Name=mybridge
Kind=bridge''')
        self.write_network('port1.network', '''\
[Match]
Name=port1
[Network]
Bridge=mybridge''')
        self.write_network('port2.network', '''\
[Match]
Name=port2
[Network]
Bridge=mybridge''')
        self.write_network('mybridge.network', '''\
[Match]
Name=mybridge
[Network]
DNS=192.168.250.1
Address=192.168.250.33/24
Gateway=192.168.250.1''')
        subprocess.call(['systemctl', 'reset-failed', 'systemd-networkd', 'systemd-resolved'])
        subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])

    def tearDown(self):
        """Stop networkd and delete the bridge and both ports."""
        subprocess.check_call(['systemctl', 'stop', 'systemd-networkd'])
        for link in ('mybridge', 'port1', 'port2'):
            subprocess.check_call(['ip', 'link', 'del', link])

    def test_bridge_init(self):
        """All three links should be managed by networkd."""
        self.assert_link_states(
            port1='managed',
            port2='managed',
            mybridge='managed')

    def test_bridge_port_priority(self):
        """A Priority= drop-in should change the port priority from 32."""
        self.assertEqual(self.read_attr('port1', 'brport/priority'), '32')
        self.write_network_dropin('port1.network', 'priority', '''\
[Bridge]
Priority=28
''')
        subprocess.check_call(['systemctl', 'restart', 'systemd-networkd'])
        self.assertEqual(self.read_attr('port1', 'brport/priority'), '28')

    def test_bridge_port_priority_set_zero(self):
        """It should be possible to set the bridge port priority to 0"""
        self.assertEqual(self.read_attr('port2', 'brport/priority'), '32')
        self.write_network_dropin('port2.network', 'priority', '''\
[Bridge]
Priority=0
''')
        subprocess.check_call(['systemctl', 'restart', 'systemd-networkd'])
        self.assertEqual(self.read_attr('port2', 'brport/priority'), '0')

    def test_bridge_port_property(self):
        """Test the "[Bridge]" section keys"""
        self.assertEqual(self.read_attr('port2', 'brport/priority'), '32')
        self.write_network_dropin('port2.network', 'property', '''\
[Bridge]
UnicastFlood=true
HairPin=true
UseBPDU=true
FastLeave=true
AllowPortToBeRoot=true
Cost=555
Priority=23
''')
        subprocess.check_call(['systemctl', 'restart', 'systemd-networkd'])

        # Each key above is expected to show up in the matching sysfs
        # brport attribute after the restart.
        self.assertEqual(self.read_attr('port2', 'brport/priority'), '23')
        self.assertEqual(self.read_attr('port2', 'brport/hairpin_mode'), '1')
        self.assertEqual(self.read_attr('port2', 'brport/path_cost'), '555')
        self.assertEqual(self.read_attr('port2', 'brport/multicast_fast_leave'), '1')
        self.assertEqual(self.read_attr('port2', 'brport/unicast_flood'), '1')
        self.assertEqual(self.read_attr('port2', 'brport/bpdu_guard'), '1')
        self.assertEqual(self.read_attr('port2', 'brport/root_block'), '1')
276
class ClientTestBase(NetworkdTestingUtilities):
    """Provide common methods for testing networkd against servers.

    Subclasses provide the server side by implementing create_iface(),
    shutdown_iface() and print_server_log(); they also have to inherit from
    unittest.TestCase, whose assert*/addCleanup methods are used here.
    """

    @classmethod
    def setUpClass(cls):
        """Remember the current log level and switch to debug logging."""
        cls.orig_log_level = subprocess.check_output(
            ['systemctl', 'show', '--value', '--property', 'LogLevel'],
            universal_newlines=True).strip()
        subprocess.check_call(['systemd-analyze', 'set-log-level', 'debug'])

    @classmethod
    def tearDownClass(cls):
        """Restore the log level saved by setUpClass()."""
        subprocess.check_call(['systemd-analyze', 'set-log-level', cls.orig_log_level])

    def setUp(self):
        """Set interface names, a scratch directory and the journal cursor."""
        self.iface = 'test_eth42'
        self.if_router = 'router_eth42'
        self.workdir_obj = tempfile.TemporaryDirectory()
        # Remove the scratch directory deterministically once the test is
        # over instead of leaving it to garbage collection.
        self.addCleanup(self.workdir_obj.cleanup)
        self.workdir = self.workdir_obj.name
        self.config = 'test_eth42.network'

        # get current journal cursor
        subprocess.check_output(['journalctl', '--sync'])
        out = subprocess.check_output(['journalctl', '-b', '--quiet',
                                       '--no-pager', '-n0', '--show-cursor'],
                                      universal_newlines=True)
        self.assertTrue(out.startswith('-- cursor:'))
        self.journal_cursor = out.split()[-1]

        subprocess.call(['systemctl', 'reset-failed', 'systemd-networkd', 'systemd-resolved'])

    def tearDown(self):
        """Tear down the test interface, stop networkd and drop dummy0."""
        self.shutdown_iface()
        subprocess.call(['systemctl', 'stop', 'systemd-networkd'])
        subprocess.call(['ip', 'link', 'del', 'dummy0'],
                        stderr=subprocess.DEVNULL)

    def show_journal(self, unit):
        '''Show journal of given unit since start of the test'''

        print('---- {} ----'.format(unit))
        subprocess.check_output(['journalctl', '--sync'])
        # flush our own output first so it is not interleaved with the
        # output of the child process
        sys.stdout.flush()
        subprocess.call(['journalctl', '-b', '--no-pager', '--quiet',
                         '--cursor', self.journal_cursor, '-u', unit])

    def create_iface(self, ipv6=False):
        '''Create test interface with DHCP server behind it'''

        raise NotImplementedError('must be implemented by a subclass')

    def shutdown_iface(self):
        '''Remove test interface and stop DHCP server'''

        raise NotImplementedError('must be implemented by a subclass')

    def print_server_log(self):
        '''Print DHCP server log for debugging failures'''

        raise NotImplementedError('must be implemented by a subclass')

    def do_test(self, coldplug=True, ipv6=False, extra_opts='',
                online_timeout=10, dhcp_mode='yes'):
        '''Bring up the client interface via DHCP and verify its state

        coldplug=True creates the interface before starting networkd,
        coldplug=False starts networkd first, and coldplug=None means the
        caller has already created the interface itself.  extra_opts is
        appended to the [Network] section of the client's unit, dhcp_mode
        is the value of its DHCP= setting, and online_timeout is the number
        of seconds to wait for the interface to come online.
        '''
        try:
            subprocess.check_call(['systemctl', 'start', 'systemd-resolved'])
        except subprocess.CalledProcessError:
            self.show_journal('systemd-resolved.service')
            raise
        self.write_network(self.config, '''\
[Match]
Name={}
[Network]
DHCP={}
{}'''.format(self.iface, dhcp_mode, extra_opts))

        if coldplug:
            # create interface first, then start networkd
            self.create_iface(ipv6=ipv6)
            subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])
        elif coldplug is not None:
            # start networkd first, then create interface
            subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])
            self.create_iface(ipv6=ipv6)
        else:
            # "None" means test sets up interface by itself
            subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])

        try:
            subprocess.check_call([NETWORKD_WAIT_ONLINE, '--interface',
                                   self.iface, '--timeout=%i' % online_timeout])

            if ipv6:
                # check iface state and IP 6 address; FIXME: we need to wait a bit
                # longer, as the iface is "configured" already with IPv4 *or*
                # IPv6, but we want to wait for both
                for _ in range(10):
                    out = subprocess.check_output(['ip', 'a', 'show', 'dev', self.iface])
                    if b'state UP' in out and b'inet6 2600' in out and b'inet 192.168' in out:
                        break
                    time.sleep(1)
                else:
                    self.fail('timed out waiting for IPv6 configuration')

                self.assertRegex(out, b'inet6 2600::.* scope global .*dynamic')
                self.assertRegex(out, b'inet6 fe80::.* scope link')
            else:
                # should have link-local address on IPv6 only
                out = subprocess.check_output(['ip', '-6', 'a', 'show', 'dev', self.iface])
                self.assertRegex(out, br'inet6 fe80::.* scope link')
                self.assertNotIn(b'scope global', out)

            # should have IPv4 address
            out = subprocess.check_output(['ip', '-4', 'a', 'show', 'dev', self.iface])
            self.assertIn(b'state UP', out)
            self.assertRegex(out, br'inet 192.168.5.\d+/.* scope global dynamic')

            # check networkctl state
            out = subprocess.check_output(['networkctl'])
            self.assertRegex(out, (r'{}\s+ether\s+[a-z-]+\s+unmanaged'.format(self.if_router)).encode())
            self.assertRegex(out, (r'{}\s+ether\s+routable\s+configured'.format(self.iface)).encode())

            out = subprocess.check_output(['networkctl', 'status', self.iface])
            self.assertRegex(out, br'Type:\s+ether')
            self.assertRegex(out, br'State:\s+routable.*configured')
            self.assertRegex(out, br'Address:\s+192.168.5.\d+')
            if ipv6:
                self.assertRegex(out, br'2600::')
            else:
                self.assertNotIn(br'2600::', out)
            self.assertRegex(out, br'fe80::')
            self.assertRegex(out, br'Gateway:\s+192.168.5.1')
            self.assertRegex(out, br'DNS:\s+192.168.5.1')
        except (AssertionError, subprocess.CalledProcessError):
            # show networkd status, journal, and DHCP server log on failure
            with open(os.path.join(NETWORK_UNITDIR, self.config)) as f:
                print('\n---- {} ----\n{}'.format(self.config, f.read()))
            print('---- interface status ----')
            sys.stdout.flush()
            subprocess.call(['ip', 'a', 'show', 'dev', self.iface])
            print('---- networkctl status {} ----'.format(self.iface))
            sys.stdout.flush()
            subprocess.call(['networkctl', 'status', self.iface])
            self.show_journal('systemd-networkd.service')
            self.print_server_log()
            raise

        # Poll (up to 50 times, 0.1 s apart) for the DHCP-provided name
        # server to show up in resolv.conf.
        for _ in range(50):
            with open(RESOLV_CONF) as f:
                contents = f.read()
            if 'nameserver 192.168.5.1\n' in contents:
                break
            time.sleep(0.1)
        else:
            self.fail('nameserver 192.168.5.1 not found in ' + RESOLV_CONF)

        if coldplug is False:
            # check post-down.d hook
            self.shutdown_iface()

    def test_coldplug_dhcp_yes_ip4(self):
        # we have a 12s timeout on RA, so we need to wait longer
        self.do_test(coldplug=True, ipv6=False, online_timeout=15)

    def test_coldplug_dhcp_yes_ip4_no_ra(self):
        # with disabling RA explicitly things should be fast
        self.do_test(coldplug=True, ipv6=False,
                     extra_opts='IPv6AcceptRA=False')

    def test_coldplug_dhcp_ip4_only(self):
        # we have a 12s timeout on RA, so we need to wait longer
        self.do_test(coldplug=True, ipv6=False, dhcp_mode='ipv4',
                     online_timeout=15)

    def test_coldplug_dhcp_ip4_only_no_ra(self):
        # with disabling RA explicitly things should be fast
        self.do_test(coldplug=True, ipv6=False, dhcp_mode='ipv4',
                     extra_opts='IPv6AcceptRA=False')

    def test_coldplug_dhcp_ip6(self):
        self.do_test(coldplug=True, ipv6=True)

    def test_hotplug_dhcp_ip4(self):
        # With IPv4 only we have a 12s timeout on RA, so we need to wait longer
        self.do_test(coldplug=False, ipv6=False, online_timeout=15)

    def test_hotplug_dhcp_ip6(self):
        self.do_test(coldplug=False, ipv6=True)

    def test_route_only_dns(self):
        '''A routing-only domain must not leak into resolv.conf'''
        self.write_network('myvpn.netdev', '''\
[NetDev]
Name=dummy0
Kind=dummy
MACAddress=12:34:56:78:9a:bc''')
        self.write_network('myvpn.network', '''\
[Match]
Name=dummy0
[Network]
Address=192.168.42.100
DNS=192.168.42.1
Domains= ~company''')

        self.do_test(coldplug=True, ipv6=False,
                     extra_opts='IPv6AcceptRouterAdvertisements=False')

        with open(RESOLV_CONF) as f:
            contents = f.read()
        # ~company is not a search domain, only a routing domain
        self.assertNotRegex(contents, 'search.*company')
        # our global server should appear
        self.assertIn('nameserver 192.168.5.1\n', contents)
        # should not have domain-restricted server as global server
        self.assertNotIn('nameserver 192.168.42.1\n', contents)

    def test_route_only_dns_all_domains(self):
        '''With "~." the VPN server is listed as a global server too'''
        self.write_network('myvpn.netdev', '''[NetDev]
Name=dummy0
Kind=dummy
MACAddress=12:34:56:78:9a:bc''')
        self.write_network('myvpn.network', '''[Match]
Name=dummy0
[Network]
Address=192.168.42.100
DNS=192.168.42.1
Domains= ~company ~.''')

        self.do_test(coldplug=True, ipv6=False,
                     extra_opts='IPv6AcceptRouterAdvertisements=False')

        with open(RESOLV_CONF) as f:
            contents = f.read()

        # ~company is not a search domain, only a routing domain
        self.assertNotRegex(contents, 'search.*company')

        # our global server should appear
        self.assertIn('nameserver 192.168.5.1\n', contents)
        # should have company server as global server due to ~.
        self.assertIn('nameserver 192.168.42.1\n', contents)
516
517
@unittest.skipUnless(HAVE_DNSMASQ, 'dnsmasq not installed')
class DnsmasqClientTest(ClientTestBase, unittest.TestCase):
    '''Test networkd client against dnsmasq'''

    def setUp(self):
        super().setUp()
        # Popen handle of the running dnsmasq, or None while none is running
        self.dnsmasq = None
        # fixed MAC of the client interface, so DHCP host entries can match it
        self.iface_mac = 'de:ad:be:ef:47:11'

    def create_iface(self, ipv6=False, dnsmasq_opts=None):
        '''Create test interface with DHCP server behind it

        dnsmasq_opts is an optional list of additional command line
        arguments for dnsmasq.
        '''

        # add veth pair
        subprocess.check_call(['ip', 'link', 'add', 'name', self.iface,
                               'address', self.iface_mac,
                               'type', 'veth', 'peer', 'name', self.if_router])

        # give our router an IP
        subprocess.check_call(['ip', 'a', 'flush', 'dev', self.if_router])
        subprocess.check_call(['ip', 'a', 'add', '192.168.5.1/24', 'dev', self.if_router])
        if ipv6:
            subprocess.check_call(['ip', 'a', 'add', '2600::1/64', 'dev', self.if_router])
        subprocess.check_call(['ip', 'link', 'set', self.if_router, 'up'])

        # add DHCP server
        self.dnsmasq_log = os.path.join(self.workdir, 'dnsmasq.log')
        lease_file = os.path.join(self.workdir, 'dnsmasq.leases')
        if ipv6:
            extra_opts = ['--enable-ra', '--dhcp-range=2600::10,2600::20']
        else:
            extra_opts = []
        if dnsmasq_opts:
            extra_opts += dnsmasq_opts
        self.dnsmasq = subprocess.Popen(
            ['dnsmasq', '--keep-in-foreground', '--log-queries',
             '--log-facility=' + self.dnsmasq_log, '--conf-file=/dev/null',
             '--dhcp-leasefile=' + lease_file, '--bind-interfaces',
             '--interface=' + self.if_router, '--except-interface=lo',
             '--dhcp-range=192.168.5.10,192.168.5.200'] + extra_opts)

    def shutdown_iface(self):
        '''Remove test interface and stop DHCP server'''

        try:
            if self.if_router:
                subprocess.check_call(['ip', 'link', 'del', 'dev', self.if_router])
                self.if_router = None
        finally:
            # Reap dnsmasq even when deleting the link failed, so that no
            # server process is left behind; the error still propagates.
            if self.dnsmasq:
                self.dnsmasq.kill()
                self.dnsmasq.wait()
                self.dnsmasq = None

    def print_server_log(self):
        '''Print DHCP server log for debugging failures'''

        with open(self.dnsmasq_log) as f:
            sys.stdout.write('\n\n---- dnsmasq log ----\n{}\n------\n\n'.format(f.read()))

    def test_resolved_domain_restricted_dns(self):
        '''resolved: domain-restricted DNS servers'''

        # FIXME: resolvectl query fails with enabled DNSSEC against our dnsmasq
        conf = '/run/systemd/resolved.conf.d/test-disable-dnssec.conf'
        os.makedirs(os.path.dirname(conf), exist_ok=True)
        with open(conf, 'w') as f:
            f.write('[Resolve]\nDNSSEC=no\n')
        self.addCleanup(os.remove, conf)

        # create interface for generic connections; this will map all DNS names
        # to 192.168.42.1
        self.create_iface(dnsmasq_opts=['--address=/#/192.168.42.1'])
        self.write_network('general.network', '''\
[Match]
Name={}
[Network]
DHCP=ipv4
IPv6AcceptRA=False'''.format(self.iface))

        # create second device/dnsmasq for a .company/.lab VPN interface
        # static IPs for simplicity
        self.add_veth_pair('testvpnclient', 'testvpnrouter')
        subprocess.check_call(['ip', 'a', 'flush', 'dev', 'testvpnrouter'])
        subprocess.check_call(['ip', 'a', 'add', '10.241.3.1/24', 'dev', 'testvpnrouter'])
        subprocess.check_call(['ip', 'link', 'set', 'testvpnrouter', 'up'])

        vpn_dnsmasq_log = os.path.join(self.workdir, 'dnsmasq-vpn.log')
        vpn_dnsmasq = subprocess.Popen(
            ['dnsmasq', '--keep-in-foreground', '--log-queries',
             '--log-facility=' + vpn_dnsmasq_log, '--conf-file=/dev/null',
             '--dhcp-leasefile=/dev/null', '--bind-interfaces',
             '--interface=testvpnrouter', '--except-interface=lo',
             '--address=/math.lab/10.241.3.3', '--address=/cantina.company/10.241.4.4'])
        # cleanups run in reverse order: kill() first, then wait()
        self.addCleanup(vpn_dnsmasq.wait)
        self.addCleanup(vpn_dnsmasq.kill)

        self.write_network('vpn.network', '''\
[Match]
Name=testvpnclient
[Network]
IPv6AcceptRA=False
Address=10.241.3.2/24
DNS=10.241.3.1
Domains= ~company ~lab''')

        subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])
        subprocess.check_call([NETWORKD_WAIT_ONLINE, '--interface', self.iface,
                               '--interface=testvpnclient', '--timeout=20'])

        # ensure we start fresh with every test
        subprocess.check_call(['systemctl', 'restart', 'systemd-resolved'])

        # test vpnclient specific domains; these should *not* be answered by
        # the general DNS
        out = subprocess.check_output(['resolvectl', 'query', 'math.lab'])
        self.assertIn(b'math.lab: 10.241.3.3', out)
        out = subprocess.check_output(['resolvectl', 'query', 'kettle.cantina.company'])
        self.assertIn(b'kettle.cantina.company: 10.241.4.4', out)

        # test general domains
        out = subprocess.check_output(['resolvectl', 'query', 'megasearch.net'])
        self.assertIn(b'megasearch.net: 192.168.42.1', out)

        with open(self.dnsmasq_log) as f:
            general_log = f.read()
        with open(vpn_dnsmasq_log) as f:
            vpn_log = f.read()

        # VPN domains should only be sent to VPN DNS
        self.assertRegex(vpn_log, 'query.*math.lab')
        self.assertRegex(vpn_log, 'query.*cantina.company')
        self.assertNotIn('.lab', general_log)
        self.assertNotIn('.company', general_log)

        # general domains should not be sent to the VPN DNS
        self.assertRegex(general_log, 'query.*megasearch.net')
        self.assertNotIn('megasearch.net', vpn_log)

    def test_resolved_etc_hosts(self):
        '''resolved queries to /etc/hosts'''

        # FIXME: -t MX query fails with enabled DNSSEC (even when using
        # the known negative trust anchor .internal instead of .example.com)
        conf = '/run/systemd/resolved.conf.d/test-disable-dnssec.conf'
        os.makedirs(os.path.dirname(conf), exist_ok=True)
        with open(conf, 'w') as f:
            f.write('[Resolve]\nDNSSEC=no\nLLMNR=no\nMulticastDNS=no\n')
        self.addCleanup(os.remove, conf)

        # create /etc/hosts bind mount which resolves my.example.com for IPv4
        hosts = os.path.join(self.workdir, 'hosts')
        with open(hosts, 'w') as f:
            f.write('172.16.99.99 my.example.com\n')
        subprocess.check_call(['mount', '--bind', hosts, '/etc/hosts'])
        self.addCleanup(subprocess.call, ['umount', '/etc/hosts'])
        subprocess.check_call(['systemctl', 'stop', 'systemd-resolved.service'])

        # note: different IPv4 address here, so that it's easy to tell apart
        # what resolved the query
        self.create_iface(dnsmasq_opts=['--host-record=my.example.com,172.16.99.1,2600::99:99',
                                        '--host-record=other.example.com,172.16.0.42,2600::42',
                                        '--mx-host=example.com,mail.example.com'],
                          ipv6=True)
        self.do_test(coldplug=None, ipv6=True)

        try:
            # family specific queries
            out = subprocess.check_output(['resolvectl', 'query', '-4', 'my.example.com'])
            self.assertIn(b'my.example.com: 172.16.99.99', out)
            # we don't expect an IPv6 answer; if /etc/hosts has any IP address,
            # it's considered a sufficient source
            self.assertNotEqual(subprocess.call(['resolvectl', 'query', '-6', 'my.example.com']), 0)
            # "any family" query; IPv4 should come from /etc/hosts
            out = subprocess.check_output(['resolvectl', 'query', 'my.example.com'])
            self.assertIn(b'my.example.com: 172.16.99.99', out)
            # IP → name lookup; again, takes the /etc/hosts one
            out = subprocess.check_output(['resolvectl', 'query', '172.16.99.99'])
            self.assertIn(b'172.16.99.99: my.example.com', out)

            # non-address RRs should fall back to DNS
            out = subprocess.check_output(['resolvectl', 'query', '--type=MX', 'example.com'])
            self.assertIn(b'example.com IN MX 1 mail.example.com', out)

            # other domains query DNS
            out = subprocess.check_output(['resolvectl', 'query', 'other.example.com'])
            self.assertIn(b'172.16.0.42', out)
            out = subprocess.check_output(['resolvectl', 'query', '172.16.0.42'])
            self.assertIn(b'172.16.0.42: other.example.com', out)
        except (AssertionError, subprocess.CalledProcessError):
            self.show_journal('systemd-resolved.service')
            self.print_server_log()
            raise

    def test_transient_hostname(self):
        '''networkd sets transient hostname from DHCP'''

        orig_hostname = socket.gethostname()
        self.addCleanup(socket.sethostname, orig_hostname)
        # temporarily move /etc/hostname away; restart hostnamed to pick it up
        if os.path.exists('/etc/hostname'):
            subprocess.check_call(['mount', '--bind', '/dev/null', '/etc/hostname'])
            self.addCleanup(subprocess.call, ['umount', '/etc/hostname'])
        subprocess.check_call(['systemctl', 'stop', 'systemd-hostnamed.service'])
        self.addCleanup(subprocess.call, ['systemctl', 'stop', 'systemd-hostnamed.service'])

        self.create_iface(dnsmasq_opts=['--dhcp-host={},192.168.5.210,testgreen'.format(self.iface_mac)])
        self.do_test(coldplug=None, extra_opts='IPv6AcceptRA=False', dhcp_mode='ipv4')

        try:
            # should have received the fixed IP above
            out = subprocess.check_output(['ip', '-4', 'a', 'show', 'dev', self.iface])
            self.assertRegex(out, b'inet 192.168.5.210/24 .* scope global dynamic')
            # should have set transient hostname in hostnamed; this is
            # sometimes a bit lagging (issue #4753), so retry a few times
            for retry in range(1, 6):
                out = subprocess.check_output(['hostnamectl'])
                if b'testgreen' in out:
                    break
                time.sleep(5)
                sys.stdout.write('[retry %i] ' % retry)
                sys.stdout.flush()
            else:
                self.fail('Transient hostname not found in hostnamectl:\n{}'.format(out.decode()))
            # and also applied to the system
            self.assertEqual(socket.gethostname(), 'testgreen')
        except AssertionError:
            self.show_journal('systemd-networkd.service')
            self.show_journal('systemd-hostnamed.service')
            self.print_server_log()
            raise

    def test_transient_hostname_with_static(self):
        '''transient hostname is not applied if static hostname exists'''

        orig_hostname = socket.gethostname()
        self.addCleanup(socket.sethostname, orig_hostname)

        # provide a static hostname: create the file when it is missing,
        # otherwise shadow the existing one with a bind mount
        if not os.path.exists('/etc/hostname'):
            self.write_config('/etc/hostname', "foobarqux")
        else:
            self.write_config('/run/hostname.tmp', "foobarqux")
            subprocess.check_call(['mount', '--bind', '/run/hostname.tmp', '/etc/hostname'])
            self.addCleanup(subprocess.call, ['umount', '/etc/hostname'])

        socket.sethostname("foobarqux")

        subprocess.check_call(['systemctl', 'stop', 'systemd-hostnamed.service'])
        self.addCleanup(subprocess.call, ['systemctl', 'stop', 'systemd-hostnamed.service'])

        self.create_iface(dnsmasq_opts=['--dhcp-host={},192.168.5.210,testgreen'.format(self.iface_mac)])
        self.do_test(coldplug=None, extra_opts='IPv6AcceptRA=False', dhcp_mode='ipv4')

        try:
            # should have received the fixed IP above
            out = subprocess.check_output(['ip', '-4', 'a', 'show', 'dev', self.iface])
            self.assertRegex(out, b'inet 192.168.5.210/24 .* scope global dynamic')
            # static hostname wins over transient one, thus *not* applied
            self.assertEqual(socket.gethostname(), "foobarqux")
        except AssertionError:
            self.show_journal('systemd-networkd.service')
            self.show_journal('systemd-hostnamed.service')
            self.print_server_log()
            raise
779
780
class NetworkdClientTest(ClientTestBase, unittest.TestCase):
    '''Test networkd client against networkd server'''

    def setUp(self):
        '''Run the inherited setup, then set self.dnsmasq to None'''
        super().setUp()
        self.dnsmasq = None

    def _wait_for_resolv_conf(self, is_ready):
        '''Poll RESOLV_CONF until is_ready(contents) is true, at most 50 times

        is_ready is called with the current file contents (a str) after
        every read.  The last contents read are returned in any case, so
        that the caller's assertions produce the failure message if the
        expected state was never reached.
        '''
        contents = ''
        for _ in range(50):
            try:
                with open(RESOLV_CONF) as f:
                    contents = f.read()
            except FileNotFoundError:
                # the file may not have been written yet; treat this like
                # any other "not ready" state and retry
                contents = ''
            if is_ready(contents):
                break
            time.sleep(0.1)
        return contents

    def create_iface(self, ipv6=False, dhcpserver_opts=None):
        '''Create test interface with DHCP server behind it

        ipv6: if true, the router side additionally gets the static address
              2600::1/64.
        dhcpserver_opts: optional text appended verbatim to the [DHCPServer]
              section of the router's .network file.
        '''

        addr6 = 'Address=2600::1/64' if ipv6 else ''

        # run "router-side" networkd in own mount namespace to shield it from
        # "client-side" configuration and networkd
        (fd, script) = tempfile.mkstemp(prefix='networkd-router.sh')
        self.addCleanup(os.remove, script)
        with os.fdopen(fd, 'w+') as f:
            f.write('''\
#!/bin/sh
set -eu
mkdir -p /run/systemd/network
mkdir -p /run/systemd/netif
mount -t tmpfs none /run/systemd/network
mount -t tmpfs none /run/systemd/netif
[ ! -e /run/dbus ] || mount -t tmpfs none /run/dbus
# create router/client veth pair
cat << EOF > /run/systemd/network/test.netdev
[NetDev]
Name=%(ifr)s
Kind=veth

[Peer]
Name=%(ifc)s
EOF

cat << EOF > /run/systemd/network/test.network
[Match]
Name=%(ifr)s

[Network]
Address=192.168.5.1/24
%(addr6)s
DHCPServer=yes

[DHCPServer]
PoolOffset=10
PoolSize=50
DNS=192.168.5.1
%(dhopts)s
EOF

# run networkd as in systemd-networkd.service
exec $(systemctl cat systemd-networkd.service | sed -n '/^ExecStart=/ { s/^.*=//; s/^[@+-]//; s/^!*//; p}')
''' % {'ifr': self.if_router, 'ifc': self.iface, 'addr6': addr6,
       'dhopts': dhcpserver_opts or ''})

            # must happen while the file object is still open: leaving the
            # "with" block closes fd
            os.fchmod(fd, 0o755)

        subprocess.check_call(['systemd-run', '--unit=networkd-test-router.service',
                               '-p', 'InaccessibleDirectories=-/etc/systemd/network',
                               '-p', 'InaccessibleDirectories=-/run/systemd/network',
                               '-p', 'InaccessibleDirectories=-/run/systemd/netif',
                               '--service-type=notify', script])

        # wait until devices got created; this is best effort, the test
        # simply carries on if the state is not reached after 50 tries
        for _ in range(50):
            out = subprocess.check_output(['ip', 'a', 'show', 'dev', self.if_router])
            if b'state UP' in out and b'scope global' in out:
                break
            time.sleep(0.1)

    def shutdown_iface(self):
        '''Remove test interface and stop DHCP server'''

        if self.if_router:
            subprocess.check_call(['systemctl', 'stop', 'networkd-test-router.service'])
            # ensure failed transient unit does not stay around
            subprocess.call(['systemctl', 'reset-failed', 'networkd-test-router.service'])
            subprocess.call(['ip', 'link', 'del', 'dev', self.if_router])
            self.if_router = None

    def print_server_log(self):
        '''Print DHCP server log for debugging failures'''

        self.show_journal('networkd-test-router.service')

    @unittest.skip('networkd does not have DHCPv6 server support')
    def test_hotplug_dhcp_ip6(self):
        pass

    @unittest.skip('networkd does not have DHCPv6 server support')
    def test_coldplug_dhcp_ip6(self):
        pass

    def test_search_domains(self):
        '''resolv.conf search list is truncated when too many domains are set'''

        # we don't use this interface for this test
        self.if_router = None

        self.write_network('test.netdev', '''\
[NetDev]
Name=dummy0
Kind=dummy
MACAddress=12:34:56:78:9a:bc''')
        self.write_network('test.network', '''\
[Match]
Name=dummy0
[Network]
Address=192.168.42.100
DNS=192.168.42.1
Domains= one two three four five six seven eight nine ten''')

        subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])

        contents = self._wait_for_resolv_conf(lambda text: ' one' in text)
        self.assertRegex(contents, 'search .*one two three four')
        self.assertNotIn('seven\n', contents)
        self.assertIn('# Too many search domains configured, remaining ones ignored.\n', contents)

    def test_search_domains_too_long(self):
        '''resolv.conf search list is truncated when its total length is too long'''

        # we don't use this interface for this test
        self.if_router = None

        name_prefix = 'a' * 60

        self.write_network('test.netdev', '''\
[NetDev]
Name=dummy0
Kind=dummy
MACAddress=12:34:56:78:9a:bc''')
        self.write_network('test.network', '''\
[Match]
Name=dummy0
[Network]
Address=192.168.42.100
DNS=192.168.42.1
Domains={p}0 {p}1 {p}2 {p}3 {p}4'''.format(p=name_prefix))

        subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])

        # wait for the first domain configured above; waiting for ' one'
        # (a domain of test_search_domains) could never succeed here and
        # always ran into the full timeout
        first_domain = name_prefix + '0'
        contents = self._wait_for_resolv_conf(lambda text: first_domain in text)
        self.assertRegex(contents, 'search .*{p}0 {p}1 {p}2'.format(p=name_prefix))
        self.assertIn('# Total length of all search domains is too long, remaining ones ignored.', contents)

    def test_dropin(self):
        '''DNS= from a drop-in is applied in addition to the main .network file'''

        # we don't use this interface for this test
        self.if_router = None

        self.write_network('test.netdev', '''\
[NetDev]
Name=dummy0
Kind=dummy
MACAddress=12:34:56:78:9a:bc''')
        self.write_network('test.network', '''\
[Match]
Name=dummy0
[Network]
Address=192.168.42.100
DNS=192.168.42.1''')
        self.write_network_dropin('test.network', 'dns', '''\
[Network]
DNS=127.0.0.1''')

        subprocess.check_call(['systemctl', 'start', 'systemd-resolved', 'systemd-networkd'])

        contents = self._wait_for_resolv_conf(
            lambda text: ' 127.0.0.1' in text and '192.168.42.1' in text)
        self.assertIn('nameserver 192.168.42.1\n', contents)
        self.assertIn('nameserver 127.0.0.1\n', contents)

    def test_dhcp_timezone(self):
        '''networkd sets time zone from DHCP'''

        def get_tz():
            '''Return the Timezone property of org.freedesktop.timedate1'''
            out = subprocess.check_output(['busctl', 'get-property', 'org.freedesktop.timedate1',
                                           '/org/freedesktop/timedate1', 'org.freedesktop.timedate1', 'Timezone'])
            # busctl prints the property as: s "<value>"
            assert out.startswith(b's "')
            out = out.strip()
            assert out.endswith(b'"')
            return out[3:-1].decode()

        orig_timezone = get_tz()
        self.addCleanup(subprocess.call, ['timedatectl', 'set-timezone', orig_timezone])

        self.create_iface(dhcpserver_opts='EmitTimezone=yes\nTimezone=Pacific/Honolulu')
        self.do_test(coldplug=None, extra_opts='IPv6AcceptRA=false\n[DHCP]\nUseTimezone=true', dhcp_mode='ipv4')

        # should have applied the received timezone
        try:
            self.assertEqual(get_tz(), 'Pacific/Honolulu')
        except AssertionError:
            self.show_journal('systemd-networkd.service')
            # the time zone is queried from timedate1, so on failure the
            # timedated journal is the relevant one (not hostnamed)
            self.show_journal('systemd-timedated.service')
            raise
988
989
class MatchClientTest(unittest.TestCase, NetworkdTestingUtilities):
    """Exercise the [Match] section handling of .network files.

    Caution: a .network file that matches one of the test host's own
    interfaces wipes that interface's configuration.  As a precaution,
    every network file written here has to carry a restrictive [Match]
    section, so that only the temporary veth interfaces created by these
    tests can ever be affected.
    """

    def tearDown(self):
        """Stop networkd once a test has finished."""
        stop_cmd = ['systemctl', 'stop', 'systemd-networkd']
        subprocess.call(stop_cmd)

    def test_basic_matching(self):
        """Check that a Name= glob picks up only the matching link."""
        network_conf = '\n'.join(('[Match]', 'Name=test_*', '[Network]'))

        self.add_veth_pair('test_if1', 'fake_if2')
        self.write_network('test.network', network_conf)
        subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])

        expected = {'test_if1': 'managed', 'fake_if2': 'unmanaged'}
        self.assert_link_states(**expected)

    def test_inverted_matching(self):
        """Check that a value prefixed with '!' negates the match."""
        # Both ends share one MAC address, which serves as the common
        # matching attribute.  This avoids depending on udev, so the test
        # also works in containers.
        shared_mac = '00:01:02:03:98:99'
        veth_args = ['addr', shared_mac]
        peer_args = ['addr', shared_mac]
        self.add_veth_pair('test_veth', 'test_peer', veth_args, peer_args)

        template = '[Match]\nMACAddress={}\nName=!nonexistent *peer*\n[Network]'
        self.write_network('no-veth.network', template.format(shared_mac))

        subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])

        expected = {'test_veth': 'managed', 'test_peer': 'unmanaged'}
        self.assert_link_states(**expected)
1024
1025
class UnmanagedClientTest(unittest.TestCase, NetworkdTestingUtilities):
    """Check that networkd manages exactly the interfaces it should."""

    def setUp(self):
        """Record the veth names to use and write the matching .network files."""
        # veth name -> peer name of the pairs created by create_iface().
        # Which device is paired with which is irrelevant; only the names
        # are matched on.
        self.veths = dict(m1def='m0unm', m1man='m1unm')

        # Contents of the .network files, in the order they will be read.
        self.configs = (
            '[Match]\nName=m1def\n',
            '[Match]\nName=m1unm\n[Link]\nUnmanaged=yes\n',
            '[Match]\nName=m1*\n[Link]\nUnmanaged=no\n',
        )

        # Number the files so they sort in the order given above; they get
        # cleaned up automatically.
        for index, text in enumerate(self.configs):
            filename = '{:02d}-test.network'.format(index)
            self.write_network(filename, text)

    def tearDown(self):
        """Stop networkd once a test has finished."""
        stop_cmd = ['systemctl', 'stop', 'systemd-networkd']
        subprocess.call(stop_cmd)

    def create_iface(self):
        """Create the temporary veth pairs used for interface matching."""
        for name, peer_name in self.veths.items():
            self.add_veth_pair(name, peer_name)

    def test_unmanaged_setting(self):
        """Check link states with Unmanaged= settings (hot-plug)."""
        expected = {
            'm1def': 'managed',
            'm1man': 'managed',
            'm1unm': 'unmanaged',
            'm0unm': 'unmanaged',
        }
        # hot-plug: networkd is already running when the links appear
        subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])
        self.create_iface()
        self.assert_link_states(**expected)

    def test_unmanaged_setting_coldplug(self):
        """Check link states with Unmanaged= settings (cold-plug)."""
        expected = {
            'm1def': 'managed',
            'm1man': 'managed',
            'm1unm': 'unmanaged',
            'm0unm': 'unmanaged',
        }
        # cold-plug: the links exist before networkd starts
        self.create_iface()
        subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])
        self.assert_link_states(**expected)

    def test_catchall_config(self):
        """Check link states with a catch-all config (hot-plug)."""
        expected = {
            'm1def': 'managed',
            'm1man': 'managed',
            'm1unm': 'unmanaged',
            'm0unm': 'managed',
        }
        # Deliberately not a match on ALL interfaces: that would mess up
        # the host.
        self.write_network('all.network', '[Match]\nName=m[01]???\n')
        subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])
        self.create_iface()
        self.assert_link_states(**expected)

    def test_catchall_config_coldplug(self):
        """Check link states with a catch-all config (cold-plug)."""
        expected = {
            'm1def': 'managed',
            'm1man': 'managed',
            'm1unm': 'unmanaged',
            'm0unm': 'managed',
        }
        # Deliberately not a match on ALL interfaces: that would mess up
        # the host.
        self.write_network('all.network', '[Match]\nName=m[01]???\n')
        self.create_iface()
        subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])
        self.assert_link_states(**expected)
1097
1098
if __name__ == '__main__':
    # Report verbosely on stdout rather than on unittest's default stream.
    runner = unittest.TextTestRunner(stream=sys.stdout, verbosity=2)
    unittest.main(testRunner=runner)