]> git.ipfire.org Git - thirdparty/systemd.git/blob - test/networkd-test.py
sd-bus: let's better not invade stdio territory when duplicating fds
[thirdparty/systemd.git] / test / networkd-test.py
1 #!/usr/bin/env python3
2 # SPDX-License-Identifier: LGPL-2.1+
3 #
4 # networkd integration test
5 # This uses temporary configuration in /run and temporary veth devices, and
6 # does not write anything on disk or change any system configuration;
7 # but it assumes (and checks at the beginning) that networkd is not currently
8 # running.
9 #
10 # This can be run on a normal installation, in QEMU, nspawn (with
11 # --private-network), LXD (with "--config raw.lxc=lxc.aa_profile=unconfined"),
12 # or LXC system containers. You need at least the "ip" tool from the iproute
13 # package; it is recommended to install dnsmasq too to get full test coverage.
14 #
15 # ATTENTION: This uses the *installed* networkd, not the one from the built
16 # source tree.
17 #
18 # (C) 2015 Canonical Ltd.
19 # Author: Martin Pitt <martin.pitt@ubuntu.com>
20 #
21 # systemd is free software; you can redistribute it and/or modify it
22 # under the terms of the GNU Lesser General Public License as published by
23 # the Free Software Foundation; either version 2.1 of the License, or
24 # (at your option) any later version.
25
26 # systemd is distributed in the hope that it will be useful, but
27 # WITHOUT ANY WARRANTY; without even the implied warranty of
28 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
29 # Lesser General Public License for more details.
30 #
31 # You should have received a copy of the GNU Lesser General Public License
32 # along with systemd; If not, see <http://www.gnu.org/licenses/>.
33
34 import errno
35 import os
36 import shutil
37 import socket
38 import subprocess
39 import sys
40 import tempfile
41 import time
42 import unittest
43
44 HAVE_DNSMASQ = shutil.which('dnsmasq') is not None
45
46 NETWORK_UNITDIR = '/run/systemd/network'
47
48 NETWORKD_WAIT_ONLINE = shutil.which('systemd-networkd-wait-online',
49 path='/usr/lib/systemd:/lib/systemd')
50
51 RESOLV_CONF = '/run/systemd/resolve/resolv.conf'
52
53
def setUpModule():
    """Prepare the test environment and bail out early if it is unsuitable.

    Raises:
        OSError: if systemd-networkd-wait-online could not be located.
        unittest.SkipTest: if systemd-networkd.service is already active.
    """
    # The wait-online helper is required by the tests below.
    if NETWORKD_WAIT_ONLINE is None:
        raise OSError(errno.ENOENT, 'systemd-networkd-wait-online not found')

    # Do not run any tests if the system is using networkd already.
    networkd_state = subprocess.call(
        ['systemctl', 'is-active', '--quiet', 'systemd-networkd.service'])
    if networkd_state == 0:
        raise unittest.SkipTest('networkd is already active')

    # Avoid "Failed to open /dev/tty" errors in containers.
    os.environ['SYSTEMD_LOG_TARGET'] = 'journal'

    # Tests drop their unit files here, so the directory has to exist.
    os.makedirs(NETWORK_UNITDIR, exist_ok=True)
69
70
class NetworkdTestingUtilities:
    """Mixin with helper methods shared by the networkd test cases.

    The helpers call methods such as addCleanup() and fail(), so this
    class has to be combined with unittest.TestCase in concrete tests.
    """

    def add_veth_pair(self, veth, peer, veth_options=(), peer_options=()):
        """Create a veth pair and schedule the deletion of its peer end.

        veth_options and peer_options are extra "ip link" arguments that
        are inserted after the respective interface name.
        """
        command = ['ip', 'link', 'add', 'name', veth]
        command.extend(veth_options)
        command.extend(['type', 'veth', 'peer', 'name', peer])
        command.extend(peer_options)
        subprocess.check_call(command)
        self.addCleanup(subprocess.call, ['ip', 'link', 'del', 'dev', peer])

    def write_network(self, unit_name, contents):
        """Create a unit file in NETWORK_UNITDIR and schedule its removal."""
        unit_path = os.path.join(NETWORK_UNITDIR, unit_name)

        with open(unit_path, 'w') as unit_file:
            unit_file.write(contents)
        self.addCleanup(os.remove, unit_path)

    def write_network_dropin(self, unit_name, dropin_name, contents):
        """Create a drop-in for a unit file and schedule its removal."""
        dropin_dir = os.path.join(NETWORK_UNITDIR, "{}.d".format(unit_name))
        dropin_path = os.path.join(dropin_dir, "{}.conf".format(dropin_name))

        os.makedirs(dropin_dir, exist_ok=True)
        # Cleanups run in reverse order of registration: the file is
        # removed first, and only then the (by then empty) directory.
        self.addCleanup(os.rmdir, dropin_dir)
        with open(dropin_path, 'w') as dropin_file:
            dropin_file.write(contents)
        self.addCleanup(os.remove, dropin_path)

    def read_attr(self, link, attribute):
        """Return the first line of a link's sysfs attribute, stripped."""
        # The interface name is only known at run time, so it cannot be
        # spelled as a literal keyword; unpack a dict to pass e.g.
        # eth0='managed' to assert_link_states.
        expected_state = {link: 'managed'}
        self.assert_link_states(**expected_state)

        attr_path = os.path.join('/sys/class/net', link, attribute)
        with open(attr_path) as attr_file:
            return attr_file.readline().strip()

    def assert_link_states(self, **kwargs):
        """Compare networkctl's link states with the expected ones.

        Every keyword argument names a network interface and gives the
        value expected in the "SETUP" column of the networkctl output.
        The interfaces get five seconds to come online before they are
        checked.  Each requested interface has to show up in the output;
        interfaces that were not requested are ignored.

        The pseudo state "managed" matches every "SETUP" value except
        "unmanaged".
        """
        if not kwargs:
            return
        pending = set(kwargs)

        # Wait for the requested interfaces, but don't fail for them.
        wait_cmd = [NETWORKD_WAIT_ONLINE, '--timeout=5']
        wait_cmd.extend('--interface={}'.format(name) for name in kwargs)
        subprocess.call(wait_cmd)

        # Validate each link state found in the networkctl output.
        listing = subprocess.check_output(['networkctl', '--no-legend']).rstrip()
        for row in listing.decode('utf-8').split('\n'):
            columns = row.split()
            if len(columns) < 5 or columns[1] not in kwargs:
                continue
            iface = columns[1]
            expected = kwargs[iface]
            actual = columns[-1]
            state_ok = (actual == expected or
                        (expected == 'managed' and actual != 'unmanaged'))
            if not state_ok:
                self.fail("Link {} expects state {}, found {}".format(iface, expected, actual))
            pending.remove(iface)

        # Ensure that all requested interfaces have been covered.
        if pending:
            self.fail("Missing links in status output: {}".format(pending))
