]> git.ipfire.org Git - thirdparty/systemd.git/blob - test/networkd-test.py
build-sys: bump package/library versions
[thirdparty/systemd.git] / test / networkd-test.py
1 #!/usr/bin/env python3
2 # SPDX-License-Identifier: LGPL-2.1+
3 #
4 # networkd integration test
5 # This uses temporary configuration in /run and temporary veth devices, and
6 # does not write anything on disk or change any system configuration;
7 # but it assumes (and checks at the beginning) that networkd is not currently
8 # running.
9 #
10 # This can be run on a normal installation, in QEMU, nspawn (with
11 # --private-network), LXD (with "--config raw.lxc=lxc.aa_profile=unconfined"),
12 # or LXC system containers. You need at least the "ip" tool from the iproute
13 # package; it is recommended to install dnsmasq too to get full test coverage.
14 #
15 # ATTENTION: This uses the *installed* networkd, not the one from the built
16 # source tree.
17 #
18 # © 2015 Canonical Ltd.
19 # Author: Martin Pitt <martin.pitt@ubuntu.com>
20
21 import errno
22 import os
23 import shutil
24 import socket
25 import subprocess
26 import sys
27 import tempfile
28 import time
29 import unittest
30
# True when a dnsmasq binary can be found on PATH.
HAVE_DNSMASQ = shutil.which('dnsmasq') is not None

# Directory that receives the temporary network unit files.
NETWORK_UNITDIR = '/run/systemd/network'

# Full path of systemd-networkd-wait-online, or None when it is not present
# in either of the searched systemd library directories.
NETWORKD_WAIT_ONLINE = shutil.which(
    'systemd-networkd-wait-online',
    path='/usr/lib/systemd:/lib/systemd')

# resolv.conf file that the DNS related assertions read.
RESOLV_CONF = '/run/systemd/resolve/resolv.conf'

# Bookkeeping shared between setUpModule() and tearDownModule().
tmpmounts = []      # directories that got a temporary tmpfs mounted on them
running_units = []  # units that were active and were stopped for the tests
stopped_units = []  # units that were already inactive before the tests
43
44
def setUpModule():
    """Initialize the environment, and perform sanity checks on it.

    Stops any active networkd/resolved units (recording which ones were
    running so tearDownModule() can restore them), mounts a tmpfs over the
    existing networkd/resolved configuration and state directories, and
    makes sure the network unit directory exists.

    Raises:
        OSError: systemd-networkd-wait-online could not be found.
        unittest.SkipTest: networkd is active and the system is not
            virtualized.
    """
    if NETWORKD_WAIT_ONLINE is None:
        raise OSError(errno.ENOENT, 'systemd-networkd-wait-online not found')

    # Do not run any tests if the system is using networkd already and it's not virtualized
    if (subprocess.call(['systemctl', 'is-active', '--quiet', 'systemd-networkd.service']) == 0 and
            subprocess.call(['systemd-detect-virt', '--quiet']) != 0):
        raise unittest.SkipTest('not virtualized and networkd is already active')

    # Ensure we don't mess with an existing networkd config
    for u in ['systemd-networkd.socket', 'systemd-networkd', 'systemd-resolved']:
        if subprocess.call(['systemctl', 'is-active', '--quiet', u]) == 0:
            subprocess.call(['systemctl', 'stop', u])
            running_units.append(u)
        else:
            stopped_units.append(u)

    # create static systemd-network user for networkd-test-router.service (it
    # needs to do some stuff as root and can't start as user; but networkd
    # still insists on the user)
    subprocess.call(['adduser', '--system', '--no-create-home', 'systemd-network'])

    # Shadow every existing configuration/state directory with an empty tmpfs
    # and remember it so that it can be unmounted again afterwards.
    for d in ['/etc/systemd/network', '/run/systemd/network',
              '/run/systemd/netif', '/run/systemd/resolve']:
        if os.path.isdir(d):
            subprocess.check_call(["mount", "-t", "tmpfs", "none", d])
            tmpmounts.append(d)
    if os.path.isdir('/run/systemd/resolve'):
        os.chmod('/run/systemd/resolve', 0o755)
        shutil.chown('/run/systemd/resolve', 'systemd-resolve', 'systemd-resolve')
    if os.path.isdir('/run/systemd/netif'):
        os.chmod('/run/systemd/netif', 0o755)
        shutil.chown('/run/systemd/netif', 'systemd-network', 'systemd-network')

    # Avoid "Failed to open /dev/tty" errors in containers.
    os.environ['SYSTEMD_LOG_TARGET'] = 'journal'

    # Ensure the unit directory exists so tests can dump files into it.
    os.makedirs(NETWORK_UNITDIR, exist_ok=True)
87
88
def tearDownModule():
    """Undo the changes made by setUpModule().

    Unmounts the temporary tmpfs mounts, stops the units that were inactive
    before the tests and restarts the ones that were running. The unit state
    is restored even when an unmount fails, so a failed umount does not also
    leave previously running services stopped.

    Raises:
        subprocess.CalledProcessError: an umount command failed.
    """
    try:
        for d in tmpmounts:
            subprocess.check_call(["umount", d])
    finally:
        for u in stopped_units:
            subprocess.call(["systemctl", "stop", u])
        for u in running_units:
            subprocess.call(["systemctl", "restart", u])
