]> git.ipfire.org Git - thirdparty/bind9.git/commitdiff
Replace dns.query module with isctest.query
authorMichal Nowak <mnowak@isc.org>
Fri, 27 Sep 2024 11:35:56 +0000 (13:35 +0200)
committerMichal Nowak <mnowak@isc.org>
Tue, 1 Oct 2024 11:25:56 +0000 (13:25 +0200)
bin/tests/system/checkds/tests_checkds.py
bin/tests/system/dispatch/tests_connreset.py
bin/tests/system/hooks/tests_async_plugin.py
bin/tests/system/rndc/tests_cve-2023-3341.py
bin/tests/system/stress/tests_stress_update.py
bin/tests/system/tsiggss/tests_isc_spnego_flaws.py
bin/tests/system/ttl/tests_cache_ttl.py

index 0af90c2957ee5285c5c11cdcbd53df2db10b5ca6..3fd30e1c0b89893e6f6784d58c3842a69b7b97ff 100755 (executable)
@@ -24,7 +24,6 @@ pytest.importorskip("dns", minversion="2.0.0")
 import dns.exception
 import dns.message
 import dns.name
-import dns.query
 import dns.rcode
 import dns.rdataclass
 import dns.rdatatype
@@ -61,16 +60,9 @@ def has_signed_apex_nsec(zone, response):
 
 
 def do_query(server, qname, qtype, tcp=False):
-    query = dns.message.make_query(qname, qtype, use_edns=True, want_dnssec=True)
-    try:
-        if tcp:
-            response = dns.query.tcp(query, server.ip, timeout=3, port=server.ports.dns)
-        else:
-            response = dns.query.udp(query, server.ip, timeout=3, port=server.ports.dns)
-    except dns.exception.Timeout:
-        print(f"error: query timeout for query {qname} {qtype} to {server.ip}")
-        return None
-
+    msg = dns.message.make_query(qname, qtype, use_edns=True, want_dnssec=True)
+    query_func = isctest.query.tcp if tcp else isctest.query.udp
+    response = query_func(msg, server.ip, expected_rcode=dns.rcode.NOERROR)
     return response
 
 
@@ -97,38 +89,26 @@ def verify_zone(zone, transfer):
 
 
 def read_statefile(server, zone):
-    addr = server.ip
     count = 0
     keyid = 0
     state = {}
 
     response = do_query(server, zone, "DS", tcp=True)
-    if not isinstance(response, dns.message.Message):
-        print(f"error: no response for {zone} DS from {addr}")
-        return {}
-
-    if response.rcode() == dns.rcode.NOERROR:
-        # fetch key id from response.
-        for rr in response.answer:
-            if rr.match(
-                dns.name.from_text(zone),
-                dns.rdataclass.IN,
-                dns.rdatatype.DS,
-                dns.rdatatype.NONE,
-            ):
-                if count == 0:
-                    keyid = list(dict(rr.items).items())[0][0].key_tag
-                count += 1
-
-        if count != 1:
-            print(
-                f"error: expected a single DS in response for {zone} from {addr}, got {count}"
-            )
-            return {}
-    else:
-        rcode = dns.rcode.to_text(response.rcode())
-        print(f"error: {rcode} response for {zone} DNSKEY from {addr}")
-        return {}
+    # fetch key id from response.
+    for rr in response.answer:
+        if rr.match(
+            dns.name.from_text(zone),
+            dns.rdataclass.IN,
+            dns.rdatatype.DS,
+            dns.rdatatype.NONE,
+        ):
+            if count == 0:
+                keyid = list(dict(rr.items).items())[0][0].key_tag
+            count += 1
+
+    assert (
+        count == 1
+    ), f"expected a single DS in response for {zone} from {server.ip}, got {count}"
 
     filename = f"ns9/K{zone}+013+{keyid:05d}.state"
     print(f"read state file {filename}")
@@ -140,7 +120,6 @@ def read_statefile(server, zone):
                     continue
                 key, val = line.strip().split(":", 1)
                 state[key.strip()] = val.strip()
-
     except FileNotFoundError:
         # file may not be written just yet.
         return {}
@@ -149,40 +128,15 @@ def read_statefile(server, zone):
 
 
 def zone_check(server, zone):
-    addr = server.ip
     fqdn = f"{zone}."
 
