]> git.ipfire.org Git - thirdparty/systemd.git/commitdiff
test-network: add test for DHCPv4 DNR
authorRonan Pigott <ronan@rjp.ie>
Tue, 16 Jan 2024 23:07:54 +0000 (16:07 -0700)
committerRonan Pigott <ronan@rjp.ie>
Mon, 21 Oct 2024 16:10:19 +0000 (09:10 -0700)
This will test that networkd/resolved can understand the V4_DNR DHCP
option.

test/test-network/systemd-networkd-tests.py

index 687fda7b3d8450ff8475f169c8550f3b792e94b8..66e14e6f1d9ee46a27d84d690c98811a47ef000f 100755 (executable)
 
 import argparse
 import datetime
+import enum
 import errno
 import itertools
+import ipaddress
 import json
 import os
 import pathlib
@@ -27,6 +29,7 @@ import re
 import shutil
 import signal
 import socket
+import struct
 import subprocess
 import sys
 import time
@@ -742,6 +745,30 @@ def stop_by_pid_file(pid_file):
                 print(f"Unexpected exception when waiting for {pid} to die: {e.errno}")
     rm_f(pid_file)
 
+def dnr_v4_instance_data(adn, addrs=None, prio=1, alpns=("dot",), dohpath=None):
+    b = bytes()
+    pack = lambda c, w=1: struct.pack('>' + "_BH_I"[w], len(c)) + c
+    pyton = lambda n, w=2: struct.pack('>' + "_BH_I"[w], n)
+    ipv4 = ipaddress.IPv4Address
+    class SvcParam(enum.Enum):
+        ALPN = 1
+        DOHPATH = 7
+
+    data = pyton(prio)
+
+    adn = adn.rstrip('.') + '.'
+    data += pack(b.join(pack(label.encode('ascii')) for label in adn.split('.')))
+
+    if not addrs: # adn-only mode
+        return pack(data, 2)
+
+    data += pack(b.join(ipv4(addr).packed for addr in addrs))
+    data += pyton(SvcParam.ALPN.value) + pack(b.join(pack(alpn.encode('ascii')) for alpn in alpns), 2)
+    if dohpath is not None:
+        data += pyton(SvcParam.DOHPATH.value) + pack(dohpath.encode('utf-8'), 2)
+
+    return pack(data, 2)
+
 def start_dnsmasq(*additional_options, interface='veth-peer', ra_mode=None, ipv4_range='192.168.5.10,192.168.5.200', ipv4_router='192.168.5.1', ipv6_range='2600::10,2600::20'):
     if ra_mode:
         ra_mode = f',{ra_mode}'
@@ -7253,6 +7280,48 @@ class NetworkdDHCPClientTests(unittest.TestCase, Utilities):
         check(self, False, False, True)
         check(self, False, False, False)
 
+    def test_dhcp_client_use_dnr(self):
+        def check(self, ipv4, ipv6):
+            os.makedirs(os.path.join(network_unit_dir, '25-dhcp-client.network.d'), exist_ok=True)
+            with open(os.path.join(network_unit_dir, '25-dhcp-client.network.d/override.conf'), mode='w', encoding='utf-8') as f:
+                f.write('[DHCPv4]\nUseDNS=')
+                f.write('yes' if ipv4 else 'no')
+                f.write('\n[DHCPv6]\nUseDNS=')
+                f.write('yes' if ipv6 else 'no')
+                f.write('\n[IPv6AcceptRA]\nUseDNS=no')
+
+            networkctl_reload()
+            self.wait_online('veth99:routable')
+
+            # link becomes 'routable' when at least one protocol provide an valid address. Hence, we need to explicitly wait for both addresses.
+            self.wait_address('veth99', r'inet 192.168.5.[0-9]*/24 metric 1024 brd 192.168.5.255 scope global dynamic', ipv='-4')
+            self.wait_address('veth99', r'inet6 2600::[0-9a-f]*/128 scope global (dynamic noprefixroute|noprefixroute dynamic)', ipv='-6')
+
+            # make resolved re-read the link state file
+            resolvectl('revert', 'veth99')
+
+            output = resolvectl('dns', 'veth99')
+            print(output)
+            if ipv4:
+                self.assertIn('8.8.8.8#dns.google', output)
+                self.assertIn('0.7.4.2#homer.simpson', output)
+            else:
+                self.assertNotIn('8.8.8.8#dns.google', output)
+
+            check_json(networkctl_json())
+
+        copy_network_unit('25-veth.netdev', '25-dhcp-server-veth-peer.network', '25-dhcp-client.network', copy_dropins=False)
+
+        start_networkd()
+        self.wait_online('veth-peer:carrier')
+        dnr_v4 = dnr_v4_instance_data(adn = "dns.google", addrs = ["8.8.8.8", "8.8.4.4"])
+        dnr_v4 += dnr_v4_instance_data(adn = "homer.simpson", addrs = ["0.7.4.2"], alpns = ("dot","h2","h3"), dohpath = "/springfield{?dns}")
+        masq = lambda bs: ':'.join(f"{b:02x}" for b in bs)
+        start_dnsmasq(f'--dhcp-option=162,{masq(dnr_v4)}')
+
+        check(self, True, False)
+        check(self, False, False)
+
     def test_dhcp_client_use_captive_portal(self):
         def check(self, ipv4, ipv6):
             os.makedirs(os.path.join(network_unit_dir, '25-dhcp-client.network.d'), exist_ok=True)