]> git.ipfire.org Git - thirdparty/bind9.git/commitdiff
Use isctest.query.create across system tests
authorNicki Křížek <nicki@isc.org>
Mon, 14 Jul 2025 12:58:51 +0000 (14:58 +0200)
committerEvan Hunt <each@isc.org>
Tue, 29 Jul 2025 19:13:11 +0000 (12:13 -0700)
Rather than using the dnspython's facilities and defaults to create the
queries, use the isctest.query.create function in all the cases that
don't require special handling to have consistent defaults.

27 files changed:
bin/tests/system/checkds/tests_checkds.py
bin/tests/system/cipher-suites/tests_cipher_suites.py
bin/tests/system/database/tests_database.py
bin/tests/system/dnstap/tests_dnstap.py
bin/tests/system/doth/tests_gnutls.py
bin/tests/system/dsdigest/tests_dsdigest.py
bin/tests/system/ecdsa/tests_ecdsa.py
bin/tests/system/emptyzones/tests_emptyzones.py
bin/tests/system/glue/tests_glue.py
bin/tests/system/hooks/tests_hooks.py
bin/tests/system/include-multiplecfg/tests_include_multiplecfg.py
bin/tests/system/isctest/check.py
bin/tests/system/isctest/kasp.py
bin/tests/system/kasp/tests_kasp.py
bin/tests/system/limits/tests_limits.py
bin/tests/system/masterfile/tests_masterfile.py
bin/tests/system/names/tests_names.py
bin/tests/system/nzd2nzf/tests_nzd2nzf.py
bin/tests/system/query-source/tests_querysource_none.py
bin/tests/system/rndc/tests_cve-2023-3341.py
bin/tests/system/rpzextra/tests_rpzextra.py
bin/tests/system/shutdown/tests_shutdown.py
bin/tests/system/stub/tests_stub.py
bin/tests/system/tsiggss/tests_isc_spnego_flaws.py
bin/tests/system/ttl/tests_cache_ttl.py
bin/tests/system/wildcard/tests_wildcard.py
bin/tests/system/xferquota/tests_xferquota.py

index edc7d13abd96f375db72c830d8c3ecfa4858abc3..5e859a44c241f8324b70084e895dc94505a66108 100755 (executable)
@@ -81,7 +81,7 @@ def has_signed_apex_nsec(zone, response):
 
 
 def do_query(server, qname, qtype, tcp=False):
-    msg = dns.message.make_query(qname, qtype, use_edns=True, want_dnssec=True)
+    msg = isctest.query.create(qname, qtype)
     query_func = isctest.query.tcp if tcp else isctest.query.udp
     response = query_func(msg, server.ip, expected_rcode=dns.rcode.NOERROR)
     return response
index 1be3aafbbcb1ffa62f73c591d2f77786fe046561..edc3c09598a0314a18b8153c6fa4cd44231460d4 100644 (file)
@@ -84,7 +84,7 @@ def transfers_complete(servers):
 )
 # pylint: disable=redefined-outer-name,unused-argument
 def test_cipher_suites_tls_xfer(qname, ns, rcode, transfers_complete):
-    msg = dns.message.make_query(qname, "AXFR")
+    msg = isctest.query.create(qname, "AXFR")
     ans = isctest.query.tls(msg, f"10.53.0.{ns}")
     assert ans.rcode() == rcode
     if rcode == dns.rcode.NOERROR:
index 0e4fd9be796d016c1cb679c33d61b757c2fb9362..89f818e9aa1ca0c4813ee863b9572cadb4d25a73 100644 (file)
@@ -9,13 +9,13 @@
 # See the COPYRIGHT file distributed with this work for additional
 # information regarding copyright ownership.
 
-import isctest
+import dns
 
-import dns.message
+import isctest
 
 
 def test_database(ns1, templates):
-    msg = dns.message.make_query("database.", "SOA")
+    msg = isctest.query.create("database.", "SOA")
 
     # checking pre reload zone
     res = isctest.query.tcp(msg, "10.53.0.1")
index e804eaa9ffd3096f618ee42eaef9e9f6f4ed099b..ba3c39c709ee73e08469bbf4fd591eea745b444d 100644 (file)
@@ -18,9 +18,8 @@ import isctest
 import isctest.mark
 import pytest
 
-import dns.message
-
 pytest.importorskip("dns", minversion="2.0.0")
