from datetime import datetime, timedelta, timezone
import dns
+import dns.tsig
import isctest.log
import isctest.query
NEXT_KEY_EVENT_THRESHOLD = 100
-def _query(server, qname, qtype):
+def _query(server, qname, qtype, tsig=None):
query = dns.message.make_query(qname, qtype, use_edns=True, want_dnssec=True)
+
+ if tsig is not None:
+ tsigkey = tsig.split(":")
+ keyring = dns.tsig.Key(tsigkey[1], tsigkey[2], tsigkey[0])
+ query.use_tsig(keyring)
+
try:
response = isctest.query.tcp(query, server.ip, server.ports.dns, timeout=3)
except dns.exception.Timeout:
return self.path
-def check_zone_is_signed(server, zone):
+def check_zone_is_signed(server, zone, tsig=None):
addr = server.ip
fqdn = f"{zone}."
# wait until zone is fully signed
signed = False
for _ in range(10):
- response = _query(server, fqdn, dns.rdatatype.NSEC)
+ response = _query(server, fqdn, dns.rdatatype.NSEC, tsig=tsig)
if not isinstance(response, dns.message.Message):
isctest.log.debug(f"no response for {fqdn} NSEC from {addr}")
elif response.rcode() != dns.rcode.NOERROR:
assert key.is_metadata_consistent(status, expect.metadata)
-def check_dnssec_verify(server, zone):
+def check_dnssec_verify(server, zone, tsig=None):
# Check if zone if DNSSEC valid with dnssec-verify.
fqdn = f"{zone}."
verified = False
for _ in range(10):
- transfer = _query(server, fqdn, dns.rdatatype.AXFR)
+ transfer = _query(server, fqdn, dns.rdatatype.AXFR, tsig=tsig)
if not isinstance(transfer, dns.message.Message):
isctest.log.debug(f"no response for {fqdn} AXFR from {server.ip}")
elif transfer.rcode() != dns.rcode.NOERROR:
assert numcds == len(cdss)
-def _query_rrset(server, fqdn, qtype):
- response = _query(server, fqdn, qtype)
+def _query_rrset(server, fqdn, qtype, tsig=None):
+ response = _query(server, fqdn, qtype, tsig=tsig)
assert response.rcode() == dns.rcode.NOERROR
rrs = []
return rrs, rrsigs
-def check_apex(server, zone, ksks, zsks):
+def check_apex(server, zone, ksks, zsks, tsig=None):
# Test the apex of a zone. This checks that the SOA and DNSKEY RRsets
# are signed correctly and with the appropriate keys.
fqdn = f"{zone}."
# test dnskey query
- dnskeys, rrsigs = _query_rrset(server, fqdn, dns.rdatatype.DNSKEY)
+ dnskeys, rrsigs = _query_rrset(server, fqdn, dns.rdatatype.DNSKEY, tsig=tsig)
assert len(dnskeys) > 0
check_dnskeys(dnskeys, ksks, zsks)
assert len(rrsigs) > 0
check_signatures(rrsigs, dns.rdatatype.DNSKEY, fqdn, ksks, zsks)
# test soa query
- soa, rrsigs = _query_rrset(server, fqdn, dns.rdatatype.SOA)
+ soa, rrsigs = _query_rrset(server, fqdn, dns.rdatatype.SOA, tsig=tsig)
assert len(soa) == 1
assert f"{zone}. {DEFAULT_TTL} IN SOA" in soa[0].to_text()
assert len(rrsigs) > 0
check_signatures(rrsigs, dns.rdatatype.SOA, fqdn, ksks, zsks)
# test cdnskey query
- cdnskeys, rrsigs = _query_rrset(server, fqdn, dns.rdatatype.CDNSKEY)
+ cdnskeys, rrsigs = _query_rrset(server, fqdn, dns.rdatatype.CDNSKEY, tsig=tsig)
check_dnskeys(cdnskeys, ksks, zsks, cdnskey=True)
if len(cdnskeys) > 0:
assert len(rrsigs) > 0
check_signatures(rrsigs, dns.rdatatype.CDNSKEY, fqdn, ksks, zsks)
# test cds query
- cds, rrsigs = _query_rrset(server, fqdn, dns.rdatatype.CDS)
+ cds, rrsigs = _query_rrset(server, fqdn, dns.rdatatype.CDS, tsig=tsig)
check_cds(cds, ksks)
if len(cds) > 0:
assert len(rrsigs) > 0
check_signatures(rrsigs, dns.rdatatype.CDS, fqdn, ksks, zsks)
-def check_subdomain(server, zone, ksks, zsks):
+def check_subdomain(server, zone, ksks, zsks, tsig=None):
# Test an RRset below the apex and verify it is signed correctly.
fqdn = f"{zone}."
qname = f"a.{zone}."
qtype = dns.rdatatype.A
- response = _query(server, qname, qtype)
+ response = _query(server, qname, qtype, tsig=tsig)
assert response.rcode() == dns.rcode.NOERROR
match = f"{qname} {DEFAULT_TTL} IN A 10.0.0.1"