]> git.ipfire.org Git - thirdparty/bind9.git/commitdiff
Replace dns.resolver module in system tests
authorMichal Nowak <mnowak@isc.org>
Thu, 11 Jul 2024 11:01:29 +0000 (13:01 +0200)
committerMichal Nowak <mnowak@isc.org>
Thu, 12 Sep 2024 09:42:22 +0000 (11:42 +0200)
bin/tests/system/dnstap/tests_dnstap.py
bin/tests/system/rpzextra/tests_rpzextra.py
bin/tests/system/shutdown/tests_shutdown.py

index 0dbf2aa3b804a7976845f5e4babe5359ceb69787..555435d8101975190b96fb4fdb6d48ddb443210f 100644 (file)
@@ -15,10 +15,12 @@ import os
 import re
 import subprocess
 
+import isctest
 import pytest
 
+import dns.message
+
 pytest.importorskip("dns", minversion="2.0.0")
-import dns.resolver
 
 
 def run_rndc(server, rndc_command):
@@ -34,15 +36,13 @@ def run_rndc(server, rndc_command):
     subprocess.check_output(cmdline, stderr=subprocess.STDOUT, timeout=10)
 
 
-def test_dnstap_dispatch_socket_addresses(named_port):
-    # Prepare for querying ns3.
-    resolver = dns.resolver.Resolver()
-    resolver.nameservers = ["10.53.0.3"]
-    resolver.port = named_port
-
+def test_dnstap_dispatch_socket_addresses():
     # Send some query to ns3 so that it records something in its dnstap file.
-    ans = resolver.resolve("mail.example.", "A")
-    assert ans[0].address == "10.0.0.2"
+    msg = dns.message.make_query("mail.example.", "A")
+    res = isctest.query.tcp(msg, "10.53.0.2")
+    assert res.answer == [
+        dns.rrset.from_text("mail.example.", 300, "IN", "A", "10.0.0.2")
+    ]
 
     # Before continuing, roll dnstap file to ensure it is flushed to disk.
     run_rndc("10.53.0.3", ["dnstap", "-roll", "1"])
index ab5da45973b1eff33321e6f00b1f3b2913c8ebc9..25cd1f44f8c915d4384383cd8d658a41118faa64 100644 (file)
 # See the COPYRIGHT file distributed with this work for additional
 # information regarding copyright ownership.
 
-import time
 import os
-
 import pytest
 
 pytest.importorskip("dns", minversion="2.0.0")
-import dns.resolver
+import isctest
 
+import dns.message
 
-def wait_for_transfer(ip, port, client_ip, name, rrtype):
-    resolver = dns.resolver.Resolver()
-    resolver.nameservers = [ip]
-    resolver.port = port
 
+def wait_for_transfer(ip, port, client_ip, name, rrtype):
+    msg = dns.message.make_query(name, rrtype)
     for _ in range(10):
         try:
-            resolver.resolve(name, rrtype, source=client_ip)
-        except dns.resolver.NoNameservers:
-            time.sleep(1)
-        else:
-            break
+            res = isctest.query.udp(msg, ip, source=client_ip)
+            if res.rcode() == dns.rcode.NOERROR:
+                break
+        except dns.exception.Timeout:
+            pass
     else:
         raise RuntimeError(
             "zone transfer failed: "
-            f"client {client_ip} got NXDOMAIN for {name} {rrtype} from @{ip}:{port}"
+            f"client: {client_ip}, name: {name}, rrtype: {rrtype} from @{ip}:{port}"
         )
 
 