151
152
class BridgeTest(NetworkdTestingUtilities, unittest.TestCase):
    """Test bridge ports managed by networkd.

    Every test runs against a bridge "mybridge" with two dummy ports,
    "port1" and "port2", which setUp() configures and tearDown() deletes.
    """

    def setUp(self):
        """Write the bridge and port units, then start networkd."""
        units = (
            ('port1.netdev', '''\
[NetDev]
Name=port1
Kind=dummy
MACAddress=12:34:56:78:9a:bc'''),
            ('port2.netdev', '''\
[NetDev]
Name=port2
Kind=dummy
MACAddress=12:34:56:78:9a:bd'''),
            ('mybridge.netdev', '''\
[NetDev]
Name=mybridge
Kind=bridge'''),
            ('port1.network', '''\
[Match]
Name=port1
[Network]
Bridge=mybridge'''),
            ('port2.network', '''\
[Match]
Name=port2
[Network]
Bridge=mybridge'''),
            ('mybridge.network', '''\
[Match]
Name=mybridge
[Network]
DNS=192.168.250.1
Address=192.168.250.33/24
Gateway=192.168.250.1'''),
        )
        for unit_name, unit_contents in units:
            self.write_network(unit_name, unit_contents)
        subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])

    def tearDown(self):
        """Stop networkd and delete the bridge and both ports."""
        subprocess.check_call(['systemctl', 'stop', 'systemd-networkd'])
        for device in ('mybridge', 'port1', 'port2'):
            subprocess.check_call(['ip', 'link', 'del', device])

    def _check_port_priority_dropin(self, port, priority):
        """Override a port's priority via drop-in and verify it in sysfs."""
        # The priority is expected to be 32 before the drop-in is applied.
        self.assertEqual(self.read_attr(port, 'brport/priority'), '32')
        self.write_network_dropin('{}.network'.format(port), 'priority', '''\
[Bridge]
Priority={}
'''.format(priority))
        subprocess.check_call(['systemctl', 'restart', 'systemd-networkd'])
        self.assertEqual(self.read_attr(port, 'brport/priority'), priority)

    def test_bridge_init(self):
        """The bridge and both of its ports are managed by networkd."""
        expected_states = {
            'port1': 'managed',
            'port2': 'managed',
            'mybridge': 'managed',
        }
        self.assert_link_states(**expected_states)

    def test_bridge_port_priority(self):
        """A [Bridge] Priority= drop-in changes the bridge port priority."""
        self._check_port_priority_dropin('port1', '28')

    def test_bridge_port_priority_set_zero(self):
        """It should be possible to set the bridge port priority to 0"""
        self._check_port_priority_dropin('port2', '0')