97
98
class NetworkdTestingUtilities:
    """Provide a set of utility functions to facilitate networkd tests.

    This class must be inherited along with unittest.TestCase to define
    some required methods.
    """

    def add_veth_pair(self, veth, peer, veth_options=(), peer_options=()):
        """Add a veth interface pair, and queue them to be removed."""
        subprocess.check_call(['ip', 'link', 'add', 'name', veth] +
                              list(veth_options) +
                              ['type', 'veth', 'peer', 'name', peer] +
                              list(peer_options))
        self.addCleanup(subprocess.call, ['ip', 'link', 'del', 'dev', peer])

    def write_config(self, path, contents):
        """Write a configuration file, and queue it to be removed."""

        with open(path, 'w') as f:
            f.write(contents)

        self.addCleanup(os.remove, path)

    def write_network(self, unit_name, contents):
        """Write a network unit file, and queue it to be removed."""
        self.write_config(os.path.join(NETWORK_UNITDIR, unit_name), contents)

    def write_network_dropin(self, unit_name, dropin_name, contents):
        """Write a network unit drop-in, and queue it to be removed."""
        dropin_dir = os.path.join(NETWORK_UNITDIR, "{}.d".format(unit_name))
        dropin_path = os.path.join(dropin_dir, "{}.conf".format(dropin_name))

        # Only queue removal of the directory when this call creates it.
        # Cleanups run in reverse order, so queueing rmdir again for a second
        # drop-in of the same unit would try to remove the directory while
        # the first drop-in file is still inside it.
        created_dir = not os.path.isdir(dropin_dir)
        os.makedirs(dropin_dir, exist_ok=True)
        if created_dir:
            self.addCleanup(os.rmdir, dropin_dir)
        with open(dropin_path, 'w') as dropin:
            dropin.write(contents)
        # Queued after rmdir, hence executed before it.
        self.addCleanup(os.remove, dropin_path)

    def read_attr(self, link, attribute):
        """Read a link attribute from sysfs.

        The link must be managed by networkd; this is asserted first.
        """
        # The interface name is only known at run time, so it is passed as a
        # dynamically built keyword argument (e.g. eth0='managed') instead of
        # a literal keyword called "link".
        self.assert_link_states(**{link: 'managed'})
        with open(os.path.join('/sys/class/net', link, attribute)) as f:
            return f.readline().strip()

    def assert_link_states(self, **kwargs):
        """Match networkctl link states to the given ones.

        Each keyword argument should be the name of a network interface
        with its expected value of the "SETUP" column in output from
        networkctl.  The interfaces have five seconds to come online
        before the check is performed.  Every specified interface must
        be present in the output, and any other interfaces found in the
        output are ignored.

        A special interface state "managed" is supported, which matches
        any value in the "SETUP" column other than "unmanaged".
        """
        if not kwargs:
            return
        interfaces = set(kwargs)

        # Wait for the requested interfaces, but don't fail for them.
        subprocess.call([NETWORKD_WAIT_ONLINE, '--timeout=5'] +
                        ['--interface={}'.format(iface) for iface in kwargs])

        # Validate each link state found in the networkctl output.
        out = subprocess.check_output(['networkctl', '--no-legend']).rstrip()
        for line in out.decode('utf-8').split('\n'):
            fields = line.split()
            if len(fields) >= 5 and fields[1] in kwargs:
                iface = fields[1]
                expected = kwargs[iface]
                actual = fields[-1]
                if (actual != expected and
                        not (expected == 'managed' and actual != 'unmanaged')):
                    self.fail("Link {} expects state {}, found {}".format(iface, expected, actual))
                # discard() rather than remove(): a name listed more than
                # once must not raise KeyError on its second occurrence.
                interfaces.discard(iface)

        # Ensure that all requested interfaces have been covered.
        if interfaces:
            self.fail("Missing links in status output: {}".format(interfaces))
183
184
class BridgeTest(NetworkdTestingUtilities, unittest.TestCase):
    """Test networkd's handling of a bridge with two dummy ports.

    Each test starts networkd with unit files describing two dummy devices
    (port1, port2) enslaved to the bridge "mybridge", and then checks link
    states or bridge port attributes read from sysfs.
    """

    def setUp(self):
        """Write the bridge and port unit files, then start networkd."""
        self.write_network('port1.netdev', '''\
[NetDev]
Name=port1
Kind=dummy
MACAddress=12:34:56:78:9a:bc''')
        self.write_network('port2.netdev', '''\
[NetDev]
Name=port2
Kind=dummy
MACAddress=12:34:56:78:9a:bd''')
        self.write_network('mybridge.netdev', '''\
[NetDev]
Name=mybridge
Kind=bridge''')
        self.write_network('port1.network', '''\
[Match]
Name=port1
[Network]
Bridge=mybridge''')
        self.write_network('port2.network', '''\
[Match]
Name=port2
[Network]
Bridge=mybridge''')
        self.write_network('mybridge.network', '''\
[Match]
Name=mybridge
[Network]
DNS=192.168.250.1
Address=192.168.250.33/24
Gateway=192.168.250.1''')
        subprocess.call(['systemctl', 'reset-failed', 'systemd-networkd', 'systemd-resolved'])
        subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])

    def tearDown(self):
        """Stop networkd and delete the bridge and its ports.

        Every step is attempted even if an earlier one fails, so that a
        single failure does not leave the remaining devices behind for the
        following tests.  The first failure is re-raised once all steps
        have been run.
        """
        first_error = None
        for cmd in (['systemctl', 'stop', 'systemd-networkd'],
                    ['ip', 'link', 'del', 'mybridge'],
                    ['ip', 'link', 'del', 'port1'],
                    ['ip', 'link', 'del', 'port2']):
            try:
                subprocess.check_call(cmd)
            except subprocess.CalledProcessError as e:
                if first_error is None:
                    first_error = e
        if first_error is not None:
            raise first_error

    def test_bridge_init(self):
        """The bridge and both ports should be managed by networkd."""
        self.assert_link_states(
            port1='managed',
            port2='managed',
            mybridge='managed')

    def test_bridge_port_priority(self):
        """A drop-in should be able to change the bridge port priority."""
        self.assertEqual(self.read_attr('port1', 'brport/priority'), '32')
        self.write_network_dropin('port1.network', 'priority', '''\
[Bridge]
Priority=28
''')
        subprocess.check_call(['systemctl', 'restart', 'systemd-networkd'])
        self.assertEqual(self.read_attr('port1', 'brport/priority'), '28')

    def test_bridge_port_priority_set_zero(self):
        """It should be possible to set the bridge port priority to 0"""
        self.assertEqual(self.read_attr('port2', 'brport/priority'), '32')
        self.write_network_dropin('port2.network', 'priority', '''\
[Bridge]
Priority=0
''')
        subprocess.check_call(['systemctl', 'restart', 'systemd-networkd'])
        self.assertEqual(self.read_attr('port2', 'brport/priority'), '0')

    def test_bridge_port_property(self):
        """Test the "[Bridge]" section keys"""
        self.assertEqual(self.read_attr('port2', 'brport/priority'), '32')
        self.write_network_dropin('port2.network', 'property', '''\
[Bridge]
UnicastFlood=true
HairPin=true
UseBPDU=true
FastLeave=true
AllowPortToBeRoot=true
Cost=555
Priority=23
''')
        subprocess.check_call(['systemctl', 'restart', 'systemd-networkd'])

        self.assertEqual(self.read_attr('port2', 'brport/priority'), '23')
        self.assertEqual(self.read_attr('port2', 'brport/hairpin_mode'), '1')
        self.assertEqual(self.read_attr('port2', 'brport/path_cost'), '555')
        self.assertEqual(self.read_attr('port2', 'brport/multicast_fast_leave'), '1')
        self.assertEqual(self.read_attr('port2', 'brport/unicast_flood'), '1')
        self.assertEqual(self.read_attr('port2', 'brport/bpdu_guard'), '1')
        self.assertEqual(self.read_attr('port2', 'brport/root_block'), '1')