-def test_rpz_multiple_views(named_port):
-    resolver = dns.resolver.Resolver()
-    resolver.nameservers = ["10.53.0.3"]
-    resolver.port = named_port
-
+@pytest.mark.parametrize(
+    "qname,source,rcode",
+    [
+        # For 10.53.0.1 source IP:
+        # - baddomain.com isn't allowed (CNAME .), should return NXDOMAIN
+        # - gooddomain.com is allowed
+        # - allowed. is allowed
+        ("baddomain.", "10.53.0.1", dns.rcode.NXDOMAIN),
+        ("gooddomain.", "10.53.0.1", dns.rcode.NOERROR),
+        ("allowed.", "10.53.0.1", dns.rcode.NOERROR),
+        # For 10.53.0.2 source IP:
+        # - allowed.com isn't allowed (CNAME .), should return NXDOMAIN
+        # - baddomain.com is allowed
+        # - gooddomain.com is allowed
+        ("baddomain.", "10.53.0.2", dns.rcode.NOERROR),
+        ("gooddomain.", "10.53.0.2", dns.rcode.NOERROR),
+        ("allowed.", "10.53.0.2", dns.rcode.NXDOMAIN),
+        # For 10.53.0.3 source IP:
+        # - gooddomain.com is allowed
+        # - baddomain.com is allowed
+        # - allowed. is allowed
+        ("baddomain.", "10.53.0.3", dns.rcode.NOERROR),
+        ("gooddomain.", "10.53.0.3", dns.rcode.NOERROR),
+        ("allowed.", "10.53.0.3", dns.rcode.NOERROR),
+        # For 10.53.0.4 source IP:
+        # - gooddomain.com isn't allowed (CNAME .), should return NXDOMAIN
+        # - baddomain.com isn't allowed (CNAME .), should return NXDOMAIN
+        # - allowed. is allowed
+        ("baddomain.", "10.53.0.4", dns.rcode.NXDOMAIN),
+        ("gooddomain.", "10.53.0.4", dns.rcode.NXDOMAIN),
+        ("allowed.", "10.53.0.4", dns.rcode.NOERROR),
+        # For 10.53.0.5 (any) source IP:
+        # - baddomain.com is allowed
+        # - gooddomain.com isn't allowed (CNAME .), should return NXDOMAIN
+        # - allowed.com isn't allowed (CNAME .), should return NXDOMAIN
+        ("baddomain.", "10.53.0.5", dns.rcode.NOERROR),
+        ("gooddomain.", "10.53.0.5", dns.rcode.NXDOMAIN),
+        ("allowed.", "10.53.0.5", dns.rcode.NXDOMAIN),
+    ],
+)
+def test_rpz_multiple_views(qname, source, rcode, named_port):
     wait_for_transfer("10.53.0.3", named_port, "10.53.0.2", "rpz-external.local", "SOA")
     wait_for_transfer("10.53.0.3", named_port, "10.53.0.5", "rpz-external.local", "SOA")
 