220
class ClientTestBase(NetworkdTestingUtilities):
    """Provide common methods for testing networkd against servers.

    Subclasses have to implement create_iface(), shutdown_iface() and
    print_server_log() for a concrete DHCP server, and must also inherit
    from unittest.TestCase.
    """

    @classmethod
    def setUpClass(klass):
        """Remember the current systemd log level and switch to debug."""
        klass.orig_log_level = subprocess.check_output(
            ['systemctl', 'show', '--value', '--property', 'LogLevel'],
            universal_newlines=True).strip()
        subprocess.check_call(['systemd-analyze', 'set-log-level', 'debug'])

    @classmethod
    def tearDownClass(klass):
        """Restore the log level saved by setUpClass()."""
        subprocess.check_call(['systemd-analyze', 'set-log-level', klass.orig_log_level])

    def setUp(self):
        """Set interface names, a work directory and the journal cursor."""
        self.iface = 'test_eth42'
        self.if_router = 'router_eth42'
        self.workdir_obj = tempfile.TemporaryDirectory()
        # Remove the work directory deterministically instead of leaving
        # it to the object's finalizer.  Cleanups run after tearDown(),
        # i.e. after the interface and its server have been shut down.
        self.addCleanup(self.workdir_obj.cleanup)
        self.workdir = self.workdir_obj.name
        self.config = 'test_eth42.network'

        # get current journal cursor
        subprocess.check_output(['journalctl', '--sync'])
        out = subprocess.check_output(['journalctl', '-b', '--quiet',
                                       '--no-pager', '-n0', '--show-cursor'],
                                      universal_newlines=True)
        self.assertTrue(out.startswith('-- cursor:'))
        self.journal_cursor = out.split()[-1]

    def tearDown(self):
        """Shut down the test interface, networkd and the dummy0 device."""
        self.shutdown_iface()
        subprocess.call(['systemctl', 'stop', 'systemd-networkd'])
        # dummy0 only exists in tests that configure it; ignore failures.
        subprocess.call(['ip', 'link', 'del', 'dummy0'],
                        stderr=subprocess.DEVNULL)

    def show_journal(self, unit):
        '''Show journal of given unit since start of the test'''

        print('---- {} ----'.format(unit))
        subprocess.check_output(['journalctl', '--sync'])
        # Flush our own output before journalctl writes to the same stdout.
        sys.stdout.flush()
        subprocess.call(['journalctl', '-b', '--no-pager', '--quiet',
                         '--cursor', self.journal_cursor, '-u', unit])

    def create_iface(self, ipv6=False):
        '''Create test interface with DHCP server behind it'''

        raise NotImplementedError('must be implemented by a subclass')

    def shutdown_iface(self):
        '''Remove test interface and stop DHCP server'''

        raise NotImplementedError('must be implemented by a subclass')

    def print_server_log(self):
        '''Print DHCP server log for debugging failures'''

        raise NotImplementedError('must be implemented by a subclass')

    def do_test(self, coldplug=True, ipv6=False, extra_opts='',
                online_timeout=10, dhcp_mode='yes'):
        '''Configure the test interface via DHCP and verify the result.

        coldplug selects the order of operations: True creates the
        interface before starting networkd, False starts networkd first
        and removes the interface again at the end, and None means the
        caller has set up the interface already.  ipv6 selects whether a
        global IPv6 address is expected.  extra_opts is appended to the
        [Network] section of the unit, dhcp_mode is the DHCP= value, and
        online_timeout is passed to systemd-networkd-wait-online.

        On failure, the unit file, interface status, journal and server
        log are printed before the exception is re-raised.
        '''
        try:
            subprocess.check_call(['systemctl', 'start', 'systemd-resolved'])
        except subprocess.CalledProcessError:
            self.show_journal('systemd-resolved.service')
            raise
        self.write_network(self.config, '''\
[Match]
Name={}
[Network]
DHCP={}
{}'''.format(self.iface, dhcp_mode, extra_opts))

        if coldplug:
            # create interface first, then start networkd
            self.create_iface(ipv6=ipv6)
            subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])
        elif coldplug is not None:
            # start networkd first, then create interface
            subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])
            self.create_iface(ipv6=ipv6)
        else:
            # "None" means test sets up interface by itself
            subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])

        try:
            subprocess.check_call([NETWORKD_WAIT_ONLINE, '--interface',
                                   self.iface, '--timeout=%i' % online_timeout])

            if ipv6:
                # check iface state and IP 6 address; FIXME: we need to wait a bit
                # longer, as the iface is "configured" already with IPv4 *or*
                # IPv6, but we want to wait for both
                for _ in range(10):
                    out = subprocess.check_output(['ip', 'a', 'show', 'dev', self.iface])
                    if b'state UP' in out and b'inet6 2600' in out and b'inet 192.168' in out:
                        break
                    time.sleep(1)
                else:
                    self.fail('timed out waiting for IPv6 configuration')

                self.assertRegex(out, b'inet6 2600::.* scope global .*dynamic')
                self.assertRegex(out, b'inet6 fe80::.* scope link')
            else:
                # should have link-local address on IPv6 only
                out = subprocess.check_output(['ip', '-6', 'a', 'show', 'dev', self.iface])
                self.assertRegex(out, br'inet6 fe80::.* scope link')
                self.assertNotIn(b'scope global', out)

            # should have IPv4 address
            out = subprocess.check_output(['ip', '-4', 'a', 'show', 'dev', self.iface])
            self.assertIn(b'state UP', out)
            self.assertRegex(out, br'inet 192.168.5.\d+/.* scope global dynamic')

            # check networkctl state
            out = subprocess.check_output(['networkctl'])
            self.assertRegex(out, (r'{}\s+ether\s+[a-z-]+\s+unmanaged'.format(self.if_router)).encode())
            self.assertRegex(out, (r'{}\s+ether\s+routable\s+configured'.format(self.iface)).encode())

            out = subprocess.check_output(['networkctl', 'status', self.iface])
            self.assertRegex(out, br'Type:\s+ether')
            self.assertRegex(out, br'State:\s+routable.*configured')
            self.assertRegex(out, br'Address:\s+192.168.5.\d+')
            if ipv6:
                self.assertRegex(out, br'2600::')
            else:
                self.assertNotIn(br'2600::', out)
            self.assertRegex(out, br'fe80::')
            self.assertRegex(out, br'Gateway:\s+192.168.5.1')
            self.assertRegex(out, br'DNS:\s+192.168.5.1')
        except (AssertionError, subprocess.CalledProcessError):
            # show networkd status, journal, and DHCP server log on failure
            with open(os.path.join(NETWORK_UNITDIR, self.config)) as f:
                print('\n---- {} ----\n{}'.format(self.config, f.read()))
            print('---- interface status ----')
            sys.stdout.flush()
            subprocess.call(['ip', 'a', 'show', 'dev', self.iface])
            print('---- networkctl status {} ----'.format(self.iface))
            sys.stdout.flush()
            subprocess.call(['networkctl', 'status', self.iface])
            self.show_journal('systemd-networkd.service')
            self.print_server_log()
            raise

        # Poll for up to five seconds until the DHCP-provided name server
        # shows up in resolved's resolv.conf.
        for _ in range(50):
            with open(RESOLV_CONF) as f:
                contents = f.read()
            if 'nameserver 192.168.5.1\n' in contents:
                break
            time.sleep(0.1)
        else:
            self.fail('nameserver 192.168.5.1 not found in ' + RESOLV_CONF)

        if coldplug is False:
            # check post-down.d hook
            self.shutdown_iface()

    def test_coldplug_dhcp_yes_ip4(self):
        '''Coldplug with DHCP=yes on an IPv4-only network'''
        # we have a 12s timeout on RA, so we need to wait longer
        self.do_test(coldplug=True, ipv6=False, online_timeout=15)

    def test_coldplug_dhcp_yes_ip4_no_ra(self):
        '''Coldplug with DHCP=yes, IPv4 only, RA disabled'''
        # with disabling RA explicitly things should be fast
        self.do_test(coldplug=True, ipv6=False,
                     extra_opts='IPv6AcceptRA=False')

    def test_coldplug_dhcp_ip4_only(self):
        '''Coldplug with DHCP=ipv4'''
        # we have a 12s timeout on RA, so we need to wait longer
        self.do_test(coldplug=True, ipv6=False, dhcp_mode='ipv4',
                     online_timeout=15)

    def test_coldplug_dhcp_ip4_only_no_ra(self):
        '''Coldplug with DHCP=ipv4, RA disabled'''
        # with disabling RA explicitly things should be fast
        self.do_test(coldplug=True, ipv6=False, dhcp_mode='ipv4',
                     extra_opts='IPv6AcceptRA=False')

    def test_coldplug_dhcp_ip6(self):
        '''Coldplug with IPv4 and IPv6 configuration'''
        self.do_test(coldplug=True, ipv6=True)

    def test_hotplug_dhcp_ip4(self):
        '''Hotplug on an IPv4-only network'''
        # With IPv4 only we have a 12s timeout on RA, so we need to wait longer
        self.do_test(coldplug=False, ipv6=False, online_timeout=15)

    def test_hotplug_dhcp_ip6(self):
        '''Hotplug with IPv4 and IPv6 configuration'''
        self.do_test(coldplug=False, ipv6=True)

    def test_route_only_dns(self):
        '''Routing-only domains do not add global DNS servers'''
        self.write_network('myvpn.netdev', '''\
[NetDev]
Name=dummy0
Kind=dummy
MACAddress=12:34:56:78:9a:bc''')
        self.write_network('myvpn.network', '''\
[Match]
Name=dummy0
[Network]
Address=192.168.42.100
DNS=192.168.42.1
Domains= ~company''')

        self.do_test(coldplug=True, ipv6=False,
                     extra_opts='IPv6AcceptRouterAdvertisements=False')

        with open(RESOLV_CONF) as f:
            contents = f.read()
            # ~company is not a search domain, only a routing domain
            self.assertNotRegex(contents, 'search.*company')
            # our global server should appear
            self.assertIn('nameserver 192.168.5.1\n', contents)
            # should not have domain-restricted server as global server
            self.assertNotIn('nameserver 192.168.42.1\n', contents)

    def test_route_only_dns_all_domains(self):
        '''The routing domain "~." makes a link's DNS server global'''
        self.write_network('myvpn.netdev', '''[NetDev]
Name=dummy0
Kind=dummy
MACAddress=12:34:56:78:9a:bc''')
        self.write_network('myvpn.network', '''[Match]
Name=dummy0
[Network]
Address=192.168.42.100
DNS=192.168.42.1
Domains= ~company ~.''')

        self.do_test(coldplug=True, ipv6=False,
                     extra_opts='IPv6AcceptRouterAdvertisements=False')

        with open(RESOLV_CONF) as f:
            contents = f.read()

        # ~company is not a search domain, only a routing domain
        self.assertNotRegex(contents, 'search.*company')

        # our global server should appear
        self.assertIn('nameserver 192.168.5.1\n', contents)
        # should have company server as global server due to ~.
        self.assertIn('nameserver 192.168.42.1\n', contents)
