From: Evan Hunt Date: Tue, 29 Jul 2025 23:08:44 +0000 (-0700) Subject: Use isctest.query.create across system tests X-Git-Tag: v9.18.39~10^2 X-Git-Url: http://git.ipfire.org/gitweb/?a=commitdiff_plain;h=2588b2a23c4ee74c9e26cc103049530ee9f08ddb;p=thirdparty%2Fbind9.git Use isctest.query.create across system tests Rather than using the dnspython's facilities and defaults to create the queries, use the isctest.query.create function in all the cases that don't require special handling to have consistent defaults. (cherry picked from commit 64143ea077c3ddb48f808af2d0b05e21209cd268) --- diff --git a/bin/tests/system/checkds/tests_checkds.py b/bin/tests/system/checkds/tests_checkds.py index 95ddce957f2..d9466349134 100755 --- a/bin/tests/system/checkds/tests_checkds.py +++ b/bin/tests/system/checkds/tests_checkds.py @@ -79,7 +79,7 @@ def has_signed_apex_nsec(zone, response): def do_query(server, qname, qtype, tcp=False): - msg = dns.message.make_query(qname, qtype, use_edns=True, want_dnssec=True) + msg = isctest.query.create(qname, qtype) query_func = isctest.query.tcp if tcp else isctest.query.udp response = query_func(msg, server.ip, expected_rcode=dns.rcode.NOERROR) return response diff --git a/bin/tests/system/database/tests_database.py b/bin/tests/system/database/tests_database.py index 476b81da95c..5695bf011a6 100644 --- a/bin/tests/system/database/tests_database.py +++ b/bin/tests/system/database/tests_database.py @@ -9,13 +9,13 @@ # See the COPYRIGHT file distributed with this work for additional # information regarding copyright ownership. -import isctest +import dns -import dns.message +import isctest def test_database(servers, templates): - msg = dns.message.make_query("database.", "SOA") + msg = isctest.query.create("database.", "SOA") # checking pre reload zone res = isctest.query.tcp(msg, "10.53.0.1") diff --git a/bin/tests/system/dnstap/tests_dnstap.py b/bin/tests/system/dnstap/tests_dnstap.py index 6326ef06f87..9dec5ddcc1d 100644 --- a/bin/tests/system/dnstap/tests_dnstap.py +++ b/bin/tests/system/dnstap/tests_dnstap.py @@ -18,9 +18,8 @@ import subprocess import isctest import pytest -import dns.message - pytest.importorskip("dns", minversion="2.0.0") +import dns.rrset pytestmark = pytest.mark.extra_artifacts( [ @@ -46,7 +45,7 @@ def run_rndc(server, rndc_command): def test_dnstap_dispatch_socket_addresses(): # Send some query to ns3 so that it records something in its dnstap file. - msg = dns.message.make_query("mail.example.", "A") + msg = isctest.query.create("mail.example.", "A") res = isctest.query.tcp(msg, "10.53.0.2", expected_rcode=dns.rcode.NOERROR) assert res.answer == [ dns.rrset.from_text("mail.example.", 300, "IN", "A", "10.0.0.2") diff --git a/bin/tests/system/doth/tests_gnutls.py b/bin/tests/system/doth/tests_gnutls.py index f49b4401c51..9c897714efe 100644 --- a/bin/tests/system/doth/tests_gnutls.py +++ b/bin/tests/system/doth/tests_gnutls.py @@ -20,11 +20,12 @@ import pytest pytest.importorskip("dns") import dns.exception -import dns.message import dns.name import dns.rdataclass import dns.rdatatype +import isctest + pytestmark = pytest.mark.extra_artifacts( [ "gnutls-cli.*", @@ -35,7 +36,7 @@ pytestmark = pytest.mark.extra_artifacts( def test_gnutls_cli_query(gnutls_cli_executable, named_tlsport): # Prepare the example/SOA query which will be sent over TLS. - query = dns.message.make_query("example.", dns.rdatatype.SOA) + query = isctest.query.create("example.", dns.rdatatype.SOA) query_wire = query.to_wire() query_with_length = struct.pack(">H", len(query_wire)) + query_wire diff --git a/bin/tests/system/dsdigest/tests_dsdigest.py b/bin/tests/system/dsdigest/tests_dsdigest.py index f741a21147c..ccf1431e238 100644 --- a/bin/tests/system/dsdigest/tests_dsdigest.py +++ b/bin/tests/system/dsdigest/tests_dsdigest.py @@ -9,7 +9,7 @@ # See the COPYRIGHT file distributed with this work for additional # information regarding copyright ownership. -import dns.message +import dns.flags import pytest import isctest @@ -29,7 +29,7 @@ pytestmark = pytest.mark.extra_artifacts( def test_dsdigest_good(): """Check that validation with enabled digest types works""" - msg = dns.message.make_query("a.good.", "A", want_dnssec=True) + msg = isctest.query.create("a.good.", "A") res = isctest.query.tcp( msg, "10.53.0.3", @@ -51,7 +51,7 @@ def test_dsdigest_bad(): def test_dsdigest_insecure(): """Check that validation with not supported digest algorithms is insecure""" - msg_ds = dns.message.make_query("bad.", "DS", want_dnssec=True) + msg_ds = isctest.query.create("bad.", "DS") res_ds = isctest.query.tcp( msg_ds, "10.53.0.4", @@ -59,7 +59,7 @@ def test_dsdigest_insecure(): isctest.check.noerror(res_ds) assert res_ds.flags & dns.flags.AD - msg_a = dns.message.make_query("a.bad.", "A", want_dnssec=True) + msg_a = isctest.query.create("a.bad.", "A") res_a = isctest.query.tcp( msg_a, "10.53.0.4", diff --git a/bin/tests/system/emptyzones/tests_emptyzones.py b/bin/tests/system/emptyzones/tests_emptyzones.py index 7a8d3966bde..6c2fa56ed5e 100644 --- a/bin/tests/system/emptyzones/tests_emptyzones.py +++ b/bin/tests/system/emptyzones/tests_emptyzones.py @@ -9,8 +9,6 @@ # See the COPYRIGHT file distributed with this work for additional # information regarding copyright ownership. -import dns.message - import isctest @@ -20,11 +18,11 @@ def test_emptyzones(servers, templates): ns1.rndc("reload") templates.render("ns1/named.conf", {"automatic_empty_zones": True}) ns1.rndc("reload") - msg = dns.message.make_query("version.bind", "TXT", "CH") + msg = isctest.query.create("version.bind", "TXT", "CH") res = isctest.query.tcp(msg, "10.53.0.1") isctest.check.noerror(res) # check that allow-transfer { none; } works - msg = dns.message.make_query("10.in-addr.arpa", "AXFR") + msg = isctest.query.create("10.in-addr.arpa", "AXFR") res = isctest.query.tcp(msg, "10.53.0.1") isctest.check.refused(res) diff --git a/bin/tests/system/glue/tests_glue.py b/bin/tests/system/glue/tests_glue.py index 77dff57158c..faf251367da 100644 --- a/bin/tests/system/glue/tests_glue.py +++ b/bin/tests/system/glue/tests_glue.py @@ -10,6 +10,7 @@ # information regarding copyright ownership. +import dns.flags import dns.message import pytest @@ -20,7 +21,7 @@ pytest.importorskip("dns", minversion="2.0.0") def test_glue_full_glue_set(): """test that a ccTLD referral gets a full glue set from the root zone""" - msg = dns.message.make_query("foo.bar.fi", "A") + msg = isctest.query.create("foo.bar.fi", "A") msg.flags &= ~dns.flags.RD res = isctest.query.udp(msg, "10.53.0.1") @@ -51,7 +52,7 @@ NS.UU.NET. 172800 IN A 137.39.1.3 def test_glue_no_glue_set(): """test that out-of-zone glue is not found""" - msg = dns.message.make_query("example.net.", "A") + msg = isctest.query.create("example.net.", "A") msg.flags &= ~dns.flags.RD res = isctest.query.udp(msg, "10.53.0.1") diff --git a/bin/tests/system/hooks/tests_async_plugin.py b/bin/tests/system/hooks/tests_async_plugin.py index ac89c85ac00..cb70c5c78c3 100644 --- a/bin/tests/system/hooks/tests_async_plugin.py +++ b/bin/tests/system/hooks/tests_async_plugin.py @@ -9,15 +9,11 @@ # See the COPYRIGHT file distributed with this work for additional # information regarding copyright ownership. -import pytest import isctest -pytest.importorskip("dns") -import dns.message - def test_async_hook(): - msg = dns.message.make_query("example.com.", "A") + msg = isctest.query.create("example.com.", "A") res = isctest.query.udp(msg, "10.53.0.1") # the test-async plugin changes the status of any positive answer to NOTIMP isctest.check.notimp(res) diff --git a/bin/tests/system/include-multiplecfg/tests_include_multiplecfg.py b/bin/tests/system/include-multiplecfg/tests_include_multiplecfg.py index 346f33453ee..dffc917c9de 100644 --- a/bin/tests/system/include-multiplecfg/tests_include_multiplecfg.py +++ b/bin/tests/system/include-multiplecfg/tests_include_multiplecfg.py @@ -11,10 +11,10 @@ import os -import isctest +import dns.rrset import pytest -import dns.message +import isctest @pytest.mark.parametrize( @@ -26,7 +26,7 @@ import dns.message ], ) def test_include_multiplecfg(qname): - msg = dns.message.make_query(qname, "A") + msg = isctest.query.create(qname, "A") res = isctest.query.tcp(msg, "10.53.0.2") isctest.check.noerror(res) diff --git a/bin/tests/system/isctest/check.py b/bin/tests/system/isctest/check.py index 7095dd9fcab..e173ed4ca02 100644 --- a/bin/tests/system/isctest/check.py +++ b/bin/tests/system/isctest/check.py @@ -13,8 +13,8 @@ import shutil from typing import Optional import dns.flags -import dns.rcode import dns.message +import dns.rcode import dns.zone import isctest.log @@ -152,6 +152,16 @@ def is_executable(cmd: str, errmsg: str) -> None: assert executable is not None, errmsg +def named_alive(named_proc, resolver_ip): + assert named_proc.poll() is None, "named isn't running" + msg = isctest.query.create("version.bind", "TXT", "CH") + isctest.query.tcp(msg, resolver_ip, expected_rcode=dns_rcode.NOERROR) + + +def notauth(message: dns.message.Message) -> None: + rcode(message, dns.rcode.NOTAUTH) + + def nxdomain(message: dns.message.Message) -> None: rcode(message, dns.rcode.NXDOMAIN) diff --git a/bin/tests/system/limits/tests_limits.py b/bin/tests/system/limits/tests_limits.py index 1fe5ea00877..ca7214a9de4 100644 --- a/bin/tests/system/limits/tests_limits.py +++ b/bin/tests/system/limits/tests_limits.py @@ -14,11 +14,10 @@ import itertools import isctest import pytest -import dns.message - # Everything from getting a big answer to creating an RR set with thousands # of records takes minutes of CPU and real time with dnspython < 2.0.0. pytest.importorskip("dns", minversion="2.0.0") +import dns.rrset @pytest.mark.parametrize( @@ -32,7 +31,7 @@ pytest.importorskip("dns", minversion="2.0.0") ], ) def test_limits(name, limit): - msg_query = dns.message.make_query(f"{name}.example.", "A") + msg_query = isctest.query.create(f"{name}.example.", "A") res = isctest.query.tcp(msg_query, "10.53.0.1", log_response=False) iplist = [ @@ -46,7 +45,7 @@ def test_limits(name, limit): def test_limit_exceeded(): - msg_query = dns.message.make_query("5000.example.", "A") + msg_query = isctest.query.create("5000.example.", "A") res = isctest.query.tcp(msg_query, "10.53.0.1", log_response=False) assert res.flags & dns.flags.TC, "TC flag was not set" diff --git a/bin/tests/system/masterfile/tests_masterfile.py b/bin/tests/system/masterfile/tests_masterfile.py index 9aaaa769a53..5d7baf58b39 100644 --- a/bin/tests/system/masterfile/tests_masterfile.py +++ b/bin/tests/system/masterfile/tests_masterfile.py @@ -19,7 +19,7 @@ import isctest def test_masterfile_include_semantics(): """Test master file $INCLUDE semantics""" - msg_axfr = dns.message.make_query("include.", "AXFR") + msg_axfr = isctest.query.create("include.", "AXFR") res_axfr = isctest.query.tcp(msg_axfr, "10.53.0.1") axfr_include_semantics = """;ANSWER include. 300 IN SOA ns.include. hostmaster.include. 1 3600 1800 1814400 3600 @@ -40,7 +40,7 @@ ns.include. 300 IN A 127.0.0.1 def test_masterfile_bind_8_compat_semantics(): """Test master file BIND 8 TTL and $TTL semantics compatibility""" - msg_axfr = dns.message.make_query("ttl1.", "AXFR") + msg_axfr = isctest.query.create("ttl1.", "AXFR") res_axfr = isctest.query.tcp(msg_axfr, "10.53.0.1") axfr_ttl_semantics = """;ANSWER ttl1. 3 IN SOA ns.ttl1. hostmaster.ttl1. 1 3600 1800 1814400 3 @@ -59,7 +59,7 @@ ns.ttl1. 3 IN A 10.53.0.1 def test_masterfile_rfc_1035_semantics(): """Test master file RFC1035 TTL and $TTL semantics""" - msg_axfr = dns.message.make_query("ttl2.", "AXFR") + msg_axfr = isctest.query.create("ttl2.", "AXFR") res_axfr = isctest.query.tcp(msg_axfr, "10.53.0.1") axfr_ttl_semantics = """;ANSWER ttl2. 1 IN SOA ns.ttl2. hostmaster.ttl2. 1 3600 1800 1814400 3 @@ -78,7 +78,7 @@ ns.ttl2. 1 IN A 10.53.0.1 def test_masterfile_missing_master_file(): """Test nameserver running with a missing master file""" - msg_soa = dns.message.make_query("example.", "SOA") + msg_soa = isctest.query.create("example.", "SOA") res_soa = isctest.query.tcp(msg_soa, "10.53.0.2") expected_soa_rr = """;ANSWER example. 300 IN SOA mname1. . 2010042407 20 20 1814400 3600 @@ -89,7 +89,7 @@ example. 300 IN SOA mname1. . 2010042407 20 20 1814400 3600 def test_masterfile_missing_master_file_servfail(): """Test nameserver returning SERVFAIL for a missing master file""" - msg_soa = dns.message.make_query("missing.", "SOA") + msg_soa = isctest.query.create("missing.", "SOA") res_soa = isctest.query.tcp(msg_soa, "10.53.0.2") isctest.check.servfail(res_soa) diff --git a/bin/tests/system/names/tests_names.py b/bin/tests/system/names/tests_names.py index e4fc296277f..b72fcbec980 100644 --- a/bin/tests/system/names/tests_names.py +++ b/bin/tests/system/names/tests_names.py @@ -13,8 +13,6 @@ import pytest pytest.importorskip("dns", minversion="2.7.0") - -import dns.message import isctest @@ -22,7 +20,7 @@ import isctest # about twice as large as the answer with compression enabled, while # maintaining identical content. def test_names(): - msg = dns.message.make_query("example.", "MX") + msg = isctest.query.create("example.", "MX") # Getting message size with compression enabled res_enabled = isctest.query.tcp(msg, ip="10.53.0.1", source="10.53.0.1") # Getting message size with compression disabled diff --git a/bin/tests/system/rndc/tests_cve-2023-3341.py b/bin/tests/system/rndc/tests_cve-2023-3341.py index c195a0db10d..73a5117e9f0 100644 --- a/bin/tests/system/rndc/tests_cve-2023-3341.py +++ b/bin/tests/system/rndc/tests_cve-2023-3341.py @@ -15,10 +15,9 @@ import socket import time import pytest + import isctest -pytest.importorskip("dns") -import dns.message pytestmark = pytest.mark.extra_artifacts( [ @@ -66,6 +65,6 @@ def test_cve_2023_3341(control_port): # Wait for named to (possibly) crash time.sleep(10) - msg = dns.message.make_query("version.bind", "TXT", "CH") + msg = isctest.query.create("version.bind", "TXT", "CH") res = isctest.query.udp(msg, "10.53.0.2") isctest.check.noerror(res) diff --git a/bin/tests/system/rpzextra/tests_rpzextra.py b/bin/tests/system/rpzextra/tests_rpzextra.py index 359b7aab437..e918fa832e2 100644 --- a/bin/tests/system/rpzextra/tests_rpzextra.py +++ b/bin/tests/system/rpzextra/tests_rpzextra.py @@ -12,13 +12,16 @@ # information regarding copyright ownership. import os + import pytest pytest.importorskip("dns", minversion="2.0.0") +import dns.rcode +import dns.rrset + import isctest from isctest.compat import dns_rcode -import dns.message pytestmark = pytest.mark.extra_artifacts( [ @@ -70,7 +73,7 @@ pytestmark = pytest.mark.extra_artifacts( ) def test_rpz_multiple_views(qname, source, rcode): # Wait for the rpz-external.local zone transfer - msg = dns.message.make_query("rpz-external.local", "SOA") + msg = isctest.query.create("rpz-external.local", "SOA") isctest.query.tcp( msg, ip="10.53.0.3", @@ -84,7 +87,7 @@ def test_rpz_multiple_views(qname, source, rcode): expected_rcode=dns_rcode.NOERROR, ) - msg = dns.message.make_query(qname, "A") + msg = isctest.query.create(qname, "A") res = isctest.query.udp(msg, "10.53.0.3", source=source, expected_rcode=rcode) if rcode == dns.rcode.NOERROR: assert res.answer == [dns.rrset.from_text(qname, 300, "IN", "A", "10.53.0.2")] @@ -94,7 +97,7 @@ def test_rpz_passthru_logging(): resolver_ip = "10.53.0.3" # Should generate a log entry into rpz_passthru.txt - msg_allowed = dns.message.make_query("allowed.", "A") + msg_allowed = isctest.query.create("allowed.", "A") res_allowed = isctest.query.udp( msg_allowed, resolver_ip, source="10.53.0.1", expected_rcode=dns.rcode.NOERROR ) @@ -104,7 +107,7 @@ def test_rpz_passthru_logging(): # baddomain.com isn't allowed (CNAME .), should return NXDOMAIN # Should generate a log entry into rpz.txt - msg_not_allowed = dns.message.make_query("baddomain.", "A") + msg_not_allowed = isctest.query.create("baddomain.", "A") res_not_allowed = isctest.query.udp( msg_not_allowed, resolver_ip, diff --git a/bin/tests/system/shutdown/tests_shutdown.py b/bin/tests/system/shutdown/tests_shutdown.py index d9c7e67f619..3d0c410ee08 100755 --- a/bin/tests/system/shutdown/tests_shutdown.py +++ b/bin/tests/system/shutdown/tests_shutdown.py @@ -105,7 +105,7 @@ def do_work(named_proc, resolver_ip, instance, kill_method, n_workers, n_queries ) qname = relname + ".test" - msg = dns.message.make_query(qname, "A") + msg = isctest.query.create(qname, "A") futures[ executor.submit( isctest.query.udp, msg, resolver_ip, timeout=1, attempts=1 diff --git a/bin/tests/system/tsiggss/tests_isc_spnego_flaws.py b/bin/tests/system/tsiggss/tests_isc_spnego_flaws.py index 4c32557e894..38b424d4c59 100755 --- a/bin/tests/system/tsiggss/tests_isc_spnego_flaws.py +++ b/bin/tests/system/tsiggss/tests_isc_spnego_flaws.py @@ -56,7 +56,7 @@ class CraftedTKEYQuery: rrset = dns.rrset.from_rdata(dns.name.root, dns.rdatatype.TKEY, rdata) # Prepare complete TKEY query to send - self.msg = dns.message.make_query( + self.msg = isctest.query.create( dns.name.root, dns.rdatatype.TKEY, dns.rdataclass.ANY ) self.msg.additional.append(rrset) diff --git a/bin/tests/system/ttl/tests_cache_ttl.py b/bin/tests/system/ttl/tests_cache_ttl.py index 631c907c7eb..3e7f91549e5 100644 --- a/bin/tests/system/ttl/tests_cache_ttl.py +++ b/bin/tests/system/ttl/tests_cache_ttl.py @@ -13,9 +13,6 @@ import pytest import isctest -pytest.importorskip("dns") -import dns.message - @pytest.mark.parametrize( "qname,rdtype,expected_ttl", @@ -27,7 +24,7 @@ import dns.message ], ) def test_cache_ttl(qname, rdtype, expected_ttl): - msg = dns.message.make_query(qname, rdtype) + msg = isctest.query.create(qname, rdtype) response = isctest.query.udp(msg, "10.53.0.2") for rr in response.answer + response.authority: assert rr.ttl == expected_ttl diff --git a/bin/tests/system/wildcard/tests_wildcard.py b/bin/tests/system/wildcard/tests_wildcard.py index cad2eb07578..459e3397b35 100755 --- a/bin/tests/system/wildcard/tests_wildcard.py +++ b/bin/tests/system/wildcard/tests_wildcard.py @@ -94,7 +94,7 @@ def test_wildcard_rdtype_mismatch( # See RFC 4592 section 2.2.1. assume(name == SUFFIX or name.labels[-len(SUFFIX) - 1] != b"*") - query_msg = dns.message.make_query(name, rdtype) + query_msg = isctest.query.create(name, rdtype) response_msg = isctest.query.tcp(query_msg, IP_ADDR, named_port, timeout=TIMEOUT) isctest.check.is_response_to(response_msg, query_msg) @@ -111,7 +111,7 @@ def test_wildcard_match(name: dns.name.Name, named_port: int) -> None: # See RFC 4592 section 2.2.1. assume(name.labels[-len(SUFFIX) - 1] != b"*") - query_msg = dns.message.make_query(name, WILDCARD_RDTYPE) + query_msg = isctest.query.create(name, WILDCARD_RDTYPE) response_msg = isctest.query.tcp(query_msg, IP_ADDR, named_port, timeout=TIMEOUT) isctest.check.is_response_to(response_msg, query_msg) @@ -140,7 +140,7 @@ def test_wildcard_with_star_not_synthesized( name: dns.name.Name, named_port: int ) -> None: """RFC 4592 section 2.2.1 ghost.*.example.""" - query_msg = dns.message.make_query(name, WILDCARD_RDTYPE) + query_msg = isctest.query.create(name, WILDCARD_RDTYPE) response_msg = isctest.query.tcp(query_msg, IP_ADDR, named_port, timeout=TIMEOUT) isctest.check.is_response_to(response_msg, query_msg) @@ -170,7 +170,7 @@ def test_name_in_between_wildcards(name: dns.name.Name, named_port: int) -> None or name.labels[-len(NESTED_SUFFIX) - 1] != b"*" ) - query_msg = dns.message.make_query(name, WILDCARD_RDTYPE) + query_msg = isctest.query.create(name, WILDCARD_RDTYPE) response_msg = isctest.query.tcp(query_msg, IP_ADDR, named_port, timeout=TIMEOUT) isctest.check.is_response_to(response_msg, query_msg) @@ -201,7 +201,7 @@ def test_name_nested_wildcard_subdomains_not_synthesized( `foo.*.*.*.nestedwild.test. A` must not be synthesized. """ - query_msg = dns.message.make_query(name, WILDCARD_RDTYPE) + query_msg = isctest.query.create(name, WILDCARD_RDTYPE) response_msg = isctest.query.tcp(query_msg, IP_ADDR, named_port, timeout=TIMEOUT) isctest.check.is_response_to(response_msg, query_msg) diff --git a/bin/tests/system/xferquota/tests_xferquota.py b/bin/tests/system/xferquota/tests_xferquota.py index c31700e4a26..1cb169156e1 100644 --- a/bin/tests/system/xferquota/tests_xferquota.py +++ b/bin/tests/system/xferquota/tests_xferquota.py @@ -60,8 +60,8 @@ def test_xferquota(named_port, servers): isctest.run.retry_with_timeout(check_line_count, timeout=360) - axfr_msg = dns.message.make_query("zone000099.example.", "AXFR") - a_msg = dns.message.make_query("a.changing.", "A") + axfr_msg = isctest.query.create("zone000099.example.", "AXFR") + a_msg = isctest.query.create("a.changing.", "A") def query_and_compare(msg): ns1response = isctest.query.tcp(msg, "10.53.0.1")