276
class ClientTestBase(NetworkdTestingUtilities):
    """Provide common methods for testing networkd against servers.

    Subclasses must also inherit from unittest.TestCase and implement
    create_iface(), shutdown_iface() and print_server_log() for the DHCP
    server they test against.
    """

    @classmethod
    def setUpClass(klass):
        """Remember the current log level and switch to debug logging."""
        klass.orig_log_level = subprocess.check_output(
            ['systemctl', 'show', '--value', '--property', 'LogLevel'],
            universal_newlines=True).strip()
        subprocess.check_call(['systemd-analyze', 'set-log-level', 'debug'])

    @classmethod
    def tearDownClass(klass):
        """Restore the log level saved by setUpClass()."""
        subprocess.check_call(['systemd-analyze', 'set-log-level', klass.orig_log_level])

    def setUp(self):
        """Set interface names, a scratch directory and the journal cursor."""
        self.iface = 'test_eth42'
        self.if_router = 'router_eth42'
        self.workdir_obj = tempfile.TemporaryDirectory()
        # Remove the scratch directory deterministically after the test
        # instead of leaving it to the garbage collector.
        self.addCleanup(self.workdir_obj.cleanup)
        self.workdir = self.workdir_obj.name
        self.config = 'test_eth42.network'

        # get current journal cursor
        subprocess.check_output(['journalctl', '--sync'])
        out = subprocess.check_output(['journalctl', '-b', '--quiet',
                                       '--no-pager', '-n0', '--show-cursor'],
                                      universal_newlines=True)
        self.assertTrue(out.startswith('-- cursor:'))
        self.journal_cursor = out.split()[-1]

        subprocess.call(['systemctl', 'reset-failed', 'systemd-networkd', 'systemd-resolved'])

    def tearDown(self):
        """Shut down the test interface, stop networkd and drop dummy0."""
        self.shutdown_iface()
        subprocess.call(['systemctl', 'stop', 'systemd-networkd'])
        subprocess.call(['ip', 'link', 'del', 'dummy0'],
                        stderr=subprocess.DEVNULL)

    def show_journal(self, unit):
        '''Show journal of given unit since start of the test'''

        print('---- {} ----'.format(unit))
        subprocess.check_output(['journalctl', '--sync'])
        # flush our own output first so it is not interleaved with journalctl's
        sys.stdout.flush()
        subprocess.call(['journalctl', '-b', '--no-pager', '--quiet',
                         '--cursor', self.journal_cursor, '-u', unit])

    def create_iface(self, ipv6=False):
        '''Create test interface with DHCP server behind it'''

        raise NotImplementedError('must be implemented by a subclass')

    def shutdown_iface(self):
        '''Remove test interface and stop DHCP server'''

        raise NotImplementedError('must be implemented by a subclass')

    def print_server_log(self):
        '''Print DHCP server log for debugging failures'''

        raise NotImplementedError('must be implemented by a subclass')

    def do_test(self, coldplug=True, ipv6=False, extra_opts='',
                online_timeout=10, dhcp_mode='yes'):
        '''Configure the test interface via DHCP and verify the result

        coldplug: True creates the interface before starting networkd,
            False starts networkd first, None means the caller has already
            set up the interface itself.
        ipv6: whether a global IPv6 address is expected as well.
        extra_opts: additional lines for the [Network] section.
        online_timeout: seconds to wait for the interface to come online.
        dhcp_mode: value of the DHCP= setting.
        '''
        try:
            subprocess.check_call(['systemctl', 'start', 'systemd-resolved'])
        except subprocess.CalledProcessError:
            self.show_journal('systemd-resolved.service')
            raise
        self.write_network(self.config, '''\
[Match]
Name={}
[Network]
DHCP={}
{}'''.format(self.iface, dhcp_mode, extra_opts))

        if coldplug:
            # create interface first, then start networkd
            self.create_iface(ipv6=ipv6)
            subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])
        elif coldplug is not None:
            # start networkd first, then create interface
            subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])
            self.create_iface(ipv6=ipv6)
        else:
            # "None" means test sets up interface by itself
            subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])

        try:
            subprocess.check_call([NETWORKD_WAIT_ONLINE, '--interface',
                                   self.iface, '--timeout=%i' % online_timeout])

            if ipv6:
                # check iface state and IP 6 address; FIXME: we need to wait a bit
                # longer, as the iface is "configured" already with IPv4 *or*
                # IPv6, but we want to wait for both
                for _ in range(10):
                    out = subprocess.check_output(['ip', 'a', 'show', 'dev', self.iface])
                    if b'state UP' in out and b'inet6 2600' in out and b'inet 192.168' in out:
                        break
                    time.sleep(1)
                else:
                    self.fail('timed out waiting for IPv6 configuration')

                self.assertRegex(out, b'inet6 2600::.* scope global .*dynamic')
                self.assertRegex(out, b'inet6 fe80::.* scope link')
            else:
                # should have link-local address on IPv6 only
                out = subprocess.check_output(['ip', '-6', 'a', 'show', 'dev', self.iface])
                self.assertRegex(out, br'inet6 fe80::.* scope link')
                self.assertNotIn(b'scope global', out)

            # should have IPv4 address
            out = subprocess.check_output(['ip', '-4', 'a', 'show', 'dev', self.iface])
            self.assertIn(b'state UP', out)
            self.assertRegex(out, br'inet 192.168.5.\d+/.* scope global dynamic')

            # check networkctl state
            out = subprocess.check_output(['networkctl'])
            self.assertRegex(out, (r'{}\s+ether\s+[a-z-]+\s+unmanaged'.format(self.if_router)).encode())
            self.assertRegex(out, (r'{}\s+ether\s+routable\s+configured'.format(self.iface)).encode())

            out = subprocess.check_output(['networkctl', 'status', self.iface])
            self.assertRegex(out, br'Type:\s+ether')
            self.assertRegex(out, br'State:\s+routable.*configured')
            self.assertRegex(out, br'Address:\s+192.168.5.\d+')
            if ipv6:
                self.assertRegex(out, br'2600::')
            else:
                self.assertNotIn(br'2600::', out)
            self.assertRegex(out, br'fe80::')
            self.assertRegex(out, br'Gateway:\s+192.168.5.1')
            self.assertRegex(out, br'DNS:\s+192.168.5.1')
        except (AssertionError, subprocess.CalledProcessError):
            # show networkd status, journal, and DHCP server log on failure
            with open(os.path.join(NETWORK_UNITDIR, self.config)) as f:
                print('\n---- {} ----\n{}'.format(self.config, f.read()))
            print('---- interface status ----')
            sys.stdout.flush()
            subprocess.call(['ip', 'a', 'show', 'dev', self.iface])
            print('---- networkctl status {} ----'.format(self.iface))
            sys.stdout.flush()
            subprocess.call(['networkctl', 'status', self.iface])
            self.show_journal('systemd-networkd.service')
            self.print_server_log()
            raise

        # poll resolv.conf for up to 50 * 0.1s until the DHCP-provided
        # name server shows up
        for _ in range(50):
            with open(RESOLV_CONF) as f:
                contents = f.read()
            if 'nameserver 192.168.5.1\n' in contents:
                break
            time.sleep(0.1)
        else:
            self.fail('nameserver 192.168.5.1 not found in ' + RESOLV_CONF)

        if coldplug is False:
            # check post-down.d hook
            self.shutdown_iface()

    def test_coldplug_dhcp_yes_ip4(self):
        # we have a 12s timeout on RA, so we need to wait longer
        self.do_test(coldplug=True, ipv6=False, online_timeout=15)

    def test_coldplug_dhcp_yes_ip4_no_ra(self):
        # with disabling RA explicitly things should be fast
        self.do_test(coldplug=True, ipv6=False,
                     extra_opts='IPv6AcceptRA=False')

    def test_coldplug_dhcp_ip4_only(self):
        # we have a 12s timeout on RA, so we need to wait longer
        self.do_test(coldplug=True, ipv6=False, dhcp_mode='ipv4',
                     online_timeout=15)

    def test_coldplug_dhcp_ip4_only_no_ra(self):
        # with disabling RA explicitly things should be fast
        self.do_test(coldplug=True, ipv6=False, dhcp_mode='ipv4',
                     extra_opts='IPv6AcceptRA=False')

    def test_coldplug_dhcp_ip6(self):
        self.do_test(coldplug=True, ipv6=True)

    def test_hotplug_dhcp_ip4(self):
        # With IPv4 only we have a 12s timeout on RA, so we need to wait longer
        self.do_test(coldplug=False, ipv6=False, online_timeout=15)

    def test_hotplug_dhcp_ip6(self):
        self.do_test(coldplug=False, ipv6=True)

    def test_route_only_dns(self):
        '''A routing-only domain must not make its DNS server global'''
        self.write_network('myvpn.netdev', '''\
[NetDev]
Name=dummy0
Kind=dummy
MACAddress=12:34:56:78:9a:bc''')
        self.write_network('myvpn.network', '''\
[Match]
Name=dummy0
[Network]
Address=192.168.42.100
DNS=192.168.42.1
Domains= ~company''')

        self.do_test(coldplug=True, ipv6=False,
                     extra_opts='IPv6AcceptRouterAdvertisements=False')

        with open(RESOLV_CONF) as f:
            contents = f.read()
            # ~company is not a search domain, only a routing domain
            self.assertNotRegex(contents, 'search.*company')
            # our global server should appear
            self.assertIn('nameserver 192.168.5.1\n', contents)
            # should not have domain-restricted server as global server
            self.assertNotIn('nameserver 192.168.42.1\n', contents)

    def test_route_only_dns_all_domains(self):
        '''The ~. routing domain makes the DNS server a global one'''
        self.write_network('myvpn.netdev', '''[NetDev]
Name=dummy0
Kind=dummy
MACAddress=12:34:56:78:9a:bc''')
        self.write_network('myvpn.network', '''[Match]
Name=dummy0
[Network]
Address=192.168.42.100
DNS=192.168.42.1
Domains= ~company ~.''')

        self.do_test(coldplug=True, ipv6=False,
                     extra_opts='IPv6AcceptRouterAdvertisements=False')

        with open(RESOLV_CONF) as f:
            contents = f.read()

        # ~company is not a search domain, only a routing domain
        self.assertNotRegex(contents, 'search.*company')

        # our global server should appear
        self.assertIn('nameserver 192.168.5.1\n', contents)
        # should have company server as global server due to ~.
        self.assertIn('nameserver 192.168.42.1\n', contents)