-    # wait until zone is fully signed.
-    signed = False
-    for _ in range(10):
-        response = do_query(server, fqdn, "NSEC")
-        if not isinstance(response, dns.message.Message):
-            print(f"error: no response for {fqdn} NSEC from {addr}")
-        elif response.rcode() == dns.rcode.NOERROR:
-            signed = has_signed_apex_nsec(fqdn, response)
-        else:
-            rcode = dns.rcode.to_text(response.rcode())
-            print(f"error: {rcode} response for {fqdn} NSEC from {addr}")
-
-        if signed:
-            break
-
-        time.sleep(1)
-
-    assert signed
+    # check zone is fully signed.
+    response = do_query(server, fqdn, "NSEC")
+    assert has_signed_apex_nsec(fqdn, response)
 
     # check if zone if DNSSEC valid.
-    verified = False
     transfer = do_query(server, fqdn, "AXFR", tcp=True)
-    if not isinstance(transfer, dns.message.Message):
-        print(f"error: no response for {fqdn} AXFR from {addr}")
-    elif transfer.rcode() == dns.rcode.NOERROR:
-        verified = verify_zone(fqdn, transfer)
-    else:
-        rcode = dns.rcode.to_text(transfer.rcode())
-        print(f"error: {rcode} response for {fqdn} AXFR from {addr}")
-
-    assert verified
+    assert verify_zone(fqdn, transfer)
 
 
 def keystate_check(server, zone, key):
index f74bfd719aca48842b37467856dcc6eb37d18b57..5dbab1e820776a51cb0118d9f52cb3d839b80f26 100644 (file)
 # information regarding copyright ownership.
 
 import pytest
+import isctest
 
 pytest.importorskip("dns")
 import dns.message
-import dns.query
-import dns.rcode
 
 
-def test_connreset(named_port):
+def test_connreset():
     msg = dns.message.make_query(
         "sub.example.", "A", want_dnssec=True, use_edns=0, payload=1232
     )
-    ans = dns.query.udp(msg, "10.53.0.2", timeout=10, port=named_port)
-    assert ans.rcode() == dns.rcode.SERVFAIL
+    res = isctest.query.udp(msg, "10.53.0.2")
+    isctest.check.servfail(res)
index 2f42e273790cab14fe3d1e71ccba7493ba6ef07f..ac89c85ac00d2cd7286ffd0a9274a8b694ee4a25 100644 (file)
 # information regarding copyright ownership.
 
 import pytest
+import isctest
 
 pytest.importorskip("dns")
 import dns.message
-import dns.query
-import dns.rcode
 
 
-def test_async_hook(named_port):
-    msg = dns.message.make_query(
-        "example.com.",
-        "A",
-    )
-    ans = dns.query.udp(msg, "10.53.0.1", timeout=10, port=named_port)
+def test_async_hook():
+    msg = dns.message.make_query("example.com.", "A")
+    res = isctest.query.udp(msg, "10.53.0.1")
     # the test-async plugin changes the status of any positive answer to NOTIMP
-    assert ans.rcode() == dns.rcode.NOTIMP
+    isctest.check.notimp(res)
index de2991b386c9f572bd29e38cc6473e2b5ea5928f..3860ec602f62d59efcf8c765cf911a395efebb7c 100644 (file)
@@ -15,14 +15,13 @@ import socket
 import time
 
 import pytest
+import isctest
 
 pytest.importorskip("dns")
 import dns.message
-import dns.query
-import dns.rcode
 
 
-def test_cve_2023_3341(named_port, control_port):
+def test_cve_2023_3341(control_port):
     depth = 4500
     # Should not be more than isccc_ccmsg_setmaxsize(&conn->ccmsg, 32768)
     total_len = 10 + (depth * 7) - 6
@@ -52,6 +51,7 @@ def test_cve_2023_3341(named_port, control_port):
 
     # Wait for named to (possibly) crash
     time.sleep(10)
+
     msg = dns.message.make_query("version.bind", "TXT", "CH")
-    ans = dns.query.udp(msg, "10.53.0.2", timeout=10, port=named_port)
-    assert ans.rcode() == dns.rcode.NOERROR
+    res = isctest.query.udp(msg, "10.53.0.2")
+    isctest.check.noerror(res)
index 4c6f1f36942a2af8279a89430b5bf1439f9f0706..f621da7c7b304b446537412e9bede82339cdafac 100644 (file)
@@ -13,11 +13,10 @@ import concurrent.futures
 import os
 import time
 
-import dns.query
-import dns.update
-
 import isctest
 
