def do_query(server, qname, qtype, tcp=False):
- msg = dns.message.make_query(qname, qtype, use_edns=True, want_dnssec=True)
+ msg = isctest.query.create(qname, qtype)
query_func = isctest.query.tcp if tcp else isctest.query.udp
response = query_func(msg, server.ip, expected_rcode=dns.rcode.NOERROR)
return response
)
# pylint: disable=redefined-outer-name,unused-argument
def test_cipher_suites_tls_xfer(qname, ns, rcode, transfers_complete):
- msg = dns.message.make_query(qname, "AXFR")
+ msg = isctest.query.create(qname, "AXFR")
ans = isctest.query.tls(msg, f"10.53.0.{ns}")
assert ans.rcode() == rcode
if rcode == dns.rcode.NOERROR:
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
-import isctest
+import dns
-import dns.message
+import isctest
def test_database(ns1, templates):
- msg = dns.message.make_query("database.", "SOA")
+ msg = isctest.query.create("database.", "SOA")
# checking pre reload zone
res = isctest.query.tcp(msg, "10.53.0.1")
import isctest.mark
import pytest
-import dns.message
-
pytest.importorskip("dns", minversion="2.0.0")
+import dns.rrset
pytestmark = [
isctest.mark.with_dnstap,
def test_dnstap_dispatch_socket_addresses():
# Send some query to ns3 so that it records something in its dnstap file.
- msg = dns.message.make_query("mail.example.", "A")
+ msg = isctest.query.create("mail.example.", "A")
res = isctest.query.tcp(msg, "10.53.0.2", expected_rcode=dns.rcode.NOERROR)
assert res.answer == [
dns.rrset.from_text("mail.example.", 300, "IN", "A", "10.0.0.2")
pytest.importorskip("dns")
import dns.exception
-import dns.message
import dns.name
import dns.rdataclass
import dns.rdatatype
+import isctest
+
pytestmark = pytest.mark.extra_artifacts(
[
"gnutls-cli.*",
def test_gnutls_cli_query(gnutls_cli_executable, named_tlsport):
# Prepare the example/SOA query which will be sent over TLS.
- query = dns.message.make_query("example.", dns.rdatatype.SOA)
+ query = isctest.query.create("example.", dns.rdatatype.SOA)
query_wire = query.to_wire()
query_with_length = struct.pack(">H", len(query_wire)) + query_wire
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
-import dns.message
+import dns.flags
import pytest
import isctest
def test_dsdigest_good():
"""Check that validation with enabled digest types works"""
- msg = dns.message.make_query("a.good.", "A", want_dnssec=True)
+ msg = isctest.query.create("a.good.", "A")
res = isctest.query.tcp(
msg,
"10.53.0.3",
def test_dsdigest_insecure():
"""Check that validation with not supported digest algorithms is insecure"""
- msg_ds = dns.message.make_query("bad.", "DS", want_dnssec=True)
+ msg_ds = isctest.query.create("bad.", "DS")
res_ds = isctest.query.tcp(
msg_ds,
"10.53.0.4",
isctest.check.noerror(res_ds)
assert res_ds.flags & dns.flags.AD
- msg_a = dns.message.make_query("a.bad.", "A", want_dnssec=True)
+ msg_a = isctest.query.create("a.bad.", "A")
res_a = isctest.query.tcp(
msg_a,
"10.53.0.4",
import os
import pytest
-import dns.message
+import dns.flags
+
import isctest
def check_server_soa(resolver):
- msg = dns.message.make_query(".", "SOA")
- msg.flags += dns.flags.AD
+ msg = isctest.query.create(".", "SOA")
res1 = isctest.query.tcp(msg, "10.53.0.1")
res2 = isctest.query.tcp(msg, resolver)
isctest.check.rrsets_equal(res1.answer, res2.answer)
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
-import dns.message
-
import isctest
watcher.wait_for_line("all zones loaded")
templates.render("ns1/named.conf", {"automatic_empty_zones": True})
ns1.rndc("reload")
- msg = dns.message.make_query("version.bind", "TXT", "CH")
+ msg = isctest.query.create("version.bind", "TXT", "CH")
res = isctest.query.tcp(msg, "10.53.0.1")
isctest.check.noerror(res)
# check that allow-transfer { none; } works
- msg = dns.message.make_query("10.in-addr.arpa", "AXFR")
+ msg = isctest.query.create("10.in-addr.arpa", "AXFR")
res = isctest.query.tcp(msg, "10.53.0.1")
isctest.check.refused(res)
# information regarding copyright ownership.
+import dns.flags
import dns.message
import pytest
def test_glue_full_glue_set():
"""test that a ccTLD referral gets a full glue set from the root zone"""
- msg = dns.message.make_query("foo.bar.fi", "A")
+ msg = isctest.query.create("foo.bar.fi", "A")
msg.flags &= ~dns.flags.RD
res = isctest.query.udp(msg, "10.53.0.1")
def test_glue_no_glue_set():
"""test that out-of-zone glue is not found"""
- msg = dns.message.make_query("example.net.", "A")
+ msg = isctest.query.create("example.net.", "A")
msg.flags &= ~dns.flags.RD
res = isctest.query.udp(msg, "10.53.0.1")
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
-import pytest
import isctest
-pytest.importorskip("dns")
-import dns.message
-
def test_hooks():
- msg = dns.message.make_query("example.com.", "A")
+ msg = isctest.query.create("example.com.", "A")
res = isctest.query.udp(msg, "10.53.0.1")
# the test-async plugin changes the status of any positive answer to NOTIMP
isctest.check.notimp(res)
import os
-import isctest
+import dns.rrset
import pytest
-import dns.message
+import isctest
@pytest.mark.parametrize(
],
)
def test_include_multiplecfg(qname):
- msg = dns.message.make_query(qname, "A")
+ msg = isctest.query.create(qname, "A")
res = isctest.query.tcp(msg, "10.53.0.2")
isctest.check.noerror(res)
from typing import Optional
import dns.flags
-import dns.rcode
import dns.message
+import dns.rcode
import dns.zone
import isctest.log
def named_alive(named_proc, resolver_ip):
assert named_proc.poll() is None, "named isn't running"
- msg = dns.message.make_query("version.bind", "TXT", "CH")
+ msg = isctest.query.create("version.bind", "TXT", "CH")
isctest.query.tcp(msg, resolver_ip, expected_rcode=dns_rcode.NOERROR)
import dns
import dns.tsig
-from isctest.instance import NamedInstance
import isctest.log
import isctest.query
import isctest.util
+from isctest.instance import NamedInstance
from isctest.vars.algorithms import Algorithm, ALL_ALGORITHMS_BY_NUM
DEFAULT_TTL = 300
def _query(server, qname, qtype, tsig=None):
- query = dns.message.make_query(qname, qtype, use_edns=True, want_dnssec=True)
+ query = isctest.query.create(qname, qtype)
if tsig is not None:
tsigkey = tsig.split(":")
qname = f"view.{zone}."
qtype = dns.rdatatype.TXT
rdata = txt_rdata
- query = dns.message.make_query(qname, qtype, use_edns=True, want_dnssec=True)
+ query = isctest.query.create(qname, qtype)
tsigkey = tsig.split(":")
keyring = dns.tsig.Key(tsigkey[1], tsigkey[2], tsigkey[0])
query.use_tsig(keyring)
def query_soa(qname):
fqdn = dns.name.from_text(qname)
qtype = dns.rdatatype.SOA
- query = dns.message.make_query(fqdn, qtype, use_edns=True, want_dnssec=True)
+ query = isctest.query.create(fqdn, qtype)
try:
response = isctest.query.tcp(query, ns6.ip, ns6.ports.dns, timeout=3)
except dns.exception.Timeout:
import isctest
import pytest
-import dns.message
-
# Everything from getting a big answer to creating an RR set with thousands
# of records takes minutes of CPU and real time with dnspython < 2.0.0.
pytest.importorskip("dns", minversion="2.0.0")
+import dns.rrset
@pytest.mark.parametrize(
],
)
def test_limits(name, limit):
- msg_query = dns.message.make_query(f"{name}.example.", "A")
+ msg_query = isctest.query.create(f"{name}.example.", "A")
res = isctest.query.tcp(msg_query, "10.53.0.1", log_response=False)
iplist = [
def test_limit_exceeded():
- msg_query = dns.message.make_query("5000.example.", "A")
+ msg_query = isctest.query.create("5000.example.", "A")
res = isctest.query.tcp(msg_query, "10.53.0.1", log_response=False)
assert res.flags & dns.flags.TC, "TC flag was not set"
def test_masterfile_include_semantics():
"""Test master file $INCLUDE semantics"""
- msg_axfr = dns.message.make_query("include.", "AXFR")
+ msg_axfr = isctest.query.create("include.", "AXFR")
res_axfr = isctest.query.tcp(msg_axfr, "10.53.0.1")
axfr_include_semantics = """;ANSWER
include. 300 IN SOA ns.include. hostmaster.include. 1 3600 1800 1814400 3600
def test_masterfile_bind_8_compat_semantics():
"""Test master file BIND 8 TTL and $TTL semantics compatibility"""
- msg_axfr = dns.message.make_query("ttl1.", "AXFR")
+ msg_axfr = isctest.query.create("ttl1.", "AXFR")
res_axfr = isctest.query.tcp(msg_axfr, "10.53.0.1")
axfr_ttl_semantics = """;ANSWER
ttl1. 3 IN SOA ns.ttl1. hostmaster.ttl1. 1 3600 1800 1814400 3
def test_masterfile_rfc_1035_semantics():
"""Test master file RFC1035 TTL and $TTL semantics"""
- msg_axfr = dns.message.make_query("ttl2.", "AXFR")
+ msg_axfr = isctest.query.create("ttl2.", "AXFR")
res_axfr = isctest.query.tcp(msg_axfr, "10.53.0.1")
axfr_ttl_semantics = """;ANSWER
ttl2. 1 IN SOA ns.ttl2. hostmaster.ttl2. 1 3600 1800 1814400 3
def test_masterfile_missing_master_file():
"""Test nameserver running with a missing master file"""
- msg_soa = dns.message.make_query("example.", "SOA")
+ msg_soa = isctest.query.create("example.", "SOA")
res_soa = isctest.query.tcp(msg_soa, "10.53.0.2")
expected_soa_rr = """;ANSWER
example. 300 IN SOA mname1. . 2010042407 20 20 1814400 3600
"""Test zone configurations with initial template files"""
# example inherited its configuration from the template,
# make sure it works
- msg_soa = dns.message.make_query("example.", "SOA")
+ msg_soa = isctest.query.create("example.", "SOA")
res_soa = isctest.query.tcp(msg_soa, "10.53.0.2")
expected_soa_rr = """;ANSWER
example. 300 IN SOA mname1. . 2010042407 20 20 1814400 3600
# initial uses an initial-file option with the "file"
# option set to "copied.db". make sure it works and that
# copied.db has been populated.
- msg_soa = dns.message.make_query("initial.", "SOA")
+ msg_soa = isctest.query.create("initial.", "SOA")
res_soa = isctest.query.tcp(msg_soa, "10.53.0.2")
expected_soa_rr = """;ANSWER
initial. 300 IN SOA mname1. . 2010042407 20 20 1814400 3600
# present uses an initial-file option, but the file 'present.db'
# already exists and is empty, so the initial-file should not be
# copied into place and the zone should not load.
- msg_soa = dns.message.make_query("present.", "SOA")
+ msg_soa = isctest.query.create("present.", "SOA")
res_soa = isctest.query.tcp(msg_soa, "10.53.0.2")
isctest.check.servfail(res_soa)
isctest.check.file_empty("ns2/present.db")
"""Test zone configurations with overridden template options"""
# different inherited configuration from the template, but
# overrides the "file" option to 'alternate.db'.
- msg_soa = dns.message.make_query("different.", "SOA")
+ msg_soa = isctest.query.create("different.", "SOA")
res_soa = isctest.query.tcp(msg_soa, "10.53.0.2")
expected_soa_rr = """;ANSWER
different. 300 IN SOA mname1. . 2010042407 20 20 1814400 3600
def test_masterfile_missing_master_file_servfail():
"""Test nameserver returning SERVFAIL for a missing master file"""
- msg_soa = dns.message.make_query("missing.", "SOA")
+ msg_soa = isctest.query.create("missing.", "SOA")
res_soa = isctest.query.tcp(msg_soa, "10.53.0.2")
isctest.check.servfail(res_soa)
pytest.importorskip("dns", minversion="2.7.0")
-
-import dns.message
import isctest
# about twice as large as the answer with compression enabled, while
# maintaining identical content.
def test_names():
- msg = dns.message.make_query("example.", "MX")
+ msg = isctest.query.create("example.", "MX")
# Getting message size with compression enabled
res_enabled = isctest.query.tcp(msg, ip="10.53.0.1", source="10.53.0.1")
# Getting message size with compression disabled
import isctest.mark
import isctest.run
-import dns.message
-
pytestmark = [
isctest.mark.with_lmdb,
pytest.mark.extra_artifacts(
def test_nzd2nzf(ns1):
zone_data = '"added.example" { type primary; file "added.db"; };'
- msg = dns.message.make_query("a.added.example.", "A")
+ msg = isctest.query.create("a.added.example.", "A")
# query for non-existing zone data
res = isctest.query.tcp(msg, ns1.ip)
# information regarding copyright ownership.
import pytest
+
import isctest
-pytest.importorskip("dns")
-import dns.message
pytestmark = pytest.mark.extra_artifacts(
[
def test_querysource_none():
- msg = dns.message.make_query("example.", "A", want_dnssec=False)
+ msg = isctest.query.create("example.", "A", dnssec=False)
res = isctest.query.udp(msg, "10.53.0.2")
isctest.check.noerror(res)
# using a different name below to make sure we don't use the
# resolver cache
- msg = dns.message.make_query("exampletwo.", "A", want_dnssec=False)
+ msg = isctest.query.create("exampletwo.", "A", dnssec=False)
res = isctest.query.udp(msg, "fd92:7065:b8e:ffff::2")
isctest.check.noerror(res)
import time
import pytest
+
import isctest
-pytest.importorskip("dns")
-import dns.message
pytestmark = pytest.mark.extra_artifacts(
[
# Wait for named to (possibly) crash
time.sleep(10)
- msg = dns.message.make_query("version.bind", "TXT", "CH")
+ msg = isctest.query.create("version.bind", "TXT", "CH")
res = isctest.query.udp(msg, "10.53.0.2")
isctest.check.noerror(res)
# information regarding copyright ownership.
import os
+
import pytest
pytest.importorskip("dns", minversion="2.0.0")
+import dns.rcode
+import dns.rrset
+
import isctest
from isctest.compat import dns_rcode
-import dns.message
pytestmark = pytest.mark.extra_artifacts(
[
)
def test_rpz_multiple_views(qname, source, rcode):
# Wait for the rpz-external.local zone transfer
- msg = dns.message.make_query("rpz-external.local", "SOA")
+ msg = isctest.query.create("rpz-external.local", "SOA")
isctest.query.tcp(
msg,
ip="10.53.0.3",
expected_rcode=dns_rcode.NOERROR,
)
- msg = dns.message.make_query(qname, "A")
+ msg = isctest.query.create(qname, "A")
res = isctest.query.udp(msg, "10.53.0.3", source=source, expected_rcode=rcode)
if rcode == dns.rcode.NOERROR:
assert res.answer == [dns.rrset.from_text(qname, 300, "IN", "A", "10.53.0.2")]
resolver_ip = "10.53.0.3"
# Should generate a log entry into rpz_passthru.txt
- msg_allowed = dns.message.make_query("allowed.", "A")
+ msg_allowed = isctest.query.create("allowed.", "A")
res_allowed = isctest.query.udp(
msg_allowed, resolver_ip, source="10.53.0.1", expected_rcode=dns.rcode.NOERROR
)
]
# Should also generate a log entry into rpz_passthru.txt
- msg_allowed_any = dns.message.make_query("allowed.", "ANY")
+ msg_allowed_any = isctest.query.create("allowed.", "ANY")
res_allowed_any = isctest.query.udp(
msg_allowed_any,
resolver_ip,
# baddomain.com isn't allowed (CNAME .), should return NXDOMAIN
# Should generate a log entry into rpz.txt
- msg_not_allowed = dns.message.make_query("baddomain.", "A")
+ msg_not_allowed = isctest.query.create("baddomain.", "A")
res_not_allowed = isctest.query.udp(
msg_not_allowed,
resolver_ip,
)
qname = relname + ".test"
- msg = dns.message.make_query(qname, "A")
+ msg = isctest.query.create(qname, "A")
futures[
executor.submit(
isctest.query.udp, msg, resolver_ip, timeout=1, attempts=1
# try an AXFR that should be denied (NOTAUTH)
def axfr_denied():
- msg = dns.message.make_query("child.example.", "AXFR")
+ msg = isctest.query.create("child.example.", "AXFR")
res = isctest.query.tcp(msg, "10.53.0.3")
isctest.check.notauth(res)
# look for stub zone data without recursion (should not be found)
def stub_zone_lookout_without_recursion():
# drop all flags (dns.flags.RD is set by default)
- msg = dns.message.make_query("data.child.example.", "TXT")
+ msg = isctest.query.create("data.child.example.", "TXT")
msg.flags = 0
res = isctest.query.tcp(msg, "10.53.0.3")
isctest.check.noerror(res)
# look for stub zone data with recursion (should be found)
def stub_zone_lookout_with_recursion():
# dns.flags.RD is set by default
- msg = dns.message.make_query("data.child.example.", "TXT")
+ msg = isctest.query.create("data.child.example.", "TXT")
res = isctest.query.tcp(msg, "10.53.0.3")
isctest.check.noerror(res)
assert res.answer[0] == dns.rrset.from_text(
assert os.path.exists("ns5/example.db")
# this query would fail if NS glue wasn't transferred
- msg_txt = dns.message.make_query("target.example.", "TXT", want_dnssec=False)
+ msg_txt = isctest.query.create("target.example.", "TXT", dnssec=False)
res_txt = isctest.query.tcp(msg_txt, "10.53.0.5")
isctest.check.noerror(res_txt)
assert res_txt.answer[0] == dns.rrset.from_text(
)
# ensure both IPv4 and IPv6 glue records were transferred
- msg_a = dns.message.make_query("ns4.example.", "A")
+ msg_a = isctest.query.create("ns4.example.", "A")
res_a = isctest.query.tcp(msg_a, "10.53.0.5")
assert res_a.answer[0] == dns.rrset.from_text(
"ns4.example.", "300", "IN", "A", "10.53.0.4"
)
- msg_aaaa = dns.message.make_query("ns4.example.", "AAAA")
+ msg_aaaa = isctest.query.create("ns4.example.", "AAAA")
res_aaaa = isctest.query.tcp(msg_aaaa, "10.53.0.5")
assert res_aaaa.answer[0] == dns.rrset.from_text(
"ns4.example.", "300", "IN", "AAAA", "fd92:7065:b8e:ffff::4"
rrset = dns.rrset.from_rdata(dns.name.root, dns.rdatatype.TKEY, rdata)
# Prepare complete TKEY query to send
- self.msg = dns.message.make_query(
+ self.msg = isctest.query.create(
dns.name.root, dns.rdatatype.TKEY, dns.rdataclass.ANY
)
self.msg.additional.append(rrset)
import isctest
-pytest.importorskip("dns")
-import dns.message
-
@pytest.mark.parametrize(
"qname,rdtype,expected_ttl",
],
)
def test_cache_ttl(qname, rdtype, expected_ttl):
- msg = dns.message.make_query(qname, rdtype)
+ msg = isctest.query.create(qname, rdtype)
response = isctest.query.udp(msg, "10.53.0.2")
for rr in response.answer + response.authority:
assert rr.ttl == expected_ttl
# See RFC 4592 section 2.2.1.
assume(name == SUFFIX or name.labels[-len(SUFFIX) - 1] != b"*")
- query_msg = dns.message.make_query(name, rdtype)
+ query_msg = isctest.query.create(name, rdtype)
response_msg = isctest.query.tcp(query_msg, IP_ADDR, named_port, timeout=TIMEOUT)
isctest.check.is_response_to(response_msg, query_msg)
# See RFC 4592 section 2.2.1.
assume(name.labels[-len(SUFFIX) - 1] != b"*")
- query_msg = dns.message.make_query(name, WILDCARD_RDTYPE)
+ query_msg = isctest.query.create(name, WILDCARD_RDTYPE)
response_msg = isctest.query.tcp(query_msg, IP_ADDR, named_port, timeout=TIMEOUT)
isctest.check.is_response_to(response_msg, query_msg)
name: dns.name.Name, named_port: int
) -> None:
"""RFC 4592 section 2.2.1 ghost.*.example."""
- query_msg = dns.message.make_query(name, WILDCARD_RDTYPE)
+ query_msg = isctest.query.create(name, WILDCARD_RDTYPE)
response_msg = isctest.query.tcp(query_msg, IP_ADDR, named_port, timeout=TIMEOUT)
isctest.check.is_response_to(response_msg, query_msg)
or name.labels[-len(NESTED_SUFFIX) - 1] != b"*"
)
- query_msg = dns.message.make_query(name, WILDCARD_RDTYPE)
+ query_msg = isctest.query.create(name, WILDCARD_RDTYPE)
response_msg = isctest.query.tcp(query_msg, IP_ADDR, named_port, timeout=TIMEOUT)
isctest.check.is_response_to(response_msg, query_msg)
`foo.*.*.*.nestedwild.test. A` must not be synthesized.
"""
- query_msg = dns.message.make_query(name, WILDCARD_RDTYPE)
+ query_msg = isctest.query.create(name, WILDCARD_RDTYPE)
response_msg = isctest.query.tcp(query_msg, IP_ADDR, named_port, timeout=TIMEOUT)
isctest.check.is_response_to(response_msg, query_msg)
isctest.run.retry_with_timeout(check_line_count, timeout=360)
- axfr_msg = dns.message.make_query("zone000099.example.", "AXFR")
- a_msg = dns.message.make_query("a.changing.", "A")
+ axfr_msg = isctest.query.create("zone000099.example.", "AXFR")
+ a_msg = isctest.query.create("a.changing.", "A")
def query_and_compare(msg):
ns1response = isctest.query.tcp(msg, "10.53.0.1")