516
517
@unittest.skipUnless(HAVE_DNSMASQ, 'dnsmasq not installed')
class DnsmasqClientTest(ClientTestBase, unittest.TestCase):
    '''Test networkd client against dnsmasq'''

    def setUp(self):
        '''Extend the base setup with dnsmasq state and a fixed client MAC'''
        super().setUp()
        self.dnsmasq = None
        self.iface_mac = 'de:ad:be:ef:47:11'

    def create_iface(self, ipv6=False, dnsmasq_opts=None):
        '''Create test interface with DHCP server behind it

        dnsmasq_opts is an optional list of extra dnsmasq command line
        arguments.
        '''

        # add veth pair
        subprocess.check_call(['ip', 'link', 'add', 'name', self.iface,
                               'address', self.iface_mac,
                               'type', 'veth', 'peer', 'name', self.if_router])

        # give our router an IP
        subprocess.check_call(['ip', 'a', 'flush', 'dev', self.if_router])
        subprocess.check_call(['ip', 'a', 'add', '192.168.5.1/24', 'dev', self.if_router])
        if ipv6:
            subprocess.check_call(['ip', 'a', 'add', '2600::1/64', 'dev', self.if_router])
        subprocess.check_call(['ip', 'link', 'set', self.if_router, 'up'])

        # add DHCP server
        self.dnsmasq_log = os.path.join(self.workdir, 'dnsmasq.log')
        lease_file = os.path.join(self.workdir, 'dnsmasq.leases')
        if ipv6:
            extra_opts = ['--enable-ra', '--dhcp-range=2600::10,2600::20']
        else:
            extra_opts = []
        if dnsmasq_opts:
            extra_opts += dnsmasq_opts
        self.dnsmasq = subprocess.Popen(
            ['dnsmasq', '--keep-in-foreground', '--log-queries',
             '--log-facility=' + self.dnsmasq_log, '--conf-file=/dev/null',
             '--dhcp-leasefile=' + lease_file, '--bind-interfaces',
             '--interface=' + self.if_router, '--except-interface=lo',
             '--dhcp-range=192.168.5.10,192.168.5.200'] + extra_opts)

    def shutdown_iface(self):
        '''Remove test interface and stop DHCP server

        The dnsmasq process is killed and reaped even when deleting the
        interface fails, so a failed "ip link del" does not leak it.
        '''

        try:
            if self.if_router:
                subprocess.check_call(['ip', 'link', 'del', 'dev', self.if_router])
                self.if_router = None
        finally:
            if self.dnsmasq:
                self.dnsmasq.kill()
                self.dnsmasq.wait()
                self.dnsmasq = None

    def print_server_log(self):
        '''Print DHCP server log for debugging failures'''

        with open(self.dnsmasq_log) as f:
            sys.stdout.write('\n\n---- dnsmasq log ----\n{}\n------\n\n'.format(f.read()))

    def test_resolved_domain_restricted_dns(self):
        '''resolved: domain-restricted DNS servers'''

        # create interface for generic connections; this will map all DNS names
        # to 192.168.42.1
        self.create_iface(dnsmasq_opts=['--address=/#/192.168.42.1'])
        self.write_network('general.network', '''\
[Match]
Name={}
[Network]
DHCP=ipv4
IPv6AcceptRA=False'''.format(self.iface))

        # create second device/dnsmasq for a .company/.lab VPN interface
        # static IPs for simplicity
        self.add_veth_pair('testvpnclient', 'testvpnrouter')
        subprocess.check_call(['ip', 'a', 'flush', 'dev', 'testvpnrouter'])
        subprocess.check_call(['ip', 'a', 'add', '10.241.3.1/24', 'dev', 'testvpnrouter'])
        subprocess.check_call(['ip', 'link', 'set', 'testvpnrouter', 'up'])

        vpn_dnsmasq_log = os.path.join(self.workdir, 'dnsmasq-vpn.log')
        vpn_dnsmasq = subprocess.Popen(
            ['dnsmasq', '--keep-in-foreground', '--log-queries',
             '--log-facility=' + vpn_dnsmasq_log, '--conf-file=/dev/null',
             '--dhcp-leasefile=/dev/null', '--bind-interfaces',
             '--interface=testvpnrouter', '--except-interface=lo',
             '--address=/math.lab/10.241.3.3', '--address=/cantina.company/10.241.4.4'])
        # cleanups run in reverse order: kill first, then wait
        self.addCleanup(vpn_dnsmasq.wait)
        self.addCleanup(vpn_dnsmasq.kill)

        self.write_network('vpn.network', '''\
[Match]
Name=testvpnclient
[Network]
IPv6AcceptRA=False
Address=10.241.3.2/24
DNS=10.241.3.1
Domains= ~company ~lab''')

        subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])
        subprocess.check_call([NETWORKD_WAIT_ONLINE, '--interface', self.iface,
                               '--interface=testvpnclient', '--timeout=20'])

        # ensure we start fresh with every test
        subprocess.check_call(['systemctl', 'restart', 'systemd-resolved'])

        # test vpnclient specific domains; these should *not* be answered by
        # the general DNS
        out = subprocess.check_output(['resolvectl', 'query', 'math.lab'])
        self.assertIn(b'math.lab: 10.241.3.3', out)
        out = subprocess.check_output(['resolvectl', 'query', 'kettle.cantina.company'])
        self.assertIn(b'kettle.cantina.company: 10.241.4.4', out)

        # test general domains
        out = subprocess.check_output(['resolvectl', 'query', 'megasearch.net'])
        self.assertIn(b'megasearch.net: 192.168.42.1', out)

        with open(self.dnsmasq_log) as f:
            general_log = f.read()
        with open(vpn_dnsmasq_log) as f:
            vpn_log = f.read()

        # VPN domains should only be sent to VPN DNS
        self.assertRegex(vpn_log, 'query.*math.lab')
        self.assertRegex(vpn_log, 'query.*cantina.company')
        self.assertNotIn('.lab', general_log)
        self.assertNotIn('.company', general_log)

        # general domains should not be sent to the VPN DNS
        self.assertRegex(general_log, 'query.*megasearch.net')
        self.assertNotIn('megasearch.net', vpn_log)

    def test_resolved_etc_hosts(self):
        '''resolved queries to /etc/hosts'''

        # FIXME: -t MX query fails with enabled DNSSEC (even when using
        # the known negative trust anchor .internal instead of .example)
        conf = '/run/systemd/resolved.conf.d/test-disable-dnssec.conf'
        os.makedirs(os.path.dirname(conf), exist_ok=True)
        with open(conf, 'w') as f:
            f.write('[Resolve]\nDNSSEC=no')
        self.addCleanup(os.remove, conf)

        # create /etc/hosts bind mount which resolves my.example for IPv4
        hosts = os.path.join(self.workdir, 'hosts')
        with open(hosts, 'w') as f:
            f.write('172.16.99.99  my.example\n')
        subprocess.check_call(['mount', '--bind', hosts, '/etc/hosts'])
        self.addCleanup(subprocess.call, ['umount', '/etc/hosts'])
        subprocess.check_call(['systemctl', 'stop', 'systemd-resolved.service'])

        # note: different IPv4 address here, so that it's easy to tell apart
        # what resolved the query
        self.create_iface(dnsmasq_opts=['--host-record=my.example,172.16.99.1,2600::99:99',
                                        '--host-record=other.example,172.16.0.42,2600::42',
                                        '--mx-host=example,mail.example'],
                          ipv6=True)
        self.do_test(coldplug=None, ipv6=True)

        try:
            # family specific queries
            out = subprocess.check_output(['resolvectl', 'query', '-4', 'my.example'])
            self.assertIn(b'my.example: 172.16.99.99', out)
            # we don't expect an IPv6 answer; if /etc/hosts has any IP address,
            # it's considered a sufficient source
            self.assertNotEqual(subprocess.call(['resolvectl', 'query', '-6', 'my.example']), 0)
            # "any family" query; IPv4 should come from /etc/hosts
            out = subprocess.check_output(['resolvectl', 'query', 'my.example'])
            self.assertIn(b'my.example: 172.16.99.99', out)
            # IP → name lookup; again, takes the /etc/hosts one
            out = subprocess.check_output(['resolvectl', 'query', '172.16.99.99'])
            self.assertIn(b'172.16.99.99: my.example', out)

            # non-address RRs should fall back to DNS
            out = subprocess.check_output(['resolvectl', 'query', '--type=MX', 'example'])
            self.assertIn(b'example IN MX 1 mail.example', out)

            # other domains query DNS
            out = subprocess.check_output(['resolvectl', 'query', 'other.example'])
            self.assertIn(b'172.16.0.42', out)
            out = subprocess.check_output(['resolvectl', 'query', '172.16.0.42'])
            self.assertIn(b'172.16.0.42: other.example', out)
        except (AssertionError, subprocess.CalledProcessError):
            self.show_journal('systemd-resolved.service')
            self.print_server_log()
            raise

    def test_transient_hostname(self):
        '''networkd sets transient hostname from DHCP'''

        orig_hostname = socket.gethostname()
        self.addCleanup(socket.sethostname, orig_hostname)
        # temporarily move /etc/hostname away; restart hostnamed to pick it up
        if os.path.exists('/etc/hostname'):
            subprocess.check_call(['mount', '--bind', '/dev/null', '/etc/hostname'])
            self.addCleanup(subprocess.call, ['umount', '/etc/hostname'])
        subprocess.check_call(['systemctl', 'stop', 'systemd-hostnamed.service'])
        self.addCleanup(subprocess.call, ['systemctl', 'stop', 'systemd-hostnamed.service'])

        self.create_iface(dnsmasq_opts=['--dhcp-host={},192.168.5.210,testgreen'.format(self.iface_mac)])
        self.do_test(coldplug=None, extra_opts='IPv6AcceptRA=False', dhcp_mode='ipv4')

        try:
            # should have received the fixed IP above
            out = subprocess.check_output(['ip', '-4', 'a', 'show', 'dev', self.iface])
            self.assertRegex(out, b'inet 192.168.5.210/24 .* scope global dynamic')
            # should have set transient hostname in hostnamed; this is
            # sometimes a bit lagging (issue #4753), so retry a few times
            for retry in range(1, 6):
                out = subprocess.check_output(['hostnamectl'])
                if b'testgreen' in out:
                    break
                time.sleep(5)
                sys.stdout.write('[retry %i] ' % retry)
                sys.stdout.flush()
            else:
                self.fail('Transient hostname not found in hostnamectl:\n{}'.format(out.decode()))
            # and also applied to the system
            self.assertEqual(socket.gethostname(), 'testgreen')
        except (AssertionError, subprocess.CalledProcessError):
            # a failing ip/hostnamectl call deserves the same diagnostics as
            # a failed assertion
            self.show_journal('systemd-networkd.service')
            self.show_journal('systemd-hostnamed.service')
            self.print_server_log()
            raise

    def test_transient_hostname_with_static(self):
        '''transient hostname is not applied if static hostname exists'''

        orig_hostname = socket.gethostname()
        self.addCleanup(socket.sethostname, orig_hostname)

        if not os.path.exists('/etc/hostname'):
            self.write_config('/etc/hostname', "foobarqux")
        else:
            self.write_config('/run/hostname.tmp', "foobarqux")
            subprocess.check_call(['mount', '--bind', '/run/hostname.tmp', '/etc/hostname'])
            self.addCleanup(subprocess.call, ['umount', '/etc/hostname'])

        socket.sethostname("foobarqux")

        subprocess.check_call(['systemctl', 'stop', 'systemd-hostnamed.service'])
        self.addCleanup(subprocess.call, ['systemctl', 'stop', 'systemd-hostnamed.service'])

        self.create_iface(dnsmasq_opts=['--dhcp-host={},192.168.5.210,testgreen'.format(self.iface_mac)])
        self.do_test(coldplug=None, extra_opts='IPv6AcceptRA=False', dhcp_mode='ipv4')

        try:
            # should have received the fixed IP above
            out = subprocess.check_output(['ip', '-4', 'a', 'show', 'dev', self.iface])
            self.assertRegex(out, b'inet 192.168.5.210/24 .* scope global dynamic')
            # static hostname wins over transient one, thus *not* applied
            self.assertEqual(socket.gethostname(), "foobarqux")
        except (AssertionError, subprocess.CalledProcessError):
            # a failing ip call deserves the same diagnostics as a failed
            # assertion
            self.show_journal('systemd-networkd.service')
            self.show_journal('systemd-hostnamed.service')
            self.print_server_log()
            raise