+import dns.update
+
 
 def rndc_loop(test_state, server):
     rndc = os.getenv("RNDC")
@@ -39,7 +38,7 @@ def rndc_loop(test_state, server):
         time.sleep(1)
 
 
-def update_zone(test_state, zone, named_port):
+def update_zone(test_state, zone):
     server = "10.53.0.2"
     for i in range(1000):
         if test_state["finished"]:
@@ -47,7 +46,7 @@ def update_zone(test_state, zone, named_port):
         update = dns.update.UpdateMessage(zone)
         update.add(f"dynamic-{i}.{zone}", 300, "TXT", f"txt-{i}")
         try:
-            response = dns.query.udp(update, server, 10, named_port)
+            response = isctest.query.udp(update, server)
             assert response.rcode() == dns.rcode.NOERROR
         except dns.exception.Timeout:
             isctest.log.info(f"error: query timeout for {zone}")
@@ -56,7 +55,7 @@ def update_zone(test_state, zone, named_port):
 
 
 # If the test has run to completion without named crashing, it has succeeded.
-def test_update_stress(named_port):
+def test_update_stress():
     test_state = {"finished": False}
 
     with concurrent.futures.ThreadPoolExecutor() as executor:
@@ -65,7 +64,7 @@ def test_update_stress(named_port):
         updaters = []
         for i in range(5):
             zone = f"zone00000{i}.example."
-            updaters.append(executor.submit(update_zone, test_state, zone, named_port))
+            updaters.append(executor.submit(update_zone, test_state, zone))
 
         # All the update_zone() tasks are expected to complete within 5
         # minutes.  If they do not, we cannot assert immediately as that will
index 6340b5abf88f8e448376aa599e985d4ac32bc8cd..b474fc1ab879292b84b612d8c004cd37d7003cfa 100755 (executable)
@@ -23,10 +23,11 @@ import time
 
 import pytest
 
+import isctest
+
 pytest.importorskip("dns")
 import dns.message
 import dns.name
-import dns.query
 import dns.rdata
 import dns.rdataclass
 import dns.rdatatype
@@ -177,13 +178,13 @@ def send_crafted_tkey_query(opts: argparse.Namespace) -> None:
     print(query.to_text())
     print()
 
-    response = dns.query.tcp(query, opts.server_ip, timeout=2, port=opts.server_port)
+    response = isctest.query.tcp(query, opts.server_ip, timeout=2)
     print("# < " + str(datetime.datetime.now()))
     print(response.to_text())
     print()
 
 
-def test_cve_2020_8625(named_port):
+def test_cve_2020_8625():
     """
     Reproducer for CVE-2020-8625.  When run for an affected BIND 9 version,
     send_crafted_tkey_query() will raise a network-related exception due to
@@ -192,14 +193,13 @@ def test_cve_2020_8625(named_port):
     for i in range(0, 50):
         opts = argparse.Namespace(
             server_ip="10.53.0.1",
-            server_port=named_port,
             real_oid_length=i,
             extra_oid_length=0,
         )
         send_crafted_tkey_query(opts)
 
 
-def test_cve_2021_25216(named_port):
+def test_cve_2021_25216():
     """
     Reproducer for CVE-2021-25216.  When run for an affected BIND 9 version,
     send_crafted_tkey_query() will raise a network-related exception due to
@@ -207,7 +207,6 @@ def test_cve_2021_25216(named_port):
     """
     opts = argparse.Namespace(
         server_ip="10.53.0.1",
-        server_port=named_port,
         real_oid_length=1,
         extra_oid_length=1073741824,
     )
index 9025283bd2efd8b107032e815299599ffac31200..631c907c7ebd3c31f78571f36ad188cfb2be41e7 100644 (file)
 
 import pytest
 
+import isctest
+
 pytest.importorskip("dns")
 import dns.message
-import dns.query
 
 
 @pytest.mark.parametrize(
@@ -25,8 +26,8 @@ import dns.query
         ("max-example.", "MX", 60),
     ],
 )
-def test_cache_ttl(qname, rdtype, expected_ttl, named_port):
+def test_cache_ttl(qname, rdtype, expected_ttl):
     msg = dns.message.make_query(qname, rdtype)
-    response = dns.query.udp(msg, "10.53.0.2", timeout=10, port=named_port)
+    response = isctest.query.udp(msg, "10.53.0.2")
     for rr in response.answer + response.authority:
         assert rr.ttl == expected_ttl