+import dns.rrset
 
 pytestmark = [
     isctest.mark.with_dnstap,
@@ -49,7 +48,7 @@ def run_rndc(server, rndc_command):
 
 def test_dnstap_dispatch_socket_addresses():
     # Send some query to ns3 so that it records something in its dnstap file.
-    msg = dns.message.make_query("mail.example.", "A")
+    msg = isctest.query.create("mail.example.", "A")
     res = isctest.query.tcp(msg, "10.53.0.2", expected_rcode=dns.rcode.NOERROR)
     assert res.answer == [
         dns.rrset.from_text("mail.example.", 300, "IN", "A", "10.0.0.2")
index f49b4401c5109c9f41dc740a016f61b22e9e0a2a..9c897714efe473f92bbbe7ca49fe89a8e613e184 100644 (file)
@@ -20,11 +20,12 @@ import pytest
 
 pytest.importorskip("dns")
 import dns.exception
-import dns.message
 import dns.name
 import dns.rdataclass
 import dns.rdatatype
 
+import isctest
+
 pytestmark = pytest.mark.extra_artifacts(
     [
         "gnutls-cli.*",
@@ -35,7 +36,7 @@ pytestmark = pytest.mark.extra_artifacts(
 
 def test_gnutls_cli_query(gnutls_cli_executable, named_tlsport):
     # Prepare the example/SOA query which will be sent over TLS.
-    query = dns.message.make_query("example.", dns.rdatatype.SOA)
+    query = isctest.query.create("example.", dns.rdatatype.SOA)
     query_wire = query.to_wire()
     query_with_length = struct.pack(">H", len(query_wire)) + query_wire
 
index 853c49cba5be53e03e46ece92fa4f7b3b36d7770..d2074b63da4dfee053d71f0e4cd0ff9c60cb483b 100644 (file)
@@ -9,7 +9,7 @@
 # See the COPYRIGHT file distributed with this work for additional
 # information regarding copyright ownership.
 
-import dns.message
+import dns.flags
 import pytest
 
 import isctest
@@ -29,7 +29,7 @@ pytestmark = pytest.mark.extra_artifacts(
 
 def test_dsdigest_good():
     """Check that validation with enabled digest types works"""
-    msg = dns.message.make_query("a.good.", "A", want_dnssec=True)
+    msg = isctest.query.create("a.good.", "A")
     res = isctest.query.tcp(
         msg,
         "10.53.0.3",
@@ -40,7 +40,7 @@ def test_dsdigest_good():
 
 def test_dsdigest_insecure():
     """Check that validation with not supported digest algorithms is insecure"""
-    msg_ds = dns.message.make_query("bad.", "DS", want_dnssec=True)
+    msg_ds = isctest.query.create("bad.", "DS")
     res_ds = isctest.query.tcp(
         msg_ds,
         "10.53.0.4",
@@ -48,7 +48,7 @@ def test_dsdigest_insecure():
     isctest.check.noerror(res_ds)
     assert res_ds.flags & dns.flags.AD
 
-    msg_a = dns.message.make_query("a.bad.", "A", want_dnssec=True)
+    msg_a = isctest.query.create("a.bad.", "A")
     res_a = isctest.query.tcp(
         msg_a,
         "10.53.0.4",
index 1329fa710ed7e69dec03594764e2e546d3bebcb8..e0379f79f160400e48ddb58120e0db100c32dc5a 100644 (file)
@@ -12,7 +12,8 @@
 import os
 import pytest
 
-import dns.message
+import dns.flags
+
 import isctest
 
 
@@ -29,8 +30,7 @@ pytestmark = pytest.mark.extra_artifacts(
 
 
 def check_server_soa(resolver):
-    msg = dns.message.make_query(".", "SOA")
-    msg.flags += dns.flags.AD
+    msg = isctest.query.create(".", "SOA")
     res1 = isctest.query.tcp(msg, "10.53.0.1")
     res2 = isctest.query.tcp(msg, resolver)
     isctest.check.rrsets_equal(res1.answer, res2.answer)
index c47813939538c14250cafe046aa7c4f5a289392d..7eadc1bf54ff39d0938193675efdc441a2dc0c36 100644 (file)
@@ -9,8 +9,6 @@
 # See the COPYRIGHT file distributed with this work for additional
 # information regarding copyright ownership.
 
-import dns.message
-
 import isctest
 
 
@@ -21,11 +19,11 @@ def test_emptyzones(ns1, templates):
         watcher.wait_for_line("all zones loaded")
     templates.render("ns1/named.conf", {"automatic_empty_zones": True})
     ns1.rndc("reload")
-    msg = dns.message.make_query("version.bind", "TXT", "CH")
+    msg = isctest.query.create("version.bind", "TXT", "CH")
     res = isctest.query.tcp(msg, "10.53.0.1")
     isctest.check.noerror(res)
 
     # check that allow-transfer { none; } works
-    msg = dns.message.make_query("10.in-addr.arpa", "AXFR")
+    msg = isctest.query.create("10.in-addr.arpa", "AXFR")
     res = isctest.query.tcp(msg, "10.53.0.1")
     isctest.check.refused(res)
index f346998dc078bf08a0ea6057a461ebe9ab841471..517781c7e22adb5c7bed2f9c7b9876c65e556f23 100644 (file)
@@ -10,6 +10,7 @@
 # information regarding copyright ownership.
 
 
+import dns.flags
 import dns.message
 import pytest
 
@@ -29,7 +30,7 @@ pytestmark = pytest.mark.extra_artifacts(
 
 def test_glue_full_glue_set():
     """test that a ccTLD referral gets a full glue set from the root zone"""
-    msg = dns.message.make_query("foo.bar.fi", "A")
+    msg = isctest.query.create("foo.bar.fi", "A")
     msg.flags &= ~dns.flags.RD
     res = isctest.query.udp(msg, "10.53.0.1")
 
@@ -60,7 +61,7 @@ NS.UU.NET. 172800 IN A 137.39.1.3
 
 def test_glue_no_glue_set():
     """test that out-of-zone glue is not found"""
-    msg = dns.message.make_query("example.net.", "A")
+    msg = isctest.query.create("example.net.", "A")
     msg.flags &= ~dns.flags.RD
     res = isctest.query.udp(msg, "10.53.0.1")
 
index b5391ee46a155e146a927c2a00acfa17ed3b29e1..5de435825921c5b58ace89b09d443f0e23c3de65 100644 (file)
@@ -9,15 +9,11 @@
 # See the COPYRIGHT file distributed with this work for additional
 # information regarding copyright ownership.
 
-import pytest
 import isctest
 
-pytest.importorskip("dns")
-import dns.message
-
 
 def test_hooks():
-    msg = dns.message.make_query("example.com.", "A")
+    msg = isctest.query.create("example.com.", "A")
     res = isctest.query.udp(msg, "10.53.0.1")
     # the test-async plugin changes the status of any positive answer to NOTIMP
     isctest.check.notimp(res)
index 346f33453eef159a6bcb39737dca223263722f41..dffc917c9de010c6ca893be309d8839b681c1ba1 100644 (file)
 
 import os
 
-import isctest
+import dns.rrset
 import pytest
 
-import dns.message
+import isctest
 
 
 @pytest.mark.parametrize(
@@ -26,7 +26,7 @@ import dns.message
     ],
 )
 def test_include_multiplecfg(qname):
-    msg = dns.message.make_query(qname, "A")
+    msg = isctest.query.create(qname, "A")
     res = isctest.query.tcp(msg, "10.53.0.2")
 
     isctest.check.noerror(res)
index b09e895ef11f6ae8d72b7a3497462e31be777bd8..b705bfee213a6cb280aaf510898555858339522e 100644 (file)
@@ -15,8 +15,8 @@ import os
 from typing import Optional
 
 import dns.flags
-import dns.rcode
 import dns.message
+import dns.rcode
 import dns.zone
 
 import isctest.log
@@ -156,7 +156,7 @@ def is_executable(cmd: str, errmsg: str) -> None:
 
 def named_alive(named_proc, resolver_ip):
     assert named_proc.poll() is None, "named isn't running"
-    msg = dns.message.make_query("version.bind", "TXT", "CH")
+    msg = isctest.query.create("version.bind", "TXT", "CH")
     isctest.query.tcp(msg, resolver_ip, expected_rcode=dns_rcode.NOERROR)
 
 
index 9de5c4f09fa3cb70311d9f66ba0e2233907dafe7..094a542f5466c202fd5a070e121f3e6bcf9e62ea 100644 (file)
@@ -22,10 +22,10 @@ from typing import Dict, List, Optional, Tuple, Union
 import dns
 import dns.tsig
 
-from isctest.instance import NamedInstance
 import isctest.log
 import isctest.query
 import isctest.util
+from isctest.instance import NamedInstance
 from isctest.vars.algorithms import Algorithm, ALL_ALGORITHMS_BY_NUM
 
 DEFAULT_TTL = 300
@@ -34,7 +34,7 @@ NEXT_KEY_EVENT_THRESHOLD = 100
 
 
 def _query(server, qname, qtype, tsig=None):
-    query = dns.message.make_query(qname, qtype, use_edns=True, want_dnssec=True)
+    query = isctest.query.create(qname, qtype)
 
     if tsig is not None:
         tsigkey = tsig.split(":")
index 0c4005d950d49c827831b93f201aad2f9e669925..f43f0dea6f12d1bc260724ae2e4799c08c16c745 100644 (file)
@@ -805,7 +805,7 @@ def test_kasp_inherit_view(number, dynamic, inline_signing, txt_rdata, ns4):
     qname = f"view.{zone}."
     qtype = dns.rdatatype.TXT
     rdata = txt_rdata
-    query = dns.message.make_query(qname, qtype, use_edns=True, want_dnssec=True)
+    query = isctest.query.create(qname, qtype)
     tsigkey = tsig.split(":")
     keyring = dns.tsig.Key(tsigkey[1], tsigkey[2], tsigkey[0])
     query.use_tsig(keyring)
@@ -1612,7 +1612,7 @@ def test_kasp_reload_restart(ns6):
     def query_soa(qname):
         fqdn = dns.name.from_text(qname)
         qtype = dns.rdatatype.SOA
-        query = dns.message.make_query(fqdn, qtype, use_edns=True, want_dnssec=True)
+        query = isctest.query.create(fqdn, qtype)
         try:
             response = isctest.query.tcp(query, ns6.ip, ns6.ports.dns, timeout=3)
         except dns.exception.Timeout:
index 1fe5ea008772e51e310ab555cc76ea221b607dde..ca7214a9de4f64b1a2f2d5caeb38d2ebd5db2596 100644 (file)
@@ -14,11 +14,10 @@ import itertools
 import isctest
 import pytest
 
-import dns.message
-
 # Everything from getting a big answer to creating an RR set with thousands
 # of records takes minutes of CPU and real time with dnspython < 2.0.0.
 pytest.importorskip("dns", minversion="2.0.0")
+import dns.rrset
 
 
 @pytest.mark.parametrize(
@@ -32,7 +31,7 @@ pytest.importorskip("dns", minversion="2.0.0")
     ],
 )
 def test_limits(name, limit):
-    msg_query = dns.message.make_query(f"{name}.example.", "A")
+    msg_query = isctest.query.create(f"{name}.example.", "A")
     res = isctest.query.tcp(msg_query, "10.53.0.1", log_response=False)
 
     iplist = [
@@ -46,7 +45,7 @@ def test_limits(name, limit):
 
 
 def test_limit_exceeded():
-    msg_query = dns.message.make_query("5000.example.", "A")
+    msg_query = isctest.query.create("5000.example.", "A")
     res = isctest.query.tcp(msg_query, "10.53.0.1", log_response=False)
 
     assert res.flags & dns.flags.TC, "TC flag was not set"
index 862be26baee6bcc1fa15e43917e143a95c7a1228..9b3d0b114f0a229e54b58110c3498ab0541155bb 100644 (file)
@@ -24,7 +24,7 @@ pytestmark = pytest.mark.extra_artifacts(
 
 def test_masterfile_include_semantics():
     """Test master file $INCLUDE semantics"""
-    msg_axfr = dns.message.make_query("include.", "AXFR")
+    msg_axfr = isctest.query.create("include.", "AXFR")
     res_axfr = isctest.query.tcp(msg_axfr, "10.53.0.1")
     axfr_include_semantics = """;ANSWER
 include.                       300     IN      SOA     ns.include. hostmaster.include. 1 3600 1800 1814400 3600
@@ -45,7 +45,7 @@ ns.include.                   300     IN      A       127.0.0.1
 
 def test_masterfile_bind_8_compat_semantics():
     """Test master file BIND 8 TTL and $TTL semantics compatibility"""
-    msg_axfr = dns.message.make_query("ttl1.", "AXFR")
+    msg_axfr = isctest.query.create("ttl1.", "AXFR")
     res_axfr = isctest.query.tcp(msg_axfr, "10.53.0.1")
     axfr_ttl_semantics = """;ANSWER
 ttl1.          3       IN      SOA     ns.ttl1. hostmaster.ttl1. 1 3600 1800 1814400 3
@@ -64,7 +64,7 @@ ns.ttl1.      3       IN      A       10.53.0.1
 
 def test_masterfile_rfc_1035_semantics():
     """Test master file RFC1035 TTL and $TTL semantics"""
-    msg_axfr = dns.message.make_query("ttl2.", "AXFR")
+    msg_axfr = isctest.query.create("ttl2.", "AXFR")
     res_axfr = isctest.query.tcp(msg_axfr, "10.53.0.1")
     axfr_ttl_semantics = """;ANSWER
 ttl2.          1       IN      SOA     ns.ttl2. hostmaster.ttl2. 1 3600 1800 1814400 3
@@ -83,7 +83,7 @@ ns.ttl2.      1       IN      A       10.53.0.1
 
 def test_masterfile_missing_master_file():
     """Test nameserver running with a missing master file"""
-    msg_soa = dns.message.make_query("example.", "SOA")
+    msg_soa = isctest.query.create("example.", "SOA")
     res_soa = isctest.query.tcp(msg_soa, "10.53.0.2")
     expected_soa_rr = """;ANSWER
 example.       300     IN      SOA     mname1. . 2010042407 20 20 1814400 3600
@@ -96,7 +96,7 @@ def test_masterfile_initial_file():
     """Test zone configurations with initial template files"""
     # example inherited its configuration from the template,
     # make sure it works
-    msg_soa = dns.message.make_query("example.", "SOA")
+    msg_soa = isctest.query.create("example.", "SOA")
     res_soa = isctest.query.tcp(msg_soa, "10.53.0.2")
     expected_soa_rr = """;ANSWER
 example.       300     IN      SOA     mname1. . 2010042407 20 20 1814400 3600
@@ -107,7 +107,7 @@ example.    300     IN      SOA     mname1. . 2010042407 20 20 1814400 3600
     # initial uses an initial-file option with the "file"
     # option set to "copied.db". make sure it works and that
     # copied.db has been populated.
-    msg_soa = dns.message.make_query("initial.", "SOA")
+    msg_soa = isctest.query.create("initial.", "SOA")
     res_soa = isctest.query.tcp(msg_soa, "10.53.0.2")
     expected_soa_rr = """;ANSWER
 initial.       300     IN      SOA     mname1. . 2010042407 20 20 1814400 3600
@@ -119,7 +119,7 @@ initial.    300     IN      SOA     mname1. . 2010042407 20 20 1814400 3600
     # present uses an initial-file option, but the file 'present.db'
     # already exists and is empty, so the initial-file should not be
     # copied into place and the zone should not load.
-    msg_soa = dns.message.make_query("present.", "SOA")
+    msg_soa = isctest.query.create("present.", "SOA")
     res_soa = isctest.query.tcp(msg_soa, "10.53.0.2")
     isctest.check.servfail(res_soa)
     isctest.check.file_empty("ns2/present.db")
@@ -129,7 +129,7 @@ def test_masterfile_template_override():
     """Test zone configurations with overridden template options"""
     # different inherited configuration from the template, but
     # overrides the "file" option to 'alternate.db'.
-    msg_soa = dns.message.make_query("different.", "SOA")
+    msg_soa = isctest.query.create("different.", "SOA")
     res_soa = isctest.query.tcp(msg_soa, "10.53.0.2")
     expected_soa_rr = """;ANSWER
 different.     300     IN      SOA     mname1. . 2010042407 20 20 1814400 3600
@@ -142,7 +142,7 @@ different.  300     IN      SOA     mname1. . 2010042407 20 20 1814400 3600
 
 def test_masterfile_missing_master_file_servfail():
     """Test nameserver returning SERVFAIL for a missing master file"""
-    msg_soa = dns.message.make_query("missing.", "SOA")
+    msg_soa = isctest.query.create("missing.", "SOA")
     res_soa = isctest.query.tcp(msg_soa, "10.53.0.2")
     isctest.check.servfail(res_soa)
 
index e4fc296277fc17e3eda379efb0e7d7f0450e3603..b72fcbec980fe1e1ac73791d40c825eab95a2039 100644 (file)
@@ -13,8 +13,6 @@ import pytest
 
 pytest.importorskip("dns", minversion="2.7.0")
 
-
-import dns.message
 import isctest
 
 
@@ -22,7 +20,7 @@ import isctest
 # about twice as large as the answer with compression enabled, while
 # maintaining identical content.
 def test_names():
-    msg = dns.message.make_query("example.", "MX")
+    msg = isctest.query.create("example.", "MX")
     # Getting message size with compression enabled
     res_enabled = isctest.query.tcp(msg, ip="10.53.0.1", source="10.53.0.1")
     # Getting message size with compression disabled
index 0f5c89a09b54eeb6d37b84bf4938cba5d74f335a..e25f2dfc9a9a906f3c44870563e5615454fb3f36 100644 (file)
@@ -17,8 +17,6 @@ import isctest
 import isctest.mark
 import isctest.run
 
-import dns.message
-
 pytestmark = [
     isctest.mark.with_lmdb,
     pytest.mark.extra_artifacts(
@@ -29,7 +27,7 @@ pytestmark = [
 
 def test_nzd2nzf(ns1):
     zone_data = '"added.example" { type primary; file "added.db"; };'
-    msg = dns.message.make_query("a.added.example.", "A")
+    msg = isctest.query.create("a.added.example.", "A")
 
     # query for non-existing zone data
     res = isctest.query.tcp(msg, ns1.ip)
index 0f19d051153ed5be19a87f7a74131ff56a59fb88..9b078a40d25b1c4b6ec7d59bdffb9b88f56edd8f 100644 (file)
 # information regarding copyright ownership.
 
 import pytest
+
 import isctest
 
-pytest.importorskip("dns")
-import dns.message
 
 pytestmark = pytest.mark.extra_artifacts(
     [
@@ -26,7 +25,7 @@ pytestmark = pytest.mark.extra_artifacts(
 
 
 def test_querysource_none():
-    msg = dns.message.make_query("example.", "A", want_dnssec=False)
+    msg = isctest.query.create("example.", "A", dnssec=False)
 
     res = isctest.query.udp(msg, "10.53.0.2")
     isctest.check.noerror(res)
@@ -43,7 +42,7 @@ def test_querysource_none():
     # using a different name below to make sure we don't use the
     # resolver cache
 
-    msg = dns.message.make_query("exampletwo.", "A", want_dnssec=False)
+    msg = isctest.query.create("exampletwo.", "A", dnssec=False)
 
     res = isctest.query.udp(msg, "fd92:7065:b8e:ffff::2")
     isctest.check.noerror(res)
index c195a0db10dd5f8dcb2a06f5f2d70b9365cfcf84..73a5117e9f0988728b15580aa4999258a5629405 100644 (file)
@@ -15,10 +15,9 @@ import socket
 import time
 
 import pytest
+
 import isctest
 
-pytest.importorskip("dns")
-import dns.message
 
 pytestmark = pytest.mark.extra_artifacts(
     [
@@ -66,6 +65,6 @@ def test_cve_2023_3341(control_port):
     # Wait for named to (possibly) crash
     time.sleep(10)
 
-    msg = dns.message.make_query("version.bind", "TXT", "CH")
+    msg = isctest.query.create("version.bind", "TXT", "CH")
     res = isctest.query.udp(msg, "10.53.0.2")
     isctest.check.noerror(res)
index 424830ed80a691600ac5815842d33c2f4eb5224b..8dfa8cddf16ac1568f9d62d38a235e08bb3bb2c5 100644 (file)
 # information regarding copyright ownership.
 
 import os
+
 import pytest
 
 pytest.importorskip("dns", minversion="2.0.0")
+import dns.rcode
+import dns.rrset
+
 import isctest
 from isctest.compat import dns_rcode
 
-import dns.message
 
 pytestmark = pytest.mark.extra_artifacts(
     [
@@ -70,7 +73,7 @@ pytestmark = pytest.mark.extra_artifacts(
 )
 def test_rpz_multiple_views(qname, source, rcode):
     # Wait for the rpz-external.local zone transfer
-    msg = dns.message.make_query("rpz-external.local", "SOA")
+    msg = isctest.query.create("rpz-external.local", "SOA")
     isctest.query.tcp(
         msg,
         ip="10.53.0.3",
@@ -84,7 +87,7 @@ def test_rpz_multiple_views(qname, source, rcode):
         expected_rcode=dns_rcode.NOERROR,
     )
 
-    msg = dns.message.make_query(qname, "A")
+    msg = isctest.query.create(qname, "A")
     res = isctest.query.udp(msg, "10.53.0.3", source=source, expected_rcode=rcode)
     if rcode == dns.rcode.NOERROR:
         assert res.answer == [dns.rrset.from_text(qname, 300, "IN", "A", "10.53.0.2")]
@@ -94,7 +97,7 @@ def test_rpz_passthru_logging():
     resolver_ip = "10.53.0.3"
 
     # Should generate a log entry into rpz_passthru.txt
-    msg_allowed = dns.message.make_query("allowed.", "A")
+    msg_allowed = isctest.query.create("allowed.", "A")
     res_allowed = isctest.query.udp(
         msg_allowed, resolver_ip, source="10.53.0.1", expected_rcode=dns.rcode.NOERROR
     )
@@ -103,7 +106,7 @@ def test_rpz_passthru_logging():
     ]
 
     # Should also generate a log entry into rpz_passthru.txt
-    msg_allowed_any = dns.message.make_query("allowed.", "ANY")
+    msg_allowed_any = isctest.query.create("allowed.", "ANY")
     res_allowed_any = isctest.query.udp(
         msg_allowed_any,
         resolver_ip,
@@ -121,7 +124,7 @@ def test_rpz_passthru_logging():
 
     # baddomain.com isn't allowed (CNAME .), should return NXDOMAIN
     # Should generate a log entry into rpz.txt
-    msg_not_allowed = dns.message.make_query("baddomain.", "A")
+    msg_not_allowed = isctest.query.create("baddomain.", "A")
     res_not_allowed = isctest.query.udp(
         msg_not_allowed,
         resolver_ip,
index bbfab34cc4f78ca93ce2d7faac0e24cb0020445a..d746b3821d9f9448d476c47658c8842d02a406af 100755 (executable)
@@ -105,7 +105,7 @@ def do_work(named_proc, resolver_ip, instance, kill_method, n_workers, n_queries
                     )
 
                 qname = relname + ".test"
-                msg = dns.message.make_query(qname, "A")
+                msg = isctest.query.create(qname, "A")
                 futures[
                     executor.submit(
                         isctest.query.udp, msg, resolver_ip, timeout=1, attempts=1
index 89df5bdce057f9db9efc4a1705e88c00a6dd58de..2c9bd00cd60567fa9b2592bf10c5a0e0ea163c6a 100644 (file)
@@ -32,14 +32,14 @@ def test_stub_zones_availability(ns3):
 
     # try an AXFR that should be denied (NOTAUTH)
     def axfr_denied():
-        msg = dns.message.make_query("child.example.", "AXFR")
+        msg = isctest.query.create("child.example.", "AXFR")
         res = isctest.query.tcp(msg, "10.53.0.3")
         isctest.check.notauth(res)
 
     # look for stub zone data without recursion (should not be found)
     def stub_zone_lookout_without_recursion():
         # drop all flags (dns.flags.RD is set by default)
-        msg = dns.message.make_query("data.child.example.", "TXT")
+        msg = isctest.query.create("data.child.example.", "TXT")
         msg.flags = 0
         res = isctest.query.tcp(msg, "10.53.0.3")
         isctest.check.noerror(res)
@@ -54,7 +54,7 @@ def test_stub_zones_availability(ns3):
     # look for stub zone data with recursion (should be found)
     def stub_zone_lookout_with_recursion():
         # dns.flags.RD is set by default
-        msg = dns.message.make_query("data.child.example.", "TXT")
+        msg = isctest.query.create("data.child.example.", "TXT")
         res = isctest.query.tcp(msg, "10.53.0.3")
         isctest.check.noerror(res)
         assert res.answer[0] == dns.rrset.from_text(
@@ -79,7 +79,7 @@ def test_stub_glue_record_with_minimal_response():
     assert os.path.exists("ns5/example.db")
 
     # this query would fail if NS glue wasn't transferred
-    msg_txt = dns.message.make_query("target.example.", "TXT", want_dnssec=False)
+    msg_txt = isctest.query.create("target.example.", "TXT", dnssec=False)
     res_txt = isctest.query.tcp(msg_txt, "10.53.0.5")
     isctest.check.noerror(res_txt)
     assert res_txt.answer[0] == dns.rrset.from_text(
@@ -87,13 +87,13 @@ def test_stub_glue_record_with_minimal_response():
     )
 
     # ensure both IPv4 and IPv6 glue records were transferred
-    msg_a = dns.message.make_query("ns4.example.", "A")
+    msg_a = isctest.query.create("ns4.example.", "A")
     res_a = isctest.query.tcp(msg_a, "10.53.0.5")
     assert res_a.answer[0] == dns.rrset.from_text(
         "ns4.example.", "300", "IN", "A", "10.53.0.4"
     )
 
-    msg_aaaa = dns.message.make_query("ns4.example.", "AAAA")
+    msg_aaaa = isctest.query.create("ns4.example.", "AAAA")
     res_aaaa = isctest.query.tcp(msg_aaaa, "10.53.0.5")
     assert res_aaaa.answer[0] == dns.rrset.from_text(
         "ns4.example.", "300", "IN", "AAAA", "fd92:7065:b8e:ffff::4"
index 4c32557e894ea867d59141861673c8e392dacc0a..38b424d4c59bc1efa768938dc170b6e4ecbaef99 100755 (executable)
@@ -56,7 +56,7 @@ class CraftedTKEYQuery:
         rrset = dns.rrset.from_rdata(dns.name.root, dns.rdatatype.TKEY, rdata)
 
         # Prepare complete TKEY query to send
-        self.msg = dns.message.make_query(
+        self.msg = isctest.query.create(
             dns.name.root, dns.rdatatype.TKEY, dns.rdataclass.ANY
         )
         self.msg.additional.append(rrset)
index 631c907c7ebd3c31f78571f36ad188cfb2be41e7..3e7f91549e5ac37e8e6b51573d09296d9eea7afd 100644 (file)
@@ -13,9 +13,6 @@ import pytest
 
 import isctest
 
-pytest.importorskip("dns")
-import dns.message
-
 
 @pytest.mark.parametrize(
     "qname,rdtype,expected_ttl",
@@ -27,7 +24,7 @@ import dns.message
     ],
 )
 def test_cache_ttl(qname, rdtype, expected_ttl):
-    msg = dns.message.make_query(qname, rdtype)
+    msg = isctest.query.create(qname, rdtype)
     response = isctest.query.udp(msg, "10.53.0.2")
     for rr in response.answer + response.authority:
         assert rr.ttl == expected_ttl
index cad2eb075787c0f61c92d0fd413bda96f6518a2e..459e3397b35ad8a5e7222fba00ae56d3a7dff15e 100755 (executable)
@@ -94,7 +94,7 @@ def test_wildcard_rdtype_mismatch(
     # See RFC 4592 section 2.2.1.
     assume(name == SUFFIX or name.labels[-len(SUFFIX) - 1] != b"*")
 
-    query_msg = dns.message.make_query(name, rdtype)
+    query_msg = isctest.query.create(name, rdtype)
     response_msg = isctest.query.tcp(query_msg, IP_ADDR, named_port, timeout=TIMEOUT)
 
     isctest.check.is_response_to(response_msg, query_msg)
@@ -111,7 +111,7 @@ def test_wildcard_match(name: dns.name.Name, named_port: int) -> None:
     # See RFC 4592 section 2.2.1.
     assume(name.labels[-len(SUFFIX) - 1] != b"*")
 
-    query_msg = dns.message.make_query(name, WILDCARD_RDTYPE)
+    query_msg = isctest.query.create(name, WILDCARD_RDTYPE)
     response_msg = isctest.query.tcp(query_msg, IP_ADDR, named_port, timeout=TIMEOUT)
 
     isctest.check.is_response_to(response_msg, query_msg)
@@ -140,7 +140,7 @@ def test_wildcard_with_star_not_synthesized(
     name: dns.name.Name, named_port: int
 ) -> None:
     """RFC 4592 section 2.2.1 ghost.*.example."""
-    query_msg = dns.message.make_query(name, WILDCARD_RDTYPE)
+    query_msg = isctest.query.create(name, WILDCARD_RDTYPE)
     response_msg = isctest.query.tcp(query_msg, IP_ADDR, named_port, timeout=TIMEOUT)
 
     isctest.check.is_response_to(response_msg, query_msg)
@@ -170,7 +170,7 @@ def test_name_in_between_wildcards(name: dns.name.Name, named_port: int) -> None
         or name.labels[-len(NESTED_SUFFIX) - 1] != b"*"
     )
 
-    query_msg = dns.message.make_query(name, WILDCARD_RDTYPE)
+    query_msg = isctest.query.create(name, WILDCARD_RDTYPE)
     response_msg = isctest.query.tcp(query_msg, IP_ADDR, named_port, timeout=TIMEOUT)
 
     isctest.check.is_response_to(response_msg, query_msg)
@@ -201,7 +201,7 @@ def test_name_nested_wildcard_subdomains_not_synthesized(
 
     `foo.*.*.*.nestedwild.test. A` must not be synthesized.
     """
-    query_msg = dns.message.make_query(name, WILDCARD_RDTYPE)
+    query_msg = isctest.query.create(name, WILDCARD_RDTYPE)
     response_msg = isctest.query.tcp(query_msg, IP_ADDR, named_port, timeout=TIMEOUT)
 
     isctest.check.is_response_to(response_msg, query_msg)
index d64ed58868910df2f07b223b4defdee6e71517c0..a4edc5998f019a4d24bee255fba928f34c6924a0 100644 (file)
@@ -60,8 +60,8 @@ def test_xferquota(named_port, ns1, ns2):
 
     isctest.run.retry_with_timeout(check_line_count, timeout=360)
 
-    axfr_msg = dns.message.make_query("zone000099.example.", "AXFR")
-    a_msg = dns.message.make_query("a.changing.", "A")
+    axfr_msg = isctest.query.create("zone000099.example.", "AXFR")
+    a_msg = isctest.query.create("a.changing.", "A")
 
     def query_and_compare(msg):
         ns1response = isctest.query.tcp(msg, "10.53.0.1")