772
773
774 class NetworkdClientTest(ClientTestBase, unittest.TestCase):
775 '''Test networkd client against networkd server'''
776
777 def setUp(self):
778 super().setUp()
779 self.dnsmasq = None
780
def create_iface(self, ipv6=False, dhcpserver_opts=None):
    '''Create test interface with DHCP server behind it

    A second, "router-side" networkd is started as the transient unit
    networkd-test-router.service.  It runs a generated shell script that
    mounts fresh tmpfs instances over the networkd runtime directories,
    writes a veth .netdev (self.if_router <-> self.iface) plus a .network
    file enabling the DHCP server on the router end, and finally execs the
    ExecStart= command line of systemd-networkd.service.

    ipv6: if true, additionally assign 2600::1/64 to the router interface.
    dhcpserver_opts: extra text appended to the [DHCPServer] section, or
        None for no extra options.

    Raises subprocess.CalledProcessError if systemd-run fails.
    '''

    # run "router-side" networkd in own mount namespace to shield it from
    # "client-side" configuration and networkd
    (fd, script) = tempfile.mkstemp(prefix='networkd-router.sh')
    self.addCleanup(os.remove, script)
    with os.fdopen(fd, 'w+') as f:
        f.write('''\
#!/bin/sh
set -eu
mkdir -p /run/systemd/network
mkdir -p /run/systemd/netif
mount -t tmpfs none /run/systemd/network
mount -t tmpfs none /run/systemd/netif
[ ! -e /run/dbus ] || mount -t tmpfs none /run/dbus
# create router/client veth pair
cat << EOF > /run/systemd/network/test.netdev
[NetDev]
Name=%(ifr)s
Kind=veth

[Peer]
Name=%(ifc)s
EOF

cat << EOF > /run/systemd/network/test.network
[Match]
Name=%(ifr)s

[Network]
Address=192.168.5.1/24
%(addr6)s
DHCPServer=yes

[DHCPServer]
PoolOffset=10
PoolSize=50
DNS=192.168.5.1
%(dhopts)s
EOF

# run networkd as in systemd-networkd.service
exec $(systemctl cat systemd-networkd.service | sed -n '/^ExecStart=/ { s/^.*=//; s/^[@+-]//; s/^!*//; p}')
''' % {'ifr': self.if_router, 'ifc': self.iface,
       'addr6': 'Address=2600::1/64' if ipv6 else '',
       'dhopts': dhcpserver_opts or ''})

        # must happen while the descriptor is still open, i.e. inside the
        # "with" block; systemd-run executes the script directly
        os.fchmod(fd, 0o755)

    subprocess.check_call(['systemd-run', '--unit=networkd-test-router.service',
                           '-p', 'InaccessibleDirectories=-/etc/systemd/network',
                           '-p', 'InaccessibleDirectories=-/run/systemd/network',
                           '-p', 'InaccessibleDirectories=-/run/systemd/netif',
                           '--service-type=notify', script])

    # wait until devices got created
    for _ in range(50):
        try:
            out = subprocess.check_output(['ip', 'a', 'show', 'dev', self.if_router])
        except subprocess.CalledProcessError:
            # "ip" exits non-zero while the interface does not exist yet;
            # that is exactly the condition we are polling for, so retry
            out = b''
        if b'state UP' in out and b'scope global' in out:
            break
        time.sleep(0.1)