-    # For 10.53.0.1 source IP:
-    # - baddomain.com isn't allowed (CNAME .), should return NXDOMAIN
-    # - gooddomain.com is allowed
-    # - allowed. is allowed
-    with pytest.raises(dns.resolver.NXDOMAIN):
-        resolver.resolve("baddomain.", "A", source="10.53.0.1")
-
-    ans = resolver.resolve("gooddomain.", "A", source="10.53.0.1")
-    assert ans[0].address == "10.53.0.2"
-
-    ans = resolver.resolve("allowed.", "A", source="10.53.0.1")
-    assert ans[0].address == "10.53.0.2"
-
-    # For 10.53.0.2 source IP:
-    # - allowed.com isn't allowed (CNAME .), should return NXDOMAIN
-    # - baddomain.com is allowed
-    # - gooddomain.com is allowed
-    ans = resolver.resolve("baddomain.", "A", source="10.53.0.2")
-    assert ans[0].address == "10.53.0.2"
-
-    ans = resolver.resolve("gooddomain.", "A", source="10.53.0.2")
-    assert ans[0].address == "10.53.0.2"
-
-    with pytest.raises(dns.resolver.NXDOMAIN):
-        resolver.resolve("allowed.", "A", source="10.53.0.2")
-
-    # For 10.53.0.3 source IP:
-    # - gooddomain.com is allowed
-    # - baddomain.com is allowed
-    # - allowed. is allowed
-    ans = resolver.resolve("baddomain.", "A", source="10.53.0.3")
-    assert ans[0].address == "10.53.0.2"
-
-    ans = resolver.resolve("gooddomain.", "A", source="10.53.0.3")
-    assert ans[0].address == "10.53.0.2"
-
-    ans = resolver.resolve("allowed.", "A", source="10.53.0.3")
-    assert ans[0].address == "10.53.0.2"
-
-    # For 10.53.0.4 source IP:
-    # - gooddomain.com isn't allowed (CNAME .), should return NXDOMAIN
-    # - baddomain.com isn't allowed (CNAME .), should return NXDOMAIN
-    # - allowed. is allowed
-    with pytest.raises(dns.resolver.NXDOMAIN):
-        resolver.resolve("baddomain.", "A", source="10.53.0.4")
-
-    with pytest.raises(dns.resolver.NXDOMAIN):
-        resolver.resolve("gooddomain.", "A", source="10.53.0.4")
-
-    ans = resolver.resolve("allowed.", "A", source="10.53.0.4")
-    assert ans[0].address == "10.53.0.2"
-
-    # For 10.53.0.5 (any) source IP:
-    # - baddomain.com is allowed
-    # - gooddomain.com isn't allowed (CNAME .), should return NXDOMAIN
-    # - allowed.com isn't allowed (CNAME .), should return NXDOMAIN
-    ans = resolver.resolve("baddomain.", "A", source="10.53.0.5")
-    assert ans[0].address == "10.53.0.2"
-
-    with pytest.raises(dns.resolver.NXDOMAIN):
-        resolver.resolve("gooddomain.", "A", source="10.53.0.5")
-
-    with pytest.raises(dns.resolver.NXDOMAIN):
-        resolver.resolve("allowed.", "A", source="10.53.0.5")
+    msg = dns.message.make_query(qname, "A")
+    res = isctest.query.udp(msg, "10.53.0.3", source=source)
+    assert res.rcode() == rcode
+    if rcode == dns.rcode.NOERROR:
+        assert res.answer == [dns.rrset.from_text(qname, 300, "IN", "A", "10.53.0.2")]
 
 
-def test_rpz_passthru_logging(named_port):
-    resolver = dns.resolver.Resolver()
-    resolver.nameservers = ["10.53.0.3"]
-    resolver.port = named_port
+def test_rpz_passthru_logging():
+    resolver_ip = "10.53.0.3"
 
     # Should generate a log entry into rpz_passthru.txt
-    ans = resolver.resolve("allowed.", "A", source="10.53.0.1")
-    assert ans[0].address == "10.53.0.2"
+    msg_allowed = dns.message.make_query("allowed.", "A")
+    res_allowed = isctest.query.udp(msg_allowed, resolver_ip, source="10.53.0.1")
+    assert res_allowed.answer == [
+        dns.rrset.from_text("allowed.", 300, "IN", "A", "10.53.0.2")
+    ]
 
     # baddomain.com isn't allowed (CNAME .), should return NXDOMAIN
     # Should generate a log entry into rpz.txt
-    with pytest.raises(dns.resolver.NXDOMAIN):
-        resolver.resolve("baddomain.", "A", source="10.53.0.1")
+    msg_not_allowed = dns.message.make_query("baddomain.", "A")
+    res_not_allowed = isctest.query.udp(
+        msg_not_allowed, resolver_ip, source="10.53.0.1"
+    )
+    isctest.check.nxdomain(res_not_allowed)
 
     rpz_passthru_logfile = os.path.join("ns3", "rpz_passthru.txt")
     rpz_logfile = os.path.join("ns3", "rpz.txt")
index a993cc9ffa8c65b52d33399a9f8bb48364a377a0..3b64389a627a66ffc5902e515a5f8dcd69d7d36b 100755 (executable)
@@ -23,12 +23,11 @@ import pytest
 
 pytest.importorskip("dns", minversion="2.0.0")
 import dns.exception
-import dns.resolver
 
 import isctest
 
 
