]> git.ipfire.org Git - thirdparty/systemd.git/blobdiff - test/networkd-test.py
core: use id unit when retrieving unit file state (#8038)
[thirdparty/systemd.git] / test / networkd-test.py
index 3023cac97d7d9debfe76fd8ee9baa3ec2f7070a9..3f917f0d9c80177badb82865739df050eb52fd8f 100755 (executable)
@@ -1,4 +1,5 @@
 #!/usr/bin/env python3
+# SPDX-License-Identifier: LGPL-2.1+
 #
 # networkd integration test
 # This uses temporary configuration in /run and temporary veth devices, and
 # You should have received a copy of the GNU Lesser General Public License
 # along with systemd; If not, see <http://www.gnu.org/licenses/>.
 
+import errno
 import os
+import shutil
+import socket
+import subprocess
 import sys
+import tempfile
 import time
 import unittest
-import tempfile
-import subprocess
-import shutil
-import socket
 
-networkd_active = subprocess.call(['systemctl', 'is-active', '--quiet',
-                                   'systemd-networkd']) == 0
-have_dnsmasq = shutil.which('dnsmasq')
+HAVE_DNSMASQ = shutil.which('dnsmasq') is not None
+
+NETWORK_UNITDIR = '/run/systemd/network'
+
+NETWORKD_WAIT_ONLINE = shutil.which('systemd-networkd-wait-online',
+                                    path='/usr/lib/systemd:/lib/systemd')
 
 RESOLV_CONF = '/run/systemd/resolve/resolv.conf'
 
 
-@unittest.skipIf(networkd_active,
-                 'networkd is already active')
-class ClientTestBase:
+def setUpModule():
+    """Initialize the environment, and perform sanity checks on it."""
+    if NETWORKD_WAIT_ONLINE is None:
+        raise OSError(errno.ENOENT, 'systemd-networkd-wait-online not found')
+
+    # Do not run any tests if the system is using networkd already.
+    if subprocess.call(['systemctl', 'is-active', '--quiet',
+                        'systemd-networkd.service']) == 0:
+        raise unittest.SkipTest('networkd is already active')
+
+    # Avoid "Failed to open /dev/tty" errors in containers.
+    os.environ['SYSTEMD_LOG_TARGET'] = 'journal'
+
+    # Ensure the unit directory exists so tests can dump files into it.
+    os.makedirs(NETWORK_UNITDIR, exist_ok=True)
+
+
+class NetworkdTestingUtilities:
+    """Provide a set of utility functions to facilitate networkd tests.
+
+    This class must be inherited along with unittest.TestCase to define
+    some required methods.
+    """
+
+    def add_veth_pair(self, veth, peer, veth_options=(), peer_options=()):
+        """Add a veth interface pair, and queue them to be removed."""
+        subprocess.check_call(['ip', 'link', 'add', 'name', veth] +
+                              list(veth_options) +
+                              ['type', 'veth', 'peer', 'name', peer] +
+                              list(peer_options))
+        self.addCleanup(subprocess.call, ['ip', 'link', 'del', 'dev', peer])
+
+    def write_network(self, unit_name, contents):
+        """Write a network unit file, and queue it to be removed."""
+        unit_path = os.path.join(NETWORK_UNITDIR, unit_name)
+
+        with open(unit_path, 'w') as unit:
+            unit.write(contents)
+        self.addCleanup(os.remove, unit_path)
+
+    def write_network_dropin(self, unit_name, dropin_name, contents):
+        """Write a network unit drop-in, and queue it to be removed."""
+        dropin_dir = os.path.join(NETWORK_UNITDIR, "{}.d".format(unit_name))
+        dropin_path = os.path.join(dropin_dir, "{}.conf".format(dropin_name))
+
+        os.makedirs(dropin_dir, exist_ok=True)
+        self.addCleanup(os.rmdir, dropin_dir)
+        with open(dropin_path, 'w') as dropin:
+            dropin.write(contents)
+        self.addCleanup(os.remove, dropin_path)
+
+    def read_attr(self, link, attribute):
+        """Read a link attributed from the sysfs."""
+        # Note we we don't want to check if interface `link' is managed, we
+        # want to evaluate link variable and pass the value of the link to
+        # assert_link_states e.g. eth0=managed.
+        self.assert_link_states(**{link:'managed'})
+        with open(os.path.join('/sys/class/net', link, attribute)) as f:
+            return f.readline().strip()
+
+    def assert_link_states(self, **kwargs):
+        """Match networkctl link states to the given ones.
+
+        Each keyword argument should be the name of a network interface
+        with its expected value of the "SETUP" column in output from
+        networkctl.  The interfaces have five seconds to come online
+        before the check is performed.  Every specified interface must
+        be present in the output, and any other interfaces found in the
+        output are ignored.
+
+        A special interface state "managed" is supported, which matches
+        any value in the "SETUP" column other than "unmanaged".
+        """
+        if not kwargs:
+            return
+        interfaces = set(kwargs)
+
+        # Wait for the requested interfaces, but don't fail for them.
+        subprocess.call([NETWORKD_WAIT_ONLINE, '--timeout=5'] +
+                        ['--interface={}'.format(iface) for iface in kwargs])
+
+        # Validate each link state found in the networkctl output.
+        out = subprocess.check_output(['networkctl', '--no-legend']).rstrip()
+        for line in out.decode('utf-8').split('\n'):
+            fields = line.split()
+            if len(fields) >= 5 and fields[1] in kwargs:
+                iface = fields[1]
+                expected = kwargs[iface]
+                actual = fields[-1]
+                if (actual != expected and
+                        not (expected == 'managed' and actual != 'unmanaged')):
+                    self.fail("Link {} expects state {}, found {}".format(iface, expected, actual))
+                interfaces.remove(iface)
+
+        # Ensure that all requested interfaces have been covered.
+        if interfaces:
+            self.fail("Missing links in status output: {}".format(interfaces))
+
+
+class BridgeTest(NetworkdTestingUtilities, unittest.TestCase):
+    """Provide common methods for testing networkd against servers."""
+
+    def setUp(self):
+        self.write_network('port1.netdev', '''\
+[NetDev]
+Name=port1
+Kind=dummy
+MACAddress=12:34:56:78:9a:bc''')
+        self.write_network('port2.netdev', '''\
+[NetDev]
+Name=port2
+Kind=dummy
+MACAddress=12:34:56:78:9a:bd''')
+        self.write_network('mybridge.netdev', '''\
+[NetDev]
+Name=mybridge
+Kind=bridge''')
+        self.write_network('port1.network', '''\
+[Match]
+Name=port1
+[Network]
+Bridge=mybridge''')
+        self.write_network('port2.network', '''\
+[Match]
+Name=port2
+[Network]
+Bridge=mybridge''')
+        self.write_network('mybridge.network', '''\
+[Match]
+Name=mybridge
+[Network]
+DNS=192.168.250.1
+Address=192.168.250.33/24
+Gateway=192.168.250.1''')
+        subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])
+
+    def tearDown(self):
+        subprocess.check_call(['systemctl', 'stop', 'systemd-networkd'])
+        subprocess.check_call(['ip', 'link', 'del', 'mybridge'])
+        subprocess.check_call(['ip', 'link', 'del', 'port1'])
+        subprocess.check_call(['ip', 'link', 'del', 'port2'])
+
+    def test_bridge_init(self):
+        self.assert_link_states(
+            port1='managed',
+            port2='managed',
+            mybridge='managed')
+
+    def test_bridge_port_priority(self):
+        self.assertEqual(self.read_attr('port1', 'brport/priority'), '32')
+        self.write_network_dropin('port1.network', 'priority', '''\
+[Bridge]
+Priority=28
+''')
+        subprocess.check_call(['systemctl', 'restart', 'systemd-networkd'])
+        self.assertEqual(self.read_attr('port1', 'brport/priority'), '28')
+
+    def test_bridge_port_priority_set_zero(self):
+        """It should be possible to set the bridge port priority to 0"""
+        self.assertEqual(self.read_attr('port2', 'brport/priority'), '32')
+        self.write_network_dropin('port2.network', 'priority', '''\
+[Bridge]
+Priority=0
+''')
+        subprocess.check_call(['systemctl', 'restart', 'systemd-networkd'])
+        self.assertEqual(self.read_attr('port2', 'brport/priority'), '0')
+
+class ClientTestBase(NetworkdTestingUtilities):
+    """Provide common methods for testing networkd against servers."""
+
     @classmethod
     def setUpClass(klass):
         klass.orig_log_level = subprocess.check_output(
@@ -65,19 +237,7 @@ class ClientTestBase:
         self.if_router = 'router_eth42'
         self.workdir_obj = tempfile.TemporaryDirectory()
         self.workdir = self.workdir_obj.name
-        self.config = '/run/systemd/network/test_eth42.network'
-
-        # avoid "Failed to open /dev/tty" errors in containers
-        os.environ['SYSTEMD_LOG_TARGET'] = 'journal'
-
-        # determine path to systemd-networkd-wait-online
-        for p in ['/usr/lib/systemd/systemd-networkd-wait-online',
-                  '/lib/systemd/systemd-networkd-wait-online']:
-            if os.path.exists(p):
-                self.networkd_wait_online = p
-                break
-        else:
-            self.fail('systemd-networkd-wait-online not found')
+        self.config = 'test_eth42.network'
 
         # get current journal cursor
         subprocess.check_output(['journalctl', '--sync'])
@@ -93,16 +253,10 @@ class ClientTestBase:
         subprocess.call(['ip', 'link', 'del', 'dummy0'],
                         stderr=subprocess.DEVNULL)
 
-    def writeConfig(self, fname, contents):
-        os.makedirs(os.path.dirname(fname), exist_ok=True)
-        with open(fname, 'w') as f:
-            f.write(contents)
-        self.addCleanup(os.remove, fname)
-
     def show_journal(self, unit):
         '''Show journal of given unit since start of the test'''
 
-        print('---- %s ----' % unit)
+        print('---- {} ----'.format(unit))
         subprocess.check_output(['journalctl', '--sync'])
         sys.stdout.flush()
         subprocess.call(['journalctl', '-b', '--no-pager', '--quiet',
@@ -125,13 +279,17 @@ class ClientTestBase:
 
     def do_test(self, coldplug=True, ipv6=False, extra_opts='',
                 online_timeout=10, dhcp_mode='yes'):
-        subprocess.check_call(['systemctl', 'start', 'systemd-resolved'])
-        self.writeConfig(self.config, '''\
+        try:
+            subprocess.check_call(['systemctl', 'start', 'systemd-resolved'])
+        except subprocess.CalledProcessError:
+            self.show_journal('systemd-resolved.service')
+            raise
+        self.write_network(self.config, '''\
 [Match]
-Name=%s
+Name={}
 [Network]
-DHCP=%s
-%s''' % (self.iface, dhcp_mode, extra_opts))
+DHCP={}
+{}'''.format(self.iface, dhcp_mode, extra_opts))
 
         if coldplug:
             # create interface first, then start networkd
@@ -146,14 +304,14 @@ DHCP=%s
             subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])
 
         try:
-            subprocess.check_call([self.networkd_wait_online, '--interface',
+            subprocess.check_call([NETWORKD_WAIT_ONLINE, '--interface',
                                    self.iface, '--timeout=%i' % online_timeout])
 
             if ipv6:
                 # check iface state and IP 6 address; FIXME: we need to wait a bit
                 # longer, as the iface is "configured" already with IPv4 *or*
                 # IPv6, but we want to wait for both
-                for timeout in range(10):
+                for _ in range(10):
                     out = subprocess.check_output(['ip', 'a', 'show', 'dev', self.iface])
                     if b'state UP' in out and b'inet6 2600' in out and b'inet 192.168' in out:
                         break
@@ -166,38 +324,38 @@ DHCP=%s
             else:
                 # should have link-local address on IPv6 only
                 out = subprocess.check_output(['ip', '-6', 'a', 'show', 'dev', self.iface])
-                self.assertRegex(out, b'inet6 fe80::.* scope link')
+                self.assertRegex(out, br'inet6 fe80::.* scope link')
                 self.assertNotIn(b'scope global', out)
 
             # should have IPv4 address
             out = subprocess.check_output(['ip', '-4', 'a', 'show', 'dev', self.iface])
             self.assertIn(b'state UP', out)
-            self.assertRegex(out, b'inet 192.168.5.\d+/.* scope global dynamic')
+            self.assertRegex(out, br'inet 192.168.5.\d+/.* scope global dynamic')
 
             # check networkctl state
             out = subprocess.check_output(['networkctl'])
-            self.assertRegex(out, ('%s\s+ether\s+routable\s+unmanaged' % self.if_router).encode())
-            self.assertRegex(out, ('%s\s+ether\s+routable\s+configured' % self.iface).encode())
+            self.assertRegex(out, (r'{}\s+ether\s+[a-z-]+\s+unmanaged'.format(self.if_router)).encode())
+            self.assertRegex(out, (r'{}\s+ether\s+routable\s+configured'.format(self.iface)).encode())
 
             out = subprocess.check_output(['networkctl', 'status', self.iface])
-            self.assertRegex(out, b'Type:\s+ether')
-            self.assertRegex(out, b'State:\s+routable.*configured')
-            self.assertRegex(out, b'Address:\s+192.168.5.\d+')
+            self.assertRegex(out, br'Type:\s+ether')
+            self.assertRegex(out, br'State:\s+routable.*configured')
+            self.assertRegex(out, br'Address:\s+192.168.5.\d+')
             if ipv6:
-                self.assertRegex(out, b'2600::')
+                self.assertRegex(out, br'2600::')
             else:
-                self.assertNotIn(b'2600::', out)
-            self.assertRegex(out, b'fe80::')
-            self.assertRegex(out, b'Gateway:\s+192.168.5.1')
-            self.assertRegex(out, b'DNS:\s+192.168.5.1')
+                self.assertNotIn(br'2600::', out)
+            self.assertRegex(out, br'fe80::')
+            self.assertRegex(out, br'Gateway:\s+192.168.5.1')
+            self.assertRegex(out, br'DNS:\s+192.168.5.1')
         except (AssertionError, subprocess.CalledProcessError):
             # show networkd status, journal, and DHCP server log on failure
-            with open(self.config) as f:
-                print('\n---- %s ----\n%s' % (self.config, f.read()))
+            with open(os.path.join(NETWORK_UNITDIR, self.config)) as f:
+                print('\n---- {} ----\n{}'.format(self.config, f.read()))
             print('---- interface status ----')
             sys.stdout.flush()
             subprocess.call(['ip', 'a', 'show', 'dev', self.iface])
-            print('---- networkctl status %s ----' % self.iface)
+            print('---- networkctl status {} ----'.format(self.iface))
             sys.stdout.flush()
             subprocess.call(['networkctl', 'status', self.iface])
             self.show_journal('systemd-networkd.service')
@@ -247,12 +405,12 @@ DHCP=%s
         self.do_test(coldplug=False, ipv6=True)
 
     def test_route_only_dns(self):
-        self.writeConfig('/run/systemd/network/myvpn.netdev', '''\
+        self.write_network('myvpn.netdev', '''\
 [NetDev]
 Name=dummy0
 Kind=dummy
 MACAddress=12:34:56:78:9a:bc''')
-        self.writeConfig('/run/systemd/network/myvpn.network', '''\
+        self.write_network('myvpn.network', '''\
 [Match]
 Name=dummy0
 [Network]
@@ -273,20 +431,16 @@ Domains= ~company''')
             self.assertNotIn('nameserver 192.168.42.1\n', contents)
 
     def test_route_only_dns_all_domains(self):
-        with open('/run/systemd/network/myvpn.netdev', 'w') as f:
-            f.write('''[NetDev]
+        self.write_network('myvpn.netdev', '''[NetDev]
 Name=dummy0
 Kind=dummy
 MACAddress=12:34:56:78:9a:bc''')
-        with open('/run/systemd/network/myvpn.network', 'w') as f:
-            f.write('''[Match]
+        self.write_network('myvpn.network', '''[Match]
 Name=dummy0
 [Network]
 Address=192.168.42.100
 DNS=192.168.42.1
 Domains= ~company ~.''')
-        self.addCleanup(os.remove, '/run/systemd/network/myvpn.netdev')
-        self.addCleanup(os.remove, '/run/systemd/network/myvpn.network')
 
         self.do_test(coldplug=True, ipv6=False,
                      extra_opts='IPv6AcceptRouterAdvertisements=False')
@@ -303,7 +457,7 @@ Domains= ~company ~.''')
         self.assertIn('nameserver 192.168.42.1\n', contents)
 
 
-@unittest.skipUnless(have_dnsmasq, 'dnsmasq not installed')
+@unittest.skipUnless(HAVE_DNSMASQ, 'dnsmasq not installed')
 class DnsmasqClientTest(ClientTestBase, unittest.TestCase):
     '''Test networkd client against dnsmasq'''
 
@@ -358,7 +512,7 @@ class DnsmasqClientTest(ClientTestBase, unittest.TestCase):
         '''Print DHCP server log for debugging failures'''
 
         with open(self.dnsmasq_log) as f:
-            sys.stdout.write('\n\n---- dnsmasq log ----\n%s\n------\n\n' % f.read())
+            sys.stdout.write('\n\n---- dnsmasq log ----\n{}\n------\n\n'.format(f.read()))
 
     def test_resolved_domain_restricted_dns(self):
         '''resolved: domain-restricted DNS servers'''
@@ -366,18 +520,16 @@ class DnsmasqClientTest(ClientTestBase, unittest.TestCase):
         # create interface for generic connections; this will map all DNS names
         # to 192.168.42.1
         self.create_iface(dnsmasq_opts=['--address=/#/192.168.42.1'])
-        self.writeConfig('/run/systemd/network/general.network', '''\
+        self.write_network('general.network', '''\
 [Match]
-Name=%s
+Name={}
 [Network]
 DHCP=ipv4
-IPv6AcceptRA=False''' % self.iface)
+IPv6AcceptRA=False'''.format(self.iface))
 
         # create second device/dnsmasq for a .company/.lab VPN interface
         # static IPs for simplicity
-        subprocess.check_call(['ip', 'link', 'add', 'name', 'testvpnclient', 'type',
-                               'veth', 'peer', 'name', 'testvpnrouter'])
-        self.addCleanup(subprocess.call, ['ip', 'link', 'del', 'dev', 'testvpnrouter'])
+        self.add_veth_pair('testvpnclient', 'testvpnrouter')
         subprocess.check_call(['ip', 'a', 'flush', 'dev', 'testvpnrouter'])
         subprocess.check_call(['ip', 'a', 'add', '10.241.3.1/24', 'dev', 'testvpnrouter'])
         subprocess.check_call(['ip', 'link', 'set', 'testvpnrouter', 'up'])
@@ -392,7 +544,7 @@ IPv6AcceptRA=False''' % self.iface)
         self.addCleanup(vpn_dnsmasq.wait)
         self.addCleanup(vpn_dnsmasq.kill)
 
-        self.writeConfig('/run/systemd/network/vpn.network', '''\
+        self.write_network('vpn.network', '''\
 [Match]
 Name=testvpnclient
 [Network]
@@ -402,7 +554,7 @@ DNS=10.241.3.1
 Domains= ~company ~lab''')
 
         subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])
-        subprocess.check_call([self.networkd_wait_online, '--interface', self.iface,
+        subprocess.check_call([NETWORKD_WAIT_ONLINE, '--interface', self.iface,
                                '--interface=testvpnclient', '--timeout=20'])
 
         # ensure we start fresh with every test
@@ -427,13 +579,68 @@ Domains= ~company ~lab''')
         # VPN domains should only be sent to VPN DNS
         self.assertRegex(vpn_log, 'query.*math.lab')
         self.assertRegex(vpn_log, 'query.*cantina.company')
-        self.assertNotIn('lab', general_log)
-        self.assertNotIn('company', general_log)
+        self.assertNotIn('.lab', general_log)
+        self.assertNotIn('.company', general_log)
 
         # general domains should not be sent to the VPN DNS
         self.assertRegex(general_log, 'query.*megasearch.net')
         self.assertNotIn('megasearch.net', vpn_log)
 
+    def test_resolved_etc_hosts(self):
+        '''resolved queries to /etc/hosts'''
+
+        # FIXME: -t MX query fails with enabled DNSSEC (even when using
+        # the known negative trust anchor .internal instead of .example)
+        conf = '/run/systemd/resolved.conf.d/test-disable-dnssec.conf'
+        os.makedirs(os.path.dirname(conf), exist_ok=True)
+        with open(conf, 'w') as f:
+            f.write('[Resolve]\nDNSSEC=no')
+        self.addCleanup(os.remove, conf)
+
+        # create /etc/hosts bind mount which resolves my.example for IPv4
+        hosts = os.path.join(self.workdir, 'hosts')
+        with open(hosts, 'w') as f:
+            f.write('172.16.99.99  my.example\n')
+        subprocess.check_call(['mount', '--bind', hosts, '/etc/hosts'])
+        self.addCleanup(subprocess.call, ['umount', '/etc/hosts'])
+        subprocess.check_call(['systemctl', 'stop', 'systemd-resolved.service'])
+
+        # note: different IPv4 address here, so that it's easy to tell apart
+        # what resolved the query
+        self.create_iface(dnsmasq_opts=['--host-record=my.example,172.16.99.1,2600::99:99',
+                                        '--host-record=other.example,172.16.0.42,2600::42',
+                                        '--mx-host=example,mail.example'],
+                          ipv6=True)
+        self.do_test(coldplug=None, ipv6=True)
+
+        try:
+            # family specific queries
+            out = subprocess.check_output(['systemd-resolve', '-4', 'my.example'])
+            self.assertIn(b'my.example: 172.16.99.99', out)
+            # we don't expect an IPv6 answer; if /etc/hosts has any IP address,
+            # it's considered a sufficient source
+            self.assertNotEqual(subprocess.call(['systemd-resolve', '-6', 'my.example']), 0)
+            # "any family" query; IPv4 should come from /etc/hosts
+            out = subprocess.check_output(['systemd-resolve', 'my.example'])
+            self.assertIn(b'my.example: 172.16.99.99', out)
+            # IP → name lookup; again, takes the /etc/hosts one
+            out = subprocess.check_output(['systemd-resolve', '172.16.99.99'])
+            self.assertIn(b'172.16.99.99: my.example', out)
+
+            # non-address RRs should fall back to DNS
+            out = subprocess.check_output(['systemd-resolve', '--type=MX', 'example'])
+            self.assertIn(b'example IN MX 1 mail.example', out)
+
+            # other domains query DNS
+            out = subprocess.check_output(['systemd-resolve', 'other.example'])
+            self.assertIn(b'172.16.0.42', out)
+            out = subprocess.check_output(['systemd-resolve', '172.16.0.42'])
+            self.assertIn(b'172.16.0.42: other.example', out)
+        except (AssertionError, subprocess.CalledProcessError):
+            self.show_journal('systemd-resolved.service')
+            self.print_server_log()
+            raise
+
     def test_transient_hostname(self):
         '''networkd sets transient hostname from DHCP'''
 
@@ -445,7 +652,7 @@ Domains= ~company ~lab''')
             self.addCleanup(subprocess.call, ['umount', '/etc/hostname'])
         subprocess.check_call(['systemctl', 'stop', 'systemd-hostnamed.service'])
 
-        self.create_iface(dnsmasq_opts=['--dhcp-host=%s,192.168.5.210,testgreen' % self.iface_mac])
+        self.create_iface(dnsmasq_opts=['--dhcp-host={},192.168.5.210,testgreen'.format(self.iface_mac)])
         self.do_test(coldplug=None, extra_opts='IPv6AcceptRA=False', dhcp_mode='ipv4')
 
         try:
@@ -462,7 +669,7 @@ Domains= ~company ~lab''')
                 sys.stdout.write('[retry %i] ' % retry)
                 sys.stdout.flush()
             else:
-                self.fail('Transient hostname not found in hostnamectl:\n%s' % out.decode())
+                self.fail('Transient hostname not found in hostnamectl:\n{}'.format(out.decode()))
             # and also applied to the system
             self.assertEqual(socket.gethostname(), 'testgreen')
         except AssertionError:
@@ -480,7 +687,7 @@ Domains= ~company ~lab''')
             self.writeConfig('/etc/hostname', orig_hostname)
         subprocess.check_call(['systemctl', 'stop', 'systemd-hostnamed.service'])
 
-        self.create_iface(dnsmasq_opts=['--dhcp-host=%s,192.168.5.210,testgreen' % self.iface_mac])
+        self.create_iface(dnsmasq_opts=['--dhcp-host={},192.168.5.210,testgreen'.format(self.iface_mac)])
         self.do_test(coldplug=None, extra_opts='IPv6AcceptRA=False', dhcp_mode='ipv4')
 
         try:
@@ -512,7 +719,8 @@ class NetworkdClientTest(ClientTestBase, unittest.TestCase):
         self.addCleanup(os.remove, script)
         with os.fdopen(fd, 'w+') as f:
             f.write('''\
-#!/bin/sh -eu
+#!/bin/sh
+set -eu
 mkdir -p /run/systemd/network
 mkdir -p /run/systemd/netif
 mount -t tmpfs none /run/systemd/network
@@ -545,7 +753,7 @@ DNS=192.168.5.1
 EOF
 
 # run networkd as in systemd-networkd.service
-exec $(systemctl cat systemd-networkd.service | sed -n '/^ExecStart=/ { s/^.*=//; p}')
+exec $(systemctl cat systemd-networkd.service | sed -n '/^ExecStart=/ { s/^.*=//; s/^[@+-]//; s/^!*//; p}')
 ''' % {'ifr': self.if_router, 'ifc': self.iface, 'addr6': ipv6 and 'Address=2600::1/64' or '',
        'dhopts': dhcpserver_opts or ''})
 
@@ -558,7 +766,7 @@ exec $(systemctl cat systemd-networkd.service | sed -n '/^ExecStart=/ { s/^.*=//
                                '--service-type=notify', script])
 
         # wait until devices got created
-        for timeout in range(50):
+        for _ in range(50):
             out = subprocess.check_output(['ip', 'a', 'show', 'dev', self.if_router])
             if b'state UP' in out and b'scope global' in out:
                 break
@@ -592,12 +800,12 @@ exec $(systemctl cat systemd-networkd.service | sed -n '/^ExecStart=/ { s/^.*=//
         # we don't use this interface for this test
         self.if_router = None
 
-        self.writeConfig('/run/systemd/network/test.netdev', '''\
+        self.write_network('test.netdev', '''\
 [NetDev]
 Name=dummy0
 Kind=dummy
 MACAddress=12:34:56:78:9a:bc''')
-        self.writeConfig('/run/systemd/network/test.network', '''\
+        self.write_network('test.network', '''\
 [Match]
 Name=dummy0
 [Network]
@@ -624,12 +832,12 @@ Domains= one two three four five six seven eight nine ten''')
 
         name_prefix = 'a' * 60
 
-        self.writeConfig('/run/systemd/network/test.netdev', '''\
+        self.write_network('test.netdev', '''\
 [NetDev]
 Name=dummy0
 Kind=dummy
 MACAddress=12:34:56:78:9a:bc''')
-        self.writeConfig('/run/systemd/network/test.network', '''\
+        self.write_network('test.network', '''\
 [Match]
 Name=dummy0
 [Network]
@@ -652,18 +860,18 @@ Domains={p}0 {p}1 {p}2 {p}3 {p}4'''.format(p=name_prefix))
         # we don't use this interface for this test
         self.if_router = None
 
-        self.writeConfig('/run/systemd/network/test.netdev', '''\
+        self.write_network('test.netdev', '''\
 [NetDev]
 Name=dummy0
 Kind=dummy
 MACAddress=12:34:56:78:9a:bc''')
-        self.writeConfig('/run/systemd/network/test.network', '''\
+        self.write_network('test.network', '''\
 [Match]
 Name=dummy0
 [Network]
 Address=192.168.42.100
 DNS=192.168.42.1''')
-        self.writeConfig('/run/systemd/network/test.network.d/dns.conf', '''\
+        self.write_network_dropin('test.network', 'dns', '''\
 [Network]
 DNS=127.0.0.1''')
 
@@ -704,6 +912,115 @@ DNS=127.0.0.1''')
             raise
 
 
+class MatchClientTest(unittest.TestCase, NetworkdTestingUtilities):
+    """Test [Match] sections in .network files.
+
+    Be aware that matching the test host's interfaces will wipe their
+    configuration, so as a precaution, all network files should have a
+    restrictive [Match] section to only ever interfere with the
+    temporary veth interfaces created here.
+    """
+
+    def tearDown(self):
+        """Stop networkd."""
+        subprocess.call(['systemctl', 'stop', 'systemd-networkd'])
+
+    def test_basic_matching(self):
+        """Verify the Name= line works throughout this class."""
+        self.add_veth_pair('test_if1', 'fake_if2')
+        self.write_network('test.network', "[Match]\nName=test_*\n[Network]")
+        subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])
+        self.assert_link_states(test_if1='managed', fake_if2='unmanaged')
+
+    def test_inverted_matching(self):
+        """Verify that a '!'-prefixed value inverts the match."""
+        # Use a MAC address as the interfaces' common matching attribute
+        # to avoid depending on udev, to support testing in containers.
+        mac = '00:01:02:03:98:99'
+        self.add_veth_pair('test_veth', 'test_peer',
+                           ['addr', mac], ['addr', mac])
+        self.write_network('no-veth.network', """\
+[Match]
+MACAddress={}
+Name=!nonexistent *peer*
+[Network]""".format(mac))
+        subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])
+        self.assert_link_states(test_veth='managed', test_peer='unmanaged')
+
+
+class UnmanagedClientTest(unittest.TestCase, NetworkdTestingUtilities):
+    """Test if networkd manages the correct interfaces."""
+
+    def setUp(self):
+        """Write .network files to match the named veth devices."""
+        # Define the veth+peer pairs to be created.
+        # Their pairing doesn't actually matter, only their names do.
+        self.veths = {
+            'm1def': 'm0unm',
+            'm1man': 'm1unm',
+        }
+
+        # Define the contents of .network files to be read in order.
+        self.configs = (
+            "[Match]\nName=m1def\n",
+            "[Match]\nName=m1unm\n[Link]\nUnmanaged=yes\n",
+            "[Match]\nName=m1*\n[Link]\nUnmanaged=no\n",
+        )
+
+        # Write out the .network files to be cleaned up automatically.
+        for i, config in enumerate(self.configs):
+            self.write_network("%02d-test.network" % i, config)
+
+    def tearDown(self):
+        """Stop networkd."""
+        subprocess.call(['systemctl', 'stop', 'systemd-networkd'])
+
+    def create_iface(self):
+        """Create temporary veth pairs for interface matching."""
+        for veth, peer in self.veths.items():
+            self.add_veth_pair(veth, peer)
+
+    def test_unmanaged_setting(self):
+        """Verify link states with Unmanaged= settings, hot-plug."""
+        subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])
+        self.create_iface()
+        self.assert_link_states(m1def='managed',
+                                m1man='managed',
+                                m1unm='unmanaged',
+                                m0unm='unmanaged')
+
+    def test_unmanaged_setting_coldplug(self):
+        """Verify link states with Unmanaged= settings, cold-plug."""
+        self.create_iface()
+        subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])
+        self.assert_link_states(m1def='managed',
+                                m1man='managed',
+                                m1unm='unmanaged',
+                                m0unm='unmanaged')
+
+    def test_catchall_config(self):
+        """Verify link states with a catch-all config, hot-plug."""
+        # Don't actually catch ALL interfaces.  It messes up the host.
+        self.write_network('all.network', "[Match]\nName=m[01]???\n")
+        subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])
+        self.create_iface()
+        self.assert_link_states(m1def='managed',
+                                m1man='managed',
+                                m1unm='unmanaged',
+                                m0unm='managed')
+
+    def test_catchall_config_coldplug(self):
+        """Verify link states with a catch-all config, cold-plug."""
+        # Don't actually catch ALL interfaces.  It messes up the host.
+        self.write_network('all.network', "[Match]\nName=m[01]???\n")
+        self.create_iface()
+        subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])
+        self.assert_link_states(m1def='managed',
+                                m1man='managed',
+                                m1unm='unmanaged',
+                                m0unm='managed')
+
+
 if __name__ == '__main__':
     unittest.main(testRunner=unittest.TextTestRunner(stream=sys.stdout,
                                                      verbosity=2))