458
459
@unittest.skipUnless(HAVE_DNSMASQ, 'dnsmasq not installed')
class DnsmasqClientTest(ClientTestBase, unittest.TestCase):
    '''Test networkd client against dnsmasq'''

    def setUp(self):
        '''Extend the base setup with dnsmasq state and a fixed MAC'''
        super().setUp()
        # Popen object of the running dnsmasq, set by create_iface().
        self.dnsmasq = None
        self.iface_mac = 'de:ad:be:ef:47:11'

    def create_iface(self, ipv6=False, dnsmasq_opts=None):
        '''Create test interface with DHCP server behind it

        dnsmasq_opts is an optional list of additional dnsmasq command
        line arguments.
        '''

        # add veth pair
        subprocess.check_call(['ip', 'link', 'add', 'name', self.iface,
                               'address', self.iface_mac,
                               'type', 'veth', 'peer', 'name', self.if_router])

        # give our router an IP
        subprocess.check_call(['ip', 'a', 'flush', 'dev', self.if_router])
        subprocess.check_call(['ip', 'a', 'add', '192.168.5.1/24', 'dev', self.if_router])
        if ipv6:
            subprocess.check_call(['ip', 'a', 'add', '2600::1/64', 'dev', self.if_router])
        subprocess.check_call(['ip', 'link', 'set', self.if_router, 'up'])

        # add DHCP server
        self.dnsmasq_log = os.path.join(self.workdir, 'dnsmasq.log')
        lease_file = os.path.join(self.workdir, 'dnsmasq.leases')
        if ipv6:
            extra_opts = ['--enable-ra', '--dhcp-range=2600::10,2600::20']
        else:
            extra_opts = []
        if dnsmasq_opts:
            # extra_opts is a fresh local list, so the caller's list is
            # not modified by this in-place extension.
            extra_opts += dnsmasq_opts
        self.dnsmasq = subprocess.Popen(
            ['dnsmasq', '--keep-in-foreground', '--log-queries',
             '--log-facility=' + self.dnsmasq_log, '--conf-file=/dev/null',
             '--dhcp-leasefile=' + lease_file, '--bind-interfaces',
             '--interface=' + self.if_router, '--except-interface=lo',
             '--dhcp-range=192.168.5.10,192.168.5.200'] + extra_opts)

    def shutdown_iface(self):
        '''Remove test interface and stop DHCP server'''

        # Both attributes are reset to None so that a second call (e.g.
        # from tearDown() after do_test() already shut down) is a no-op.
        if self.if_router:
            subprocess.check_call(['ip', 'link', 'del', 'dev', self.if_router])
            self.if_router = None
        if self.dnsmasq:
            self.dnsmasq.kill()
            self.dnsmasq.wait()
            self.dnsmasq = None

    def print_server_log(self):
        '''Print DHCP server log for debugging failures'''

        with open(self.dnsmasq_log) as f:
            sys.stdout.write('\n\n---- dnsmasq log ----\n{}\n------\n\n'.format(f.read()))

    def test_resolved_domain_restricted_dns(self):
        '''resolved: domain-restricted DNS servers'''

        # create interface for generic connections; this will map all DNS names
        # to 192.168.42.1
        self.create_iface(dnsmasq_opts=['--address=/#/192.168.42.1'])
        self.write_network('general.network', '''\
[Match]
Name={}
[Network]
DHCP=ipv4
IPv6AcceptRA=False'''.format(self.iface))

        # create second device/dnsmasq for a .company/.lab VPN interface
        # static IPs for simplicity
        self.add_veth_pair('testvpnclient', 'testvpnrouter')
        subprocess.check_call(['ip', 'a', 'flush', 'dev', 'testvpnrouter'])
        subprocess.check_call(['ip', 'a', 'add', '10.241.3.1/24', 'dev', 'testvpnrouter'])
        subprocess.check_call(['ip', 'link', 'set', 'testvpnrouter', 'up'])

        vpn_dnsmasq_log = os.path.join(self.workdir, 'dnsmasq-vpn.log')
        vpn_dnsmasq = subprocess.Popen(
            ['dnsmasq', '--keep-in-foreground', '--log-queries',
             '--log-facility=' + vpn_dnsmasq_log, '--conf-file=/dev/null',
             '--dhcp-leasefile=/dev/null', '--bind-interfaces',
             '--interface=testvpnrouter', '--except-interface=lo',
             '--address=/math.lab/10.241.3.3', '--address=/cantina.company/10.241.4.4'])
        # Cleanups run in reverse order: kill() first, then wait().
        self.addCleanup(vpn_dnsmasq.wait)
        self.addCleanup(vpn_dnsmasq.kill)

        self.write_network('vpn.network', '''\
[Match]
Name=testvpnclient
[Network]
IPv6AcceptRA=False
Address=10.241.3.2/24
DNS=10.241.3.1
Domains= ~company ~lab''')

        subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])
        subprocess.check_call([NETWORKD_WAIT_ONLINE, '--interface', self.iface,
                               '--interface=testvpnclient', '--timeout=20'])

        # ensure we start fresh with every test
        subprocess.check_call(['systemctl', 'restart', 'systemd-resolved'])

        # test vpnclient specific domains; these should *not* be answered by
        # the general DNS
        out = subprocess.check_output(['systemd-resolve', 'math.lab'])
        self.assertIn(b'math.lab: 10.241.3.3', out)
        out = subprocess.check_output(['systemd-resolve', 'kettle.cantina.company'])
        self.assertIn(b'kettle.cantina.company: 10.241.4.4', out)

        # test general domains
        out = subprocess.check_output(['systemd-resolve', 'megasearch.net'])
        self.assertIn(b'megasearch.net: 192.168.42.1', out)

        with open(self.dnsmasq_log) as f:
            general_log = f.read()
        with open(vpn_dnsmasq_log) as f:
            vpn_log = f.read()

        # VPN domains should only be sent to VPN DNS
        self.assertRegex(vpn_log, 'query.*math.lab')
        self.assertRegex(vpn_log, 'query.*cantina.company')
        self.assertNotIn('.lab', general_log)
        self.assertNotIn('.company', general_log)

        # general domains should not be sent to the VPN DNS
        self.assertRegex(general_log, 'query.*megasearch.net')
        self.assertNotIn('megasearch.net', vpn_log)

    def test_resolved_etc_hosts(self):
        '''resolved queries to /etc/hosts'''

        # FIXME: -t MX query fails with enabled DNSSEC (even when using
        # the known negative trust anchor .internal instead of .example)
        conf = '/run/systemd/resolved.conf.d/test-disable-dnssec.conf'
        os.makedirs(os.path.dirname(conf), exist_ok=True)
        with open(conf, 'w') as f:
            f.write('[Resolve]\nDNSSEC=no')
        self.addCleanup(os.remove, conf)

        # create /etc/hosts bind mount which resolves my.example for IPv4
        hosts = os.path.join(self.workdir, 'hosts')
        with open(hosts, 'w') as f:
            f.write('172.16.99.99  my.example\n')
        subprocess.check_call(['mount', '--bind', hosts, '/etc/hosts'])
        self.addCleanup(subprocess.call, ['umount', '/etc/hosts'])
        subprocess.check_call(['systemctl', 'stop', 'systemd-resolved.service'])

        # note: different IPv4 address here, so that it's easy to tell apart
        # what resolved the query
        self.create_iface(dnsmasq_opts=['--host-record=my.example,172.16.99.1,2600::99:99',
                                        '--host-record=other.example,172.16.0.42,2600::42',
                                        '--mx-host=example,mail.example'],
                          ipv6=True)
        self.do_test(coldplug=None, ipv6=True)

        try:
            # family specific queries
            out = subprocess.check_output(['systemd-resolve', '-4', 'my.example'])
            self.assertIn(b'my.example: 172.16.99.99', out)
            # we don't expect an IPv6 answer; if /etc/hosts has any IP address,
            # it's considered a sufficient source
            self.assertNotEqual(subprocess.call(['systemd-resolve', '-6', 'my.example']), 0)
            # "any family" query; IPv4 should come from /etc/hosts
            out = subprocess.check_output(['systemd-resolve', 'my.example'])
            self.assertIn(b'my.example: 172.16.99.99', out)
            # IP → name lookup; again, takes the /etc/hosts one
            out = subprocess.check_output(['systemd-resolve', '172.16.99.99'])
            self.assertIn(b'172.16.99.99: my.example', out)

            # non-address RRs should fall back to DNS
            out = subprocess.check_output(['systemd-resolve', '--type=MX', 'example'])
            self.assertIn(b'example IN MX 1 mail.example', out)

            # other domains query DNS
            out = subprocess.check_output(['systemd-resolve', 'other.example'])
            self.assertIn(b'172.16.0.42', out)
            out = subprocess.check_output(['systemd-resolve', '172.16.0.42'])
            self.assertIn(b'172.16.0.42: other.example', out)
        except (AssertionError, subprocess.CalledProcessError):
            self.show_journal('systemd-resolved.service')
            self.print_server_log()
            raise

    def test_transient_hostname(self):
        '''networkd sets transient hostname from DHCP'''

        orig_hostname = socket.gethostname()
        self.addCleanup(socket.sethostname, orig_hostname)
        # temporarily move /etc/hostname away; restart hostnamed to pick it up
        if os.path.exists('/etc/hostname'):
            subprocess.check_call(['mount', '--bind', '/dev/null', '/etc/hostname'])
            self.addCleanup(subprocess.call, ['umount', '/etc/hostname'])
        subprocess.check_call(['systemctl', 'stop', 'systemd-hostnamed.service'])

        self.create_iface(dnsmasq_opts=['--dhcp-host={},192.168.5.210,testgreen'.format(self.iface_mac)])
        self.do_test(coldplug=None, extra_opts='IPv6AcceptRA=False', dhcp_mode='ipv4')

        try:
            # should have received the fixed IP above
            out = subprocess.check_output(['ip', '-4', 'a', 'show', 'dev', self.iface])
            self.assertRegex(out, b'inet 192.168.5.210/24 .* scope global dynamic')
            # should have set transient hostname in hostnamed; this is
            # sometimes a bit lagging (issue #4753), so retry a few times
            for retry in range(1, 6):
                out = subprocess.check_output(['hostnamectl'])
                if b'testgreen' in out:
                    break
                time.sleep(5)
                sys.stdout.write('[retry %i] ' % retry)
                sys.stdout.flush()
            else:
                self.fail('Transient hostname not found in hostnamectl:\n{}'.format(out.decode()))
            # and also applied to the system
            self.assertEqual(socket.gethostname(), 'testgreen')
        except AssertionError:
            self.show_journal('systemd-networkd.service')
            self.show_journal('systemd-hostnamed.service')
            self.print_server_log()
            raise

    def test_transient_hostname_with_static(self):
        '''transient hostname is not applied if static hostname exists'''

        orig_hostname = socket.gethostname()
        self.addCleanup(socket.sethostname, orig_hostname)
        if not os.path.exists('/etc/hostname'):
            # This test needs a static hostname.  No helper for writing
            # arbitrary files exists in this class hierarchy, so create
            # the file directly and remove it again after the test.
            with open('/etc/hostname', 'w') as f:
                f.write(orig_hostname + '\n')
            self.addCleanup(os.remove, '/etc/hostname')
        subprocess.check_call(['systemctl', 'stop', 'systemd-hostnamed.service'])

        self.create_iface(dnsmasq_opts=['--dhcp-host={},192.168.5.210,testgreen'.format(self.iface_mac)])
        self.do_test(coldplug=None, extra_opts='IPv6AcceptRA=False', dhcp_mode='ipv4')

        try:
            # should have received the fixed IP above
            out = subprocess.check_output(['ip', '-4', 'a', 'show', 'dev', self.iface])
            self.assertRegex(out, b'inet 192.168.5.210/24 .* scope global dynamic')
            # static hostname wins over transient one, thus *not* applied
            self.assertEqual(socket.gethostname(), orig_hostname)
        except AssertionError:
            self.show_journal('systemd-networkd.service')
            self.show_journal('systemd-hostnamed.service')
            self.print_server_log()
            raise