-def do_work(named_proc, resolver, instance, kill_method, n_workers, n_queries):
+def do_work(named_proc, resolver_ip, instance, kill_method, n_workers, n_queries):
     """Creates a number of A queries to run in parallel
     in order simulate a slightly more realistic test scenario.
 
@@ -47,8 +46,8 @@ def do_work(named_proc, resolver, instance, kill_method, n_workers, n_queries):
     :param named_proc: named process instance
     :type named_proc: subprocess.Popen
 
-    :param resolver: target resolver
-    :type resolver: dns.resolver.Resolver
+    :param resolver_ip: target resolver's IP address
+    :type resolver_ip: str
 
     :param instance: the named instance to send RNDC commands to
     :type instance: isctest.instance.NamedInstance
@@ -74,7 +73,7 @@ def do_work(named_proc, resolver, instance, kill_method, n_workers, n_queries):
             return -1
 
     # We're going to execute queries in parallel by means of a thread pool.
-    # dnspython functions block, so we need to circunvent that.
+    # dnspython functions block, so we need to circumvent that.
     with ThreadPoolExecutor(n_workers + 1) as executor:
         # Helper dict, where keys=Future objects and values are tags used
         # to process results later.
@@ -83,7 +82,7 @@ def do_work(named_proc, resolver, instance, kill_method, n_workers, n_queries):
         # 50% of work will be A queries.
         # 1 work will be rndc stop.
         # Remaining work will be rndc status (so we test parallel control
-        #  connections that were crashing named).
+        # connections that were crashing named).
         shutdown = True
         for i in range(n_queries):
             if i < (n_queries // 2):
@@ -101,7 +100,8 @@ def do_work(named_proc, resolver, instance, kill_method, n_workers, n_queries):
                     )
 
                 qname = relname + ".test"
-                futures[executor.submit(resolver.resolve, qname, "A")] = tag
+                msg = dns.message.make_query(qname, "A")
+                futures[executor.submit(isctest.query.udp, msg, resolver_ip)] = tag
             elif shutdown:  # We attempt to stop named in the middle
                 shutdown = False
                 if kill_method == "rndc":
@@ -125,24 +125,21 @@ def do_work(named_proc, resolver, instance, kill_method, n_workers, n_queries):
                 # named process exited gracefully after SIGTERM signal.
                 if futures[future] == "stop":
                     ret_code = result
-
-            except (
-                dns.resolver.NXDOMAIN,
-                dns.resolver.NoNameservers,
-                dns.exception.Timeout,
-            ):
+            except dns.exception.Timeout:
                 pass
 
         if kill_method == "rndc":
             assert ret_code == 0
 
 
-def wait_for_named_loaded(resolver, retries=10):
+def wait_for_named_loaded(resolver_ip, retries=10):
+    msg = dns.message.make_query("version.bind", "TXT", "CH")
     for _ in range(retries):
         try:
-            resolver.resolve("version.bind", "TXT", "CH")
-            return True
-        except (dns.resolver.NoNameservers, dns.exception.Timeout):
+            res = isctest.query.udp(msg, resolver_ip)
+            if res.rcode() == dns.rcode.NOERROR:
+                return True
+        except dns.exception.Timeout:
             time.sleep(1)
     return False
 
@@ -189,11 +186,7 @@ def test_named_shutdown(kill_method):
     named_ports = isctest.instance.NamedPorts.from_env()
     instance = isctest.instance.NamedInstance("ns3", named_ports)
 
-    # We create a resolver instance that will be used to send queries.
-    resolver = dns.resolver.Resolver()
-    resolver.nameservers = ["10.53.0.3"]
-    resolver.port = named_ports.dns
-
+    resolver_ip = "10.53.0.3"
     named_cmdline = [named, "-c", cfg_file, "-d", "99", "-g"]
     with open(os.path.join(cfg_dir, "named.run"), "ab") as named_log:
         with subprocess.Popen(
@@ -201,10 +194,10 @@ def test_named_shutdown(kill_method):
         ) as named_proc:
             try:
                 assert named_proc.poll() is None, "named isn't running"
-                assert wait_for_named_loaded(resolver)
+                assert wait_for_named_loaded(resolver_ip)
                 do_work(
                     named_proc,
-                    resolver,
+                    resolver_ip,
                     instance,
                     kill_method,
                     n_workers=12,