#!/usr/bin/env python3
+# SPDX-License-Identifier: LGPL-2.1+
#
# networkd integration test
# This uses temporary configuration in /run and temporary veth devices, and
# ATTENTION: This uses the *installed* networkd, not the one from the built
# source tree.
#
-# (C) 2015 Canonical Ltd.
+# © 2015 Canonical Ltd.
# Author: Martin Pitt <martin.pitt@ubuntu.com>
-#
-# systemd is free software; you can redistribute it and/or modify it
-# under the terms of the GNU Lesser General Public License as published by
-# the Free Software Foundation; either version 2.1 of the License, or
-# (at your option) any later version.
-
-# systemd is distributed in the hope that it will be useful, but
-# WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-# Lesser General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public License
-# along with systemd; If not, see <http://www.gnu.org/licenses/>.
import errno
import os
+import shutil
+import socket
+import subprocess
import sys
+import tempfile
import time
import unittest
-import tempfile
-import subprocess
-import shutil
-import socket
HAVE_DNSMASQ = shutil.which('dnsmasq') is not None
RESOLV_CONF = '/run/systemd/resolve/resolv.conf'
+tmpmounts = []
+running_units = []
+stopped_units = []
+
def setUpModule():
+ global tmpmounts
+
"""Initialize the environment, and perform sanity checks on it."""
if NETWORKD_WAIT_ONLINE is None:
raise OSError(errno.ENOENT, 'systemd-networkd-wait-online not found')
- # Do not run any tests if the system is using networkd already.
- if subprocess.call(['systemctl', 'is-active', '--quiet',
- 'systemd-networkd.service']) == 0:
- raise unittest.SkipTest('networkd is already active')
+ # Do not run any tests if the system is using networkd already and it's not virtualized
+ if (subprocess.call(['systemctl', 'is-active', '--quiet', 'systemd-networkd.service']) == 0 and
+ subprocess.call(['systemd-detect-virt', '--quiet']) != 0):
+ raise unittest.SkipTest('not virtualized and networkd is already active')
+ # Ensure we don't mess with an existing networkd config
+ for u in ['systemd-networkd.socket', 'systemd-networkd', 'systemd-resolved']:
+ if subprocess.call(['systemctl', 'is-active', '--quiet', u]) == 0:
+ subprocess.call(['systemctl', 'stop', u])
+ running_units.append(u)
+ else:
+ stopped_units.append(u)
+ for d in ['/etc/systemd/network', '/run/systemd/network',
+ '/run/systemd/netif', '/run/systemd/resolve']:
+ if os.path.isdir(d):
+ subprocess.check_call(["mount", "-t", "tmpfs", "none", d])
+ tmpmounts.append(d)
+ if os.path.isdir('/run/systemd/resolve'):
+ os.chmod('/run/systemd/resolve', 0o755)
# Avoid "Failed to open /dev/tty" errors in containers.
os.environ['SYSTEMD_LOG_TARGET'] = 'journal'
# Ensure the unit directory exists so tests can dump files into it.
os.makedirs(NETWORK_UNITDIR, exist_ok=True)
+ # create static systemd-network user for networkd-test-router.service (it
+ # needs to do some stuff as root and can't start as user; but networkd
+ # still insists on the user)
+ subprocess.check_call(['adduser', '--system', '--no-create-home', 'systemd-network'])
+
+
+def tearDownModule():
+ global tmpmounts
+ for d in tmpmounts:
+ subprocess.check_call(["umount", d])
+ for u in stopped_units:
+ subprocess.call(["systemctl", "stop", u])
+ for u in running_units:
+ subprocess.call(["systemctl", "restart", u])
+
class NetworkdTestingUtilities:
"""Provide a set of utility functions to facilitate networkd tests.
def write_network_dropin(self, unit_name, dropin_name, contents):
"""Write a network unit drop-in, and queue it to be removed."""
- dropin_dir = os.path.join(NETWORK_UNITDIR, "%s.d" % unit_name)
- dropin_path = os.path.join(dropin_dir, "%s.conf" % dropin_name)
+ dropin_dir = os.path.join(NETWORK_UNITDIR, "{}.d".format(unit_name))
+ dropin_path = os.path.join(dropin_dir, "{}.conf".format(dropin_name))
os.makedirs(dropin_dir, exist_ok=True)
+ self.addCleanup(os.rmdir, dropin_dir)
with open(dropin_path, 'w') as dropin:
dropin.write(contents)
self.addCleanup(os.remove, dropin_path)
+ def read_attr(self, link, attribute):
+ """Read a link attributed from the sysfs."""
+ # Note we we don't want to check if interface `link' is managed, we
+ # want to evaluate link variable and pass the value of the link to
+ # assert_link_states e.g. eth0=managed.
+ self.assert_link_states(**{link:'managed'})
+ with open(os.path.join('/sys/class/net', link, attribute)) as f:
+ return f.readline().strip()
+
def assert_link_states(self, **kwargs):
"""Match networkctl link states to the given ones.
# Wait for the requested interfaces, but don't fail for them.
subprocess.call([NETWORKD_WAIT_ONLINE, '--timeout=5'] +
- ['--interface=%s' % iface for iface in kwargs])
+ ['--interface={}'.format(iface) for iface in kwargs])
# Validate each link state found in the networkctl output.
out = subprocess.check_output(['networkctl', '--no-legend']).rstrip()
actual = fields[-1]
if (actual != expected and
not (expected == 'managed' and actual != 'unmanaged')):
- self.fail("Link %s expects state %s, found %s" %
- (iface, expected, actual))
+ self.fail("Link {} expects state {}, found {}".format(iface, expected, actual))
interfaces.remove(iface)
# Ensure that all requested interfaces have been covered.
if interfaces:
- self.fail("Missing links in status output: %s" % interfaces)
+ self.fail("Missing links in status output: {}".format(interfaces))
+class BridgeTest(NetworkdTestingUtilities, unittest.TestCase):
+ """Provide common methods for testing networkd against servers."""
+
+ def setUp(self):
+ self.write_network('port1.netdev', '''\
+[NetDev]
+Name=port1
+Kind=dummy
+MACAddress=12:34:56:78:9a:bc''')
+ self.write_network('port2.netdev', '''\
+[NetDev]
+Name=port2
+Kind=dummy
+MACAddress=12:34:56:78:9a:bd''')
+ self.write_network('mybridge.netdev', '''\
+[NetDev]
+Name=mybridge
+Kind=bridge''')
+ self.write_network('port1.network', '''\
+[Match]
+Name=port1
+[Network]
+Bridge=mybridge''')
+ self.write_network('port2.network', '''\
+[Match]
+Name=port2
+[Network]
+Bridge=mybridge''')
+ self.write_network('mybridge.network', '''\
+[Match]
+Name=mybridge
+[Network]
+DNS=192.168.250.1
+Address=192.168.250.33/24
+Gateway=192.168.250.1''')
+ subprocess.call(['systemctl', 'reset-failed', 'systemd-networkd', 'systemd-resolved'])
+ subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])
+
+ def tearDown(self):
+ subprocess.check_call(['systemctl', 'stop', 'systemd-networkd'])
+ subprocess.check_call(['ip', 'link', 'del', 'mybridge'])
+ subprocess.check_call(['ip', 'link', 'del', 'port1'])
+ subprocess.check_call(['ip', 'link', 'del', 'port2'])
+
+ def test_bridge_init(self):
+ self.assert_link_states(
+ port1='managed',
+ port2='managed',
+ mybridge='managed')
+
+ def test_bridge_port_priority(self):
+ self.assertEqual(self.read_attr('port1', 'brport/priority'), '32')
+ self.write_network_dropin('port1.network', 'priority', '''\
+[Bridge]
+Priority=28
+''')
+ subprocess.check_call(['systemctl', 'restart', 'systemd-networkd'])
+ self.assertEqual(self.read_attr('port1', 'brport/priority'), '28')
+
+ def test_bridge_port_priority_set_zero(self):
+ """It should be possible to set the bridge port priority to 0"""
+ self.assertEqual(self.read_attr('port2', 'brport/priority'), '32')
+ self.write_network_dropin('port2.network', 'priority', '''\
+[Bridge]
+Priority=0
+''')
+ subprocess.check_call(['systemctl', 'restart', 'systemd-networkd'])
+ self.assertEqual(self.read_attr('port2', 'brport/priority'), '0')
+
+ def test_bridge_port_property(self):
+ """Test the "[Bridge]" section keys"""
+ self.assertEqual(self.read_attr('port2', 'brport/priority'), '32')
+ self.write_network_dropin('port2.network', 'property', '''\
+[Bridge]
+UnicastFlood=true
+HairPin=true
+UseBPDU=true
+FastLeave=true
+AllowPortToBeRoot=true
+Cost=555
+Priority=23
+''')
+ subprocess.check_call(['systemctl', 'restart', 'systemd-networkd'])
+
+ self.assertEqual(self.read_attr('port2', 'brport/priority'), '23')
+ self.assertEqual(self.read_attr('port2', 'brport/hairpin_mode'), '1')
+ self.assertEqual(self.read_attr('port2', 'brport/path_cost'), '555')
+ self.assertEqual(self.read_attr('port2', 'brport/multicast_fast_leave'), '1')
+ self.assertEqual(self.read_attr('port2', 'brport/unicast_flood'), '1')
+ self.assertEqual(self.read_attr('port2', 'brport/bpdu_guard'), '1')
+ self.assertEqual(self.read_attr('port2', 'brport/root_block'), '1')
+
class ClientTestBase(NetworkdTestingUtilities):
"""Provide common methods for testing networkd against servers."""
self.assertTrue(out.startswith('-- cursor:'))
self.journal_cursor = out.split()[-1]
+ subprocess.call(['systemctl', 'reset-failed', 'systemd-networkd', 'systemd-resolved'])
+
def tearDown(self):
self.shutdown_iface()
subprocess.call(['systemctl', 'stop', 'systemd-networkd'])
def show_journal(self, unit):
'''Show journal of given unit since start of the test'''
- print('---- %s ----' % unit)
+ print('---- {} ----'.format(unit))
subprocess.check_output(['journalctl', '--sync'])
sys.stdout.flush()
subprocess.call(['journalctl', '-b', '--no-pager', '--quiet',
def do_test(self, coldplug=True, ipv6=False, extra_opts='',
online_timeout=10, dhcp_mode='yes'):
- subprocess.check_call(['systemctl', 'start', 'systemd-resolved'])
+ try:
+ subprocess.check_call(['systemctl', 'start', 'systemd-resolved'])
+ except subprocess.CalledProcessError:
+ self.show_journal('systemd-resolved.service')
+ raise
self.write_network(self.config, '''\
[Match]
-Name=%s
+Name={}
[Network]
-DHCP=%s
-%s''' % (self.iface, dhcp_mode, extra_opts))
+DHCP={}
+{}'''.format(self.iface, dhcp_mode, extra_opts))
if coldplug:
# create interface first, then start networkd
# check networkctl state
out = subprocess.check_output(['networkctl'])
- self.assertRegex(out, (r'%s\s+ether\s+routable\s+unmanaged' % self.if_router).encode())
- self.assertRegex(out, (r'%s\s+ether\s+routable\s+configured' % self.iface).encode())
+ self.assertRegex(out, (r'{}\s+ether\s+[a-z-]+\s+unmanaged'.format(self.if_router)).encode())
+ self.assertRegex(out, (r'{}\s+ether\s+routable\s+configured'.format(self.iface)).encode())
out = subprocess.check_output(['networkctl', 'status', self.iface])
self.assertRegex(out, br'Type:\s+ether')
except (AssertionError, subprocess.CalledProcessError):
# show networkd status, journal, and DHCP server log on failure
with open(os.path.join(NETWORK_UNITDIR, self.config)) as f:
- print('\n---- %s ----\n%s' % (self.config, f.read()))
+ print('\n---- {} ----\n{}'.format(self.config, f.read()))
print('---- interface status ----')
sys.stdout.flush()
subprocess.call(['ip', 'a', 'show', 'dev', self.iface])
- print('---- networkctl status %s ----' % self.iface)
+ print('---- networkctl status {} ----'.format(self.iface))
sys.stdout.flush()
subprocess.call(['networkctl', 'status', self.iface])
self.show_journal('systemd-networkd.service')
'''Print DHCP server log for debugging failures'''
with open(self.dnsmasq_log) as f:
- sys.stdout.write('\n\n---- dnsmasq log ----\n%s\n------\n\n' % f.read())
+ sys.stdout.write('\n\n---- dnsmasq log ----\n{}\n------\n\n'.format(f.read()))
def test_resolved_domain_restricted_dns(self):
'''resolved: domain-restricted DNS servers'''
self.create_iface(dnsmasq_opts=['--address=/#/192.168.42.1'])
self.write_network('general.network', '''\
[Match]
-Name=%s
+Name={}
[Network]
DHCP=ipv4
-IPv6AcceptRA=False''' % self.iface)
+IPv6AcceptRA=False'''.format(self.iface))
# create second device/dnsmasq for a .company/.lab VPN interface
# static IPs for simplicity
# VPN domains should only be sent to VPN DNS
self.assertRegex(vpn_log, 'query.*math.lab')
self.assertRegex(vpn_log, 'query.*cantina.company')
- self.assertNotIn('lab', general_log)
- self.assertNotIn('company', general_log)
+ self.assertNotIn('.lab', general_log)
+ self.assertNotIn('.company', general_log)
# general domains should not be sent to the VPN DNS
self.assertRegex(general_log, 'query.*megasearch.net')
self.assertNotIn('megasearch.net', vpn_log)
+ def test_resolved_etc_hosts(self):
+ '''resolved queries to /etc/hosts'''
+
+ # FIXME: -t MX query fails with enabled DNSSEC (even when using
+ # the known negative trust anchor .internal instead of .example)
+ conf = '/run/systemd/resolved.conf.d/test-disable-dnssec.conf'
+ os.makedirs(os.path.dirname(conf), exist_ok=True)
+ with open(conf, 'w') as f:
+ f.write('[Resolve]\nDNSSEC=no')
+ self.addCleanup(os.remove, conf)
+
+ # create /etc/hosts bind mount which resolves my.example for IPv4
+ hosts = os.path.join(self.workdir, 'hosts')
+ with open(hosts, 'w') as f:
+ f.write('172.16.99.99 my.example\n')
+ subprocess.check_call(['mount', '--bind', hosts, '/etc/hosts'])
+ self.addCleanup(subprocess.call, ['umount', '/etc/hosts'])
+ subprocess.check_call(['systemctl', 'stop', 'systemd-resolved.service'])
+
+ # note: different IPv4 address here, so that it's easy to tell apart
+ # what resolved the query
+ self.create_iface(dnsmasq_opts=['--host-record=my.example,172.16.99.1,2600::99:99',
+ '--host-record=other.example,172.16.0.42,2600::42',
+ '--mx-host=example,mail.example'],
+ ipv6=True)
+ self.do_test(coldplug=None, ipv6=True)
+
+ try:
+ # family specific queries
+ out = subprocess.check_output(['systemd-resolve', '-4', 'my.example'])
+ self.assertIn(b'my.example: 172.16.99.99', out)
+ # we don't expect an IPv6 answer; if /etc/hosts has any IP address,
+ # it's considered a sufficient source
+ self.assertNotEqual(subprocess.call(['systemd-resolve', '-6', 'my.example']), 0)
+ # "any family" query; IPv4 should come from /etc/hosts
+ out = subprocess.check_output(['systemd-resolve', 'my.example'])
+ self.assertIn(b'my.example: 172.16.99.99', out)
+ # IP → name lookup; again, takes the /etc/hosts one
+ out = subprocess.check_output(['systemd-resolve', '172.16.99.99'])
+ self.assertIn(b'172.16.99.99: my.example', out)
+
+ # non-address RRs should fall back to DNS
+ out = subprocess.check_output(['systemd-resolve', '--type=MX', 'example'])
+ self.assertIn(b'example IN MX 1 mail.example', out)
+
+ # other domains query DNS
+ out = subprocess.check_output(['systemd-resolve', 'other.example'])
+ self.assertIn(b'172.16.0.42', out)
+ out = subprocess.check_output(['systemd-resolve', '172.16.0.42'])
+ self.assertIn(b'172.16.0.42: other.example', out)
+ except (AssertionError, subprocess.CalledProcessError):
+ self.show_journal('systemd-resolved.service')
+ self.print_server_log()
+ raise
+
def test_transient_hostname(self):
'''networkd sets transient hostname from DHCP'''
self.addCleanup(subprocess.call, ['umount', '/etc/hostname'])
subprocess.check_call(['systemctl', 'stop', 'systemd-hostnamed.service'])
- self.create_iface(dnsmasq_opts=['--dhcp-host=%s,192.168.5.210,testgreen' % self.iface_mac])
+ self.create_iface(dnsmasq_opts=['--dhcp-host={},192.168.5.210,testgreen'.format(self.iface_mac)])
self.do_test(coldplug=None, extra_opts='IPv6AcceptRA=False', dhcp_mode='ipv4')
try:
sys.stdout.write('[retry %i] ' % retry)
sys.stdout.flush()
else:
- self.fail('Transient hostname not found in hostnamectl:\n%s' % out.decode())
+ self.fail('Transient hostname not found in hostnamectl:\n{}'.format(out.decode()))
# and also applied to the system
self.assertEqual(socket.gethostname(), 'testgreen')
except AssertionError:
self.writeConfig('/etc/hostname', orig_hostname)
subprocess.check_call(['systemctl', 'stop', 'systemd-hostnamed.service'])
- self.create_iface(dnsmasq_opts=['--dhcp-host=%s,192.168.5.210,testgreen' % self.iface_mac])
+ self.create_iface(dnsmasq_opts=['--dhcp-host={},192.168.5.210,testgreen'.format(self.iface_mac)])
self.do_test(coldplug=None, extra_opts='IPv6AcceptRA=False', dhcp_mode='ipv4')
try:
self.addCleanup(os.remove, script)
with os.fdopen(fd, 'w+') as f:
f.write('''\
-#!/bin/sh -eu
+#!/bin/sh
+set -eu
mkdir -p /run/systemd/network
mkdir -p /run/systemd/netif
mount -t tmpfs none /run/systemd/network
EOF
# run networkd as in systemd-networkd.service
-exec $(systemctl cat systemd-networkd.service | sed -n '/^ExecStart=/ { s/^.*=//; p}')
+exec $(systemctl cat systemd-networkd.service | sed -n '/^ExecStart=/ { s/^.*=//; s/^[@+-]//; s/^!*//; p}')
''' % {'ifr': self.if_router, 'ifc': self.iface, 'addr6': ipv6 and 'Address=2600::1/64' or '',
'dhopts': dhcpserver_opts or ''})
['addr', mac], ['addr', mac])
self.write_network('no-veth.network', """\
[Match]
-MACAddress=%s
+MACAddress={}
Name=!nonexistent *peer*
-[Network]""" % mac)
+[Network]""".format(mac))
subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])
self.assert_link_states(test_veth='managed', test_peer='unmanaged')