704
705
706 class NetworkdClientTest(ClientTestBase, unittest.TestCase):
707 '''Test networkd client against networkd server'''
708
709 def setUp(self):
710 super().setUp()
711 self.dnsmasq = None
712
713 def create_iface(self, ipv6=False, dhcpserver_opts=None):
714 '''Create test interface with DHCP server behind it'''
715
716 # run "router-side" networkd in own mount namespace to shield it from
717 # "client-side" configuration and networkd
718 (fd, script) = tempfile.mkstemp(prefix='networkd-router.sh')
719 self.addCleanup(os.remove, script)
720 with os.fdopen(fd, 'w+') as f:
721 f.write('''\
722 #!/bin/sh
723 set -eu
724 mkdir -p /run/systemd/network
725 mkdir -p /run/systemd/netif
726 mount -t tmpfs none /run/systemd/network
727 mount -t tmpfs none /run/systemd/netif
728 [ ! -e /run/dbus ] || mount -t tmpfs none /run/dbus
729 # create router/client veth pair
730 cat << EOF > /run/systemd/network/test.netdev
731 [NetDev]
732 Name=%(ifr)s
733 Kind=veth
734
735 [Peer]
736 Name=%(ifc)s
737 EOF
738
739 cat << EOF > /run/systemd/network/test.network
740 [Match]
741 Name=%(ifr)s
742
743 [Network]
744 Address=192.168.5.1/24
745 %(addr6)s
746 DHCPServer=yes
747
748 [DHCPServer]
749 PoolOffset=10
750 PoolSize=50
751 DNS=192.168.5.1
752 %(dhopts)s
753 EOF
754
755 # run networkd as in systemd-networkd.service
756 exec $(systemctl cat systemd-networkd.service | sed -n '/^ExecStart=/ { s/^.*=//; s/^[@+-]//; s/^!*//; p}')
757 ''' % {'ifr': self.if_router, 'ifc': self.iface, 'addr6': ipv6 and 'Address=2600::1/64' or '',
758 'dhopts': dhcpserver_opts or ''})
759
760 os.fchmod(fd, 0o755)
761
762 subprocess.check_call(['systemd-run', '--unit=networkd-test-router.service',
763 '-p', 'InaccessibleDirectories=-/etc/systemd/network',
764 '-p', 'InaccessibleDirectories=-/run/systemd/network',
765 '-p', 'InaccessibleDirectories=-/run/systemd/netif',
766 '--service-type=notify', script])
767
768 # wait until devices got created
769 for _ in range(50):
770 out = subprocess.check_output(['ip', 'a', 'show', 'dev', self.if_router])
771 if b'state UP' in out and b'scope global' in out:
772 break
773 time.sleep(0.1)
774
775 def shutdown_iface(self):
776 '''Remove test interface and stop DHCP server'''
777
778 if self.if_router:
779 subprocess.check_call(['systemctl', 'stop', 'networkd-test-router.service'])
780 # ensure failed transient unit does not stay around
781 subprocess.call(['systemctl', 'reset-failed', 'networkd-test-router.service'])
782 subprocess.call(['ip', 'link', 'del', 'dev', self.if_router])
783 self.if_router = None
784
785 def print_server_log(self):
786 '''Print DHCP server log for debugging failures'''
787
788 self.show_journal('networkd-test-router.service')
789
790 @unittest.skip('networkd does not have DHCPv6 server support')
791 def test_hotplug_dhcp_ip6(self):
792 pass
793
794 @unittest.skip('networkd does not have DHCPv6 server support')
795 def test_coldplug_dhcp_ip6(self):
796 pass
797
798 def test_search_domains(self):
799
800 # we don't use this interface for this test
801 self.if_router = None
802
803 self.write_network('test.netdev', '''\
804 [NetDev]
805 Name=dummy0
806 Kind=dummy
807 MACAddress=12:34:56:78:9a:bc''')
808 self.write_network('test.network', '''\
809 [Match]
810 Name=dummy0
811 [Network]
812 Address=192.168.42.100
813 DNS=192.168.42.1
814 Domains= one two three four five six seven eight nine ten''')
815
816 subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])
817
818 for timeout in range(50):
819 with open(RESOLV_CONF) as f:
820 contents = f.read()
821 if ' one' in contents:
822 break
823 time.sleep(0.1)
824 self.assertRegex(contents, 'search .*one two three four')
825 self.assertNotIn('seven\n', contents)
826 self.assertIn('# Too many search domains configured, remaining ones ignored.\n', contents)
827
828 def test_search_domains_too_long(self):
829
830 # we don't use this interface for this test
831 self.if_router = None
832
833 name_prefix = 'a' * 60
834
835 self.write_network('test.netdev', '''\
836 [NetDev]
837 Name=dummy0
838 Kind=dummy
839 MACAddress=12:34:56:78:9a:bc''')
840 self.write_network('test.network', '''\
841 [Match]
842 Name=dummy0
843 [Network]
844 Address=192.168.42.100
845 DNS=192.168.42.1
846 Domains={p}0 {p}1 {p}2 {p}3 {p}4'''.format(p=name_prefix))
847
848 subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])
849
850 for timeout in range(50):
851 with open(RESOLV_CONF) as f:
852 contents = f.read()
853 if ' one' in contents:
854 break
855 time.sleep(0.1)
856 self.assertRegex(contents, 'search .*{p}0 {p}1 {p}2'.format(p=name_prefix))
857 self.assertIn('# Total length of all search domains is too long, remaining ones ignored.', contents)
858
def test_dropin(self):
    """Check that a drop-in for a .network file adds to its settings.

    The main file sets DNS=192.168.42.1 and the 'dns' drop-in sets
    DNS=127.0.0.1; both name servers must appear in RESOLV_CONF.
    """
    # we don't use this interface for this test
    self.if_router = None

    netdev_conf = '''\
[NetDev]
Name=dummy0
Kind=dummy
MACAddress=12:34:56:78:9a:bc'''
    network_conf = '''\
[Match]
Name=dummy0
[Network]
Address=192.168.42.100
DNS=192.168.42.1'''
    dropin_conf = '''\
[Network]
DNS=127.0.0.1'''

    self.write_network('test.netdev', netdev_conf)
    self.write_network('test.network', network_conf)
    self.write_network_dropin('test.network', 'dns', dropin_conf)

    subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])

    # Poll RESOLV_CONF at most 50 times, 0.1 s apart, until the name
    # server from the drop-in shows up.
    polls_left = 50
    while polls_left:
        polls_left -= 1
        with open(RESOLV_CONF) as resolv:
            contents = resolv.read()
        if ' 127.0.0.1' in contents:
            break
        time.sleep(0.1)

    self.assertIn('nameserver 192.168.42.1\n', contents)
    self.assertIn('nameserver 127.0.0.1\n', contents)
