import dns.exception
import dns.message
import dns.name
-import dns.query
import dns.rcode
import dns.rdataclass
import dns.rdatatype
def do_query(server, qname, qtype, tcp=False):
- query = dns.message.make_query(qname, qtype, use_edns=True, want_dnssec=True)
- try:
- if tcp:
- response = dns.query.tcp(query, server.ip, timeout=3, port=server.ports.dns)
- else:
- response = dns.query.udp(query, server.ip, timeout=3, port=server.ports.dns)
- except dns.exception.Timeout:
- print(f"error: query timeout for query {qname} {qtype} to {server.ip}")
- return None
-
+ msg = dns.message.make_query(qname, qtype, use_edns=True, want_dnssec=True)
+ query_func = isctest.query.tcp if tcp else isctest.query.udp
+ response = query_func(msg, server.ip, expected_rcode=dns.rcode.NOERROR)
return response
def read_statefile(server, zone):
- addr = server.ip
count = 0
keyid = 0
state = {}
response = do_query(server, zone, "DS", tcp=True)
- if not isinstance(response, dns.message.Message):
- print(f"error: no response for {zone} DS from {addr}")
- return {}
-
- if response.rcode() == dns.rcode.NOERROR:
- # fetch key id from response.
- for rr in response.answer:
- if rr.match(
- dns.name.from_text(zone),
- dns.rdataclass.IN,
- dns.rdatatype.DS,
- dns.rdatatype.NONE,
- ):
- if count == 0:
- keyid = list(dict(rr.items).items())[0][0].key_tag
- count += 1
-
- if count != 1:
- print(
- f"error: expected a single DS in response for {zone} from {addr}, got {count}"
- )
- return {}
- else:
- rcode = dns.rcode.to_text(response.rcode())
- print(f"error: {rcode} response for {zone} DNSKEY from {addr}")
- return {}
+ # fetch key id from response.
+ for rr in response.answer:
+ if rr.match(
+ dns.name.from_text(zone),
+ dns.rdataclass.IN,
+ dns.rdatatype.DS,
+ dns.rdatatype.NONE,
+ ):
+ if count == 0:
+ keyid = list(dict(rr.items).items())[0][0].key_tag
+ count += 1
+
+ assert (
+ count == 1
+ ), f"expected a single DS in response for {zone} from {server.ip}, got {count}"
filename = f"ns9/K{zone}+013+{keyid:05d}.state"
print(f"read state file {filename}")
continue
key, val = line.strip().split(":", 1)
state[key.strip()] = val.strip()
-
except FileNotFoundError:
# file may not be written just yet.
return {}
def zone_check(server, zone):
- addr = server.ip
fqdn = f"{zone}."
- # wait until zone is fully signed.
- signed = False
- for _ in range(10):
- response = do_query(server, fqdn, "NSEC")
- if not isinstance(response, dns.message.Message):
- print(f"error: no response for {fqdn} NSEC from {addr}")
- elif response.rcode() == dns.rcode.NOERROR:
- signed = has_signed_apex_nsec(fqdn, response)
- else:
- rcode = dns.rcode.to_text(response.rcode())
- print(f"error: {rcode} response for {fqdn} NSEC from {addr}")
-
- if signed:
- break
-
- time.sleep(1)
-
- assert signed
+ # check zone is fully signed.
+ response = do_query(server, fqdn, "NSEC")
+ assert has_signed_apex_nsec(fqdn, response)
# check if zone if DNSSEC valid.
- verified = False
transfer = do_query(server, fqdn, "AXFR", tcp=True)
- if not isinstance(transfer, dns.message.Message):
- print(f"error: no response for {fqdn} AXFR from {addr}")
- elif transfer.rcode() == dns.rcode.NOERROR:
- verified = verify_zone(fqdn, transfer)
- else:
- rcode = dns.rcode.to_text(transfer.rcode())
- print(f"error: {rcode} response for {fqdn} AXFR from {addr}")
-
- assert verified
+ assert verify_zone(fqdn, transfer)
def keystate_check(server, zone, key):
# information regarding copyright ownership.
import pytest
+import isctest
pytest.importorskip("dns")
import dns.message
-import dns.query
-import dns.rcode
-def test_connreset(named_port):
+def test_connreset():
msg = dns.message.make_query(
"sub.example.", "A", want_dnssec=True, use_edns=0, payload=1232
)
- ans = dns.query.udp(msg, "10.53.0.2", timeout=10, port=named_port)
- assert ans.rcode() == dns.rcode.SERVFAIL
+ res = isctest.query.udp(msg, "10.53.0.2")
+ isctest.check.servfail(res)
# information regarding copyright ownership.
import pytest
+import isctest
pytest.importorskip("dns")
import dns.message
-import dns.query
-import dns.rcode
-def test_async_hook(named_port):
- msg = dns.message.make_query(
- "example.com.",
- "A",
- )
- ans = dns.query.udp(msg, "10.53.0.1", timeout=10, port=named_port)
+def test_async_hook():
+ msg = dns.message.make_query("example.com.", "A")
+ res = isctest.query.udp(msg, "10.53.0.1")
# the test-async plugin changes the status of any positive answer to NOTIMP
- assert ans.rcode() == dns.rcode.NOTIMP
+ isctest.check.notimp(res)
import time
import pytest
+import isctest
pytest.importorskip("dns")
import dns.message
-import dns.query
-import dns.rcode
-def test_cve_2023_3341(named_port, control_port):
+def test_cve_2023_3341(control_port):
depth = 4500
# Should not be more than isccc_ccmsg_setmaxsize(&conn->ccmsg, 32768)
total_len = 10 + (depth * 7) - 6
# Wait for named to (possibly) crash
time.sleep(10)
+
msg = dns.message.make_query("version.bind", "TXT", "CH")
- ans = dns.query.udp(msg, "10.53.0.2", timeout=10, port=named_port)
- assert ans.rcode() == dns.rcode.NOERROR
+ res = isctest.query.udp(msg, "10.53.0.2")
+ isctest.check.noerror(res)
import os
import time
-import dns.query
-import dns.update
-
import isctest
+import dns.update
+
def rndc_loop(test_state, server):
rndc = os.getenv("RNDC")
time.sleep(1)
-def update_zone(test_state, zone, named_port):
+def update_zone(test_state, zone):
server = "10.53.0.2"
for i in range(1000):
if test_state["finished"]:
update = dns.update.UpdateMessage(zone)
update.add(f"dynamic-{i}.{zone}", 300, "TXT", f"txt-{i}")
try:
- response = dns.query.udp(update, server, 10, named_port)
+ response = isctest.query.udp(update, server)
assert response.rcode() == dns.rcode.NOERROR
except dns.exception.Timeout:
isctest.log.info(f"error: query timeout for {zone}")
# If the test has run to completion without named crashing, it has succeeded.
-def test_update_stress(named_port):
+def test_update_stress():
test_state = {"finished": False}
with concurrent.futures.ThreadPoolExecutor() as executor:
updaters = []
for i in range(5):
zone = f"zone00000{i}.example."
- updaters.append(executor.submit(update_zone, test_state, zone, named_port))
+ updaters.append(executor.submit(update_zone, test_state, zone))
# All the update_zone() tasks are expected to complete within 5
# minutes. If they do not, we cannot assert immediately as that will
import pytest
+import isctest
+
pytest.importorskip("dns")
import dns.message
import dns.name
-import dns.query
import dns.rdata
import dns.rdataclass
import dns.rdatatype
print(query.to_text())
print()
- response = dns.query.tcp(query, opts.server_ip, timeout=2, port=opts.server_port)
+ response = isctest.query.tcp(query, opts.server_ip, timeout=2)
print("# < " + str(datetime.datetime.now()))
print(response.to_text())
print()
-def test_cve_2020_8625(named_port):
+def test_cve_2020_8625():
"""
Reproducer for CVE-2020-8625. When run for an affected BIND 9 version,
send_crafted_tkey_query() will raise a network-related exception due to
for i in range(0, 50):
opts = argparse.Namespace(
server_ip="10.53.0.1",
- server_port=named_port,
real_oid_length=i,
extra_oid_length=0,
)
send_crafted_tkey_query(opts)
-def test_cve_2021_25216(named_port):
+def test_cve_2021_25216():
"""
Reproducer for CVE-2021-25216. When run for an affected BIND 9 version,
send_crafted_tkey_query() will raise a network-related exception due to
"""
opts = argparse.Namespace(
server_ip="10.53.0.1",
- server_port=named_port,
real_oid_length=1,
extra_oid_length=1073741824,
)
import pytest
+import isctest
+
pytest.importorskip("dns")
import dns.message
-import dns.query
@pytest.mark.parametrize(
("max-example.", "MX", 60),
],
)
-def test_cache_ttl(qname, rdtype, expected_ttl, named_port):
+def test_cache_ttl(qname, rdtype, expected_ttl):
msg = dns.message.make_query(qname, rdtype)
- response = dns.query.udp(msg, "10.53.0.2", timeout=10, port=named_port)
+ response = isctest.query.udp(msg, "10.53.0.2")
for rr in response.answer + response.authority:
assert rr.ttl == expected_ttl