842
def shutdown_iface(self):
    '''Stop the router-side networkd unit and delete the router interface.

    Does nothing when self.if_router is unset/empty.  Only stopping the unit
    is checked; the reset-failed and link deletion steps are best effort.
    '''

    if not self.if_router:
        return

    router_unit = 'networkd-test-router.service'
    subprocess.check_call(['systemctl', 'stop', router_unit])
    # ensure failed transient unit does not stay around
    subprocess.call(['systemctl', 'reset-failed', router_unit])
    subprocess.call(['ip', 'link', 'del', 'dev', self.if_router])
    self.if_router = None
852
def print_server_log(self):
    '''Show the journal of the router-side networkd unit to help debug failures.'''

    router_unit = 'networkd-test-router.service'
    self.show_journal(router_unit)
857
# Unconditionally skipped stub; the body is never run because the skip
# decorator is applied without a condition.
@unittest.skip('networkd does not have DHCPv6 server support')
def test_hotplug_dhcp_ip6(self):
    pass
861
# Unconditionally skipped stub; the body is never run because the skip
# decorator is applied without a condition.
@unittest.skip('networkd does not have DHCPv6 server support')
def test_coldplug_dhcp_ip6(self):
    pass
865
def test_search_domains(self):
    # Configure ten search domains on a dummy device and check that
    # resolv.conf lists the leading ones, has no line ending in "seven",
    # and carries the "too many search domains" notice.

    # we don't use this interface for this test
    self.if_router = None

    netdev_conf = '''\
[NetDev]
Name=dummy0
Kind=dummy
MACAddress=12:34:56:78:9a:bc'''
    network_conf = '''\
[Match]
Name=dummy0
[Network]
Address=192.168.42.100
DNS=192.168.42.1
Domains= one two three four five six seven eight nine ten'''

    self.write_network('test.netdev', netdev_conf)
    self.write_network('test.network', network_conf)

    subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])

    # poll resolv.conf (up to 50 reads, 0.1 s apart) until the first
    # domain shows up
    for _ in range(50):
        with open(RESOLV_CONF) as resolv:
            contents = resolv.read()
        if ' one' not in contents:
            time.sleep(0.1)
            continue
        break

    self.assertRegex(contents, 'search .*one two three four')
    self.assertNotIn('seven\n', contents)
    self.assertIn('# Too many search domains configured, remaining ones ignored.\n', contents)