888
def test_dhcp_timezone(self):
    '''networkd sets time zone from DHCP

    The current time zone is read over D-Bus and restored on cleanup
    with ``timedatectl set-timezone``.  The DHCP server side is told to
    emit Pacific/Honolulu and the client is configured with
    UseTimezone=true; afterwards the time zone reported over D-Bus must
    be Pacific/Honolulu.
    '''

    def get_tz():
        '''Return the Timezone property of org.freedesktop.timedate1.

        Raises AssertionError if the busctl reply is not of the form
        ``s "<zone>"``.
        '''
        raw = subprocess.check_output(['busctl', 'get-property', 'org.freedesktop.timedate1',
                                       '/org/freedesktop/timedate1', 'org.freedesktop.timedate1', 'Timezone'])
        reply = raw.strip()
        # Explicit raise instead of ``assert`` so the check is not
        # stripped under ``python -O`` and the odd reply is reported.
        if not raw.startswith(b's "') or not reply.endswith(b'"'):
            raise AssertionError('unexpected busctl reply: {!r}'.format(raw))
        # Drop the leading 's "' and the trailing '"'.
        return reply[3:-1].decode()

    orig_timezone = get_tz()
    self.addCleanup(subprocess.call, ['timedatectl', 'set-timezone', orig_timezone])

    self.create_iface(dhcpserver_opts='EmitTimezone=yes\nTimezone=Pacific/Honolulu')
    self.do_test(coldplug=None, extra_opts='IPv6AcceptRA=false\n[DHCP]\nUseTimezone=true', dhcp_mode='ipv4')

    # should have applied the received timezone
    try:
        self.assertEqual(get_tz(), 'Pacific/Honolulu')
    except AssertionError:
        # Dump the journals of the services involved: the value checked
        # above comes from org.freedesktop.timedate1, i.e. timedated,
        # not hostnamed.
        self.show_journal('systemd-networkd.service')
        self.show_journal('systemd-timedated.service')
        raise
913
914
class MatchClientTest(unittest.TestCase, NetworkdTestingUtilities):
    """Exercise the [Match] section of .network files.

    Caution: a .network file that matches one of the test host's own
    interfaces will wipe that interface's configuration.  Every network
    file written here must therefore carry a restrictive [Match] section
    so that only the temporary veth interfaces created by these tests
    are ever affected.
    """

    def tearDown(self):
        """Stop networkd after each test."""
        stop_cmd = ['systemctl', 'stop', 'systemd-networkd']
        subprocess.call(stop_cmd)

    def test_basic_matching(self):
        """Check that Name= matching works as relied upon in this class."""
        self.add_veth_pair('test_if1', 'fake_if2')

        config = '\n'.join(('[Match]', 'Name=test_*', '[Network]'))
        self.write_network('test.network', config)

        subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])

        expected = {'test_if1': 'managed', 'fake_if2': 'unmanaged'}
        self.assert_link_states(**expected)

    def test_inverted_matching(self):
        """Check that prefixing a value with '!' inverts the match."""
        # Both ends share one MAC address so they can be matched by it;
        # this avoids depending on udev and keeps the test usable in
        # containers.
        mac = '00:01:02:03:98:99'
        self.add_veth_pair('test_veth', 'test_peer',
                           ['addr', mac], ['addr', mac])

        config = '\n'.join((
            '[Match]',
            'MACAddress=' + mac,
            'Name=!nonexistent *peer*',
            '[Network]',
        ))
        self.write_network('no-veth.network', config)

        subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])

        expected = {'test_veth': 'managed', 'test_peer': 'unmanaged'}
        self.assert_link_states(**expected)