895
def test_search_domains_too_long(self):
    # Configure five search domains of 61 characters each on a dummy
    # device and check that resolv.conf keeps the leading ones and carries
    # the "total length ... too long" notice.

    # we don't use this interface for this test
    self.if_router = None

    name_prefix = 'a' * 60

    self.write_network('test.netdev', '''\
[NetDev]
Name=dummy0
Kind=dummy
MACAddress=12:34:56:78:9a:bc''')
    self.write_network('test.network', '''\
[Match]
Name=dummy0
[Network]
Address=192.168.42.100
DNS=192.168.42.1
Domains={p}0 {p}1 {p}2 {p}3 {p}4'''.format(p=name_prefix))

    subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])

    # Poll resolv.conf (up to 50 reads, 0.1 s apart) until one of *this*
    # test's domains shows up.  Waiting for ' one' (a domain only the
    # sibling search-domain test configures) could never match here and
    # made the loop always run for its full duration.
    for _ in range(50):
        with open(RESOLV_CONF) as f:
            contents = f.read()
        if ' ' + name_prefix in contents:
            break
        time.sleep(0.1)
    self.assertRegex(contents, 'search .*{p}0 {p}1 {p}2'.format(p=name_prefix))
    self.assertIn('# Total length of all search domains is too long, remaining ones ignored.', contents)
926
def test_dropin(self):
    # Check that a DNS server added through a drop-in for test.network
    # appears in resolv.conf next to the one from the main file.

    # we don't use this interface for this test
    self.if_router = None

    netdev_conf = '''\
[NetDev]
Name=dummy0
Kind=dummy
MACAddress=12:34:56:78:9a:bc'''
    network_conf = '''\
[Match]
Name=dummy0
[Network]
Address=192.168.42.100
DNS=192.168.42.1'''
    dropin_conf = '''\
[Network]
DNS=127.0.0.1'''

    self.write_network('test.netdev', netdev_conf)
    self.write_network('test.network', network_conf)
    self.write_network_dropin('test.network', 'dns', dropin_conf)

    subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])

    # poll resolv.conf (up to 50 reads, 0.1 s apart) until the drop-in's
    # server shows up
    for _ in range(50):
        with open(RESOLV_CONF) as resolv:
            contents = resolv.read()
        if ' 127.0.0.1' not in contents:
            time.sleep(0.1)
            continue
        break

    for server in ('192.168.42.1', '127.0.0.1'):
        self.assertIn('nameserver ' + server + '\n', contents)
956
def test_dhcp_timezone(self):
    '''networkd sets time zone from DHCP

    The router-side DHCP server is configured to emit Pacific/Honolulu and
    the client is configured with UseTimezone=true.  The original time zone
    is restored through a registered cleanup.
    '''

    def get_tz():
        # busctl prints a string property as:  s "<value>"
        out = subprocess.check_output(['busctl', 'get-property', 'org.freedesktop.timedate1',
                                       '/org/freedesktop/timedate1', 'org.freedesktop.timedate1', 'Timezone'])
        assert out.startswith(b's "')
        out = out.strip()
        assert out.endswith(b'"')
        # drop the leading 's "' and the trailing '"'
        return out[3:-1].decode()

    orig_timezone = get_tz()
    self.addCleanup(subprocess.call, ['timedatectl', 'set-timezone', orig_timezone])

    self.create_iface(dhcpserver_opts='EmitTimezone=yes\nTimezone=Pacific/Honolulu')
    self.do_test(coldplug=None, extra_opts='IPv6AcceptRA=false\n[DHCP]\nUseTimezone=true', dhcp_mode='ipv4')

    # should have applied the received timezone
    try:
        self.assertEqual(get_tz(), 'Pacific/Honolulu')
    except AssertionError:
        self.show_journal('systemd-networkd.service')
        # The time zone is read (above) via org.freedesktop.timedate1, so
        # the timedated journal is the relevant one for a failure here.
        self.show_journal('systemd-timedated.service')
        # kept from the previous diagnostics
        self.show_journal('systemd-hostnamed.service')
        raise
981
982
class MatchClientTest(unittest.TestCase, NetworkdTestingUtilities):
    """Exercise the [Match] section of .network files.

    A .network file that matches one of the test host's own interfaces
    wipes that interface's configuration.  As a precaution, every file
    written here must carry a restrictive [Match] section so that only
    the temporary veth interfaces created by these tests are touched.
    """

    def tearDown(self):
        """Stop networkd after each test (exit status is not checked)."""
        subprocess.call(['systemctl', 'stop', 'systemd-networkd'])

    def test_basic_matching(self):
        """Verify the Name= line works throughout this class.

        Only the interface whose name fits the test_* glob may end up
        managed; its peer must stay unmanaged.
        """
        network_conf = "[Match]\nName=test_*\n[Network]"
        expected = {'test_if1': 'managed', 'fake_if2': 'unmanaged'}

        self.add_veth_pair('test_if1', 'fake_if2')
        self.write_network('test.network', network_conf)
        subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])
        self.assert_link_states(**expected)

    def test_inverted_matching(self):
        """Verify that a '!'-prefixed value inverts the match.

        Both ends share one MAC address, so MACAddress= matches both and
        only the negated Name= patterns tell them apart.
        """
        # Use a MAC address as the interfaces' common matching attribute
        # to avoid depending on udev, to support testing in containers.
        mac = '00:01:02:03:98:99'
        veth_opts = ['addr', mac]
        peer_opts = ['addr', mac]
        network_conf = """\
[Match]
MACAddress={}
Name=!nonexistent *peer*
[Network]""".format(mac)
        expected = {'test_veth': 'managed', 'test_peer': 'unmanaged'}

        self.add_veth_pair('test_veth', 'test_peer', veth_opts, peer_opts)
        self.write_network('no-veth.network', network_conf)
        subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])
        self.assert_link_states(**expected)
1017
1018
class UnmanagedClientTest(unittest.TestCase, NetworkdTestingUtilities):
    """Test if networkd manages the correct interfaces.

    Each scenario is run twice: hot-plug (interfaces appear after networkd
    was started) and cold-plug (interfaces exist before networkd starts).
    """

    def setUp(self):
        """Write .network files to match the named veth devices."""
        # veth -> peer names created by create_iface().  Only the names
        # matter for matching; which device is paired with which does not.
        self.veths = {
            'm1def': 'm0unm',
            'm1man': 'm1unm',
        }

        # Contents of the .network files.  The position in this tuple
        # becomes the numeric file name prefix, i.e. the read order.
        self.configs = (
            "[Match]\nName=m1def\n",
            "[Match]\nName=m1unm\n[Link]\nUnmanaged=yes\n",
            "[Match]\nName=m1*\n[Link]\nUnmanaged=no\n",
        )

        # Write out the .network files (to be cleaned up automatically).
        for index, text in enumerate(self.configs):
            self.write_network('{:02d}-test.network'.format(index), text)

    def tearDown(self):
        """Stop networkd after each test (exit status is not checked)."""
        subprocess.call(['systemctl', 'stop', 'systemd-networkd'])

    def create_iface(self):
        """Create temporary veth pairs for interface matching."""
        for veth_name, peer_name in self.veths.items():
            self.add_veth_pair(veth_name, peer_name)

    def test_unmanaged_setting(self):
        """Verify link states with Unmanaged= settings, hot-plug."""
        expected = {
            'm1def': 'managed',
            'm1man': 'managed',
            'm1unm': 'unmanaged',
            'm0unm': 'unmanaged',
        }
        subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])
        self.create_iface()
        self.assert_link_states(**expected)

    def test_unmanaged_setting_coldplug(self):
        """Verify link states with Unmanaged= settings, cold-plug."""
        expected = {
            'm1def': 'managed',
            'm1man': 'managed',
            'm1unm': 'unmanaged',
            'm0unm': 'unmanaged',
        }
        self.create_iface()
        subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])
        self.assert_link_states(**expected)

    def test_catchall_config(self):
        """Verify link states with a catch-all config, hot-plug."""
        expected = {
            'm1def': 'managed',
            'm1man': 'managed',
            'm1unm': 'unmanaged',
            'm0unm': 'managed',
        }
        # Don't actually catch ALL interfaces. It messes up the host.
        catchall_conf = "[Match]\nName=m[01]???\n"
        self.write_network('all.network', catchall_conf)
        subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])
        self.create_iface()
        self.assert_link_states(**expected)

    def test_catchall_config_coldplug(self):
        """Verify link states with a catch-all config, cold-plug."""
        expected = {
            'm1def': 'managed',
            'm1man': 'managed',
            'm1unm': 'unmanaged',
            'm0unm': 'managed',
        }
        # Don't actually catch ALL interfaces. It messes up the host.
        catchall_conf = "[Match]\nName=m[01]???\n"
        self.write_network('all.network', catchall_conf)
        self.create_iface()
        subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])
        self.assert_link_states(**expected)
1090
1091
if __name__ == '__main__':
    # Report verbosely, and on stdout instead of the runner's default stream.
    runner = unittest.TextTestRunner(stream=sys.stdout, verbosity=2)
    unittest.main(testRunner=runner)