949
950
class UnmanagedClientTest(unittest.TestCase, NetworkdTestingUtilities):
    """Check that networkd manages exactly the interfaces it should."""

    def setUp(self):
        """Record the veth names and write the .network files for them."""
        # veth -> peer names to create later.  Only the names matter,
        # not which one is paired with which.
        self.veths = {
            'm1def': 'm0unm',
            'm1man': 'm1unm',
        }

        # Contents of the .network files, in the order they are to be read.
        self.configs = (
            "[Match]\nName=m1def\n",
            "[Match]\nName=m1unm\n[Link]\nUnmanaged=yes\n",
            "[Match]\nName=m1*\n[Link]\nUnmanaged=no\n",
        )

        # The position in self.configs becomes a zero-padded file name
        # prefix; the files are to be cleaned up automatically.
        for number, body in enumerate(self.configs):
            self.write_network('{:02d}-test.network'.format(number), body)

    def tearDown(self):
        """Stop networkd after each test."""
        stop_cmd = ['systemctl', 'stop', 'systemd-networkd']
        subprocess.call(stop_cmd)

    def create_iface(self):
        """Create the temporary veth pairs named in self.veths."""
        for veth in self.veths:
            self.add_veth_pair(veth, self.veths[veth])

    def test_unmanaged_setting(self):
        """Check link states with Unmanaged= settings, hot-plug."""
        subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])
        self.create_iface()
        expected = {
            'm1def': 'managed',
            'm1man': 'managed',
            'm1unm': 'unmanaged',
            'm0unm': 'unmanaged',
        }
        self.assert_link_states(**expected)

    def test_unmanaged_setting_coldplug(self):
        """Check link states with Unmanaged= settings, cold-plug."""
        self.create_iface()
        subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])
        expected = {
            'm1def': 'managed',
            'm1man': 'managed',
            'm1unm': 'unmanaged',
            'm0unm': 'unmanaged',
        }
        self.assert_link_states(**expected)

    def test_catchall_config(self):
        """Check link states with a catch-all config, hot-plug."""
        # Deliberately not a real catch-all: matching every interface
        # would mess up the host.
        catchall = "[Match]\nName=m[01]???\n"
        self.write_network('all.network', catchall)
        subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])
        self.create_iface()
        expected = {
            'm1def': 'managed',
            'm1man': 'managed',
            'm1unm': 'unmanaged',
            'm0unm': 'managed',
        }
        self.assert_link_states(**expected)

    def test_catchall_config_coldplug(self):
        """Check link states with a catch-all config, cold-plug."""
        # Deliberately not a real catch-all: matching every interface
        # would mess up the host.
        catchall = "[Match]\nName=m[01]???\n"
        self.write_network('all.network', catchall)
        self.create_iface()
        subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])
        expected = {
            'm1def': 'managed',
            'm1man': 'managed',
            'm1unm': 'unmanaged',
            'm0unm': 'managed',
        }
        self.assert_link_states(**expected)
1022
1023
if __name__ == '__main__':
    # Run all tests when executed as a script, reporting to stdout with
    # one line per test.
    runner = unittest.TextTestRunner(stream=sys.stdout, verbosity=2)
    unittest.main(testRunner=runner)