]> git.ipfire.org Git - thirdparty/bind9.git/commitdiff
Add support for TSIG in isctest.kasp
authorMatthijs Mekking <matthijs@isc.org>
Fri, 14 Mar 2025 10:19:40 +0000 (11:19 +0100)
committerMatthijs Mekking <matthijs@isc.org>
Thu, 10 Apr 2025 20:44:31 +0000 (15:44 -0500)
For some kasp test we are going to need TSIG based queries to
differentiate between views.

bin/tests/system/isctest/kasp.py

index fa3c297f787ff3336ce155ba1255190a50789aa4..7c05206d0f2e82324eff08b168f3022cfde5b0a8 100644 (file)
@@ -21,6 +21,7 @@ from typing import Dict, List, Optional, Union
 from datetime import datetime, timedelta, timezone
 
 import dns
+import dns.tsig
 import isctest.log
 import isctest.query
 
@@ -29,8 +30,14 @@ DEFAULT_TTL = 300
 NEXT_KEY_EVENT_THRESHOLD = 100
 
 
-def _query(server, qname, qtype):
+def _query(server, qname, qtype, tsig=None):
     query = dns.message.make_query(qname, qtype, use_edns=True, want_dnssec=True)
+
+    if tsig is not None:
+        tsigkey = tsig.split(":")
+        keyring = dns.tsig.Key(tsigkey[1], tsigkey[2], tsigkey[0])
+        query.use_tsig(keyring)
+
     try:
         response = isctest.query.tcp(query, server.ip, server.ports.dns, timeout=3)
     except dns.exception.Timeout:
@@ -554,14 +561,14 @@ class Key:
         return self.path
 
 
-def check_zone_is_signed(server, zone):
+def check_zone_is_signed(server, zone, tsig=None):
     addr = server.ip
     fqdn = f"{zone}."
 
     # wait until zone is fully signed
     signed = False
     for _ in range(10):
-        response = _query(server, fqdn, dns.rdatatype.NSEC)
+        response = _query(server, fqdn, dns.rdatatype.NSEC, tsig=tsig)
         if not isinstance(response, dns.message.Message):
             isctest.log.debug(f"no response for {fqdn} NSEC from {addr}")
         elif response.rcode() != dns.rcode.NOERROR:
@@ -711,13 +718,13 @@ def check_keyrelationships(keys, expected):
                 assert key.is_metadata_consistent(status, expect.metadata)
 
 
-def check_dnssec_verify(server, zone):
+def check_dnssec_verify(server, zone, tsig=None):
     # Check if zone if DNSSEC valid with dnssec-verify.
     fqdn = f"{zone}."
 
     verified = False
     for _ in range(10):
-        transfer = _query(server, fqdn, dns.rdatatype.AXFR)
+        transfer = _query(server, fqdn, dns.rdatatype.AXFR, tsig=tsig)
         if not isinstance(transfer, dns.message.Message):
             isctest.log.debug(f"no response for {fqdn} AXFR from {server.ip}")
         elif transfer.rcode() != dns.rcode.NOERROR:
@@ -936,8 +943,8 @@ def check_cds(rrset, keys):
     assert numcds == len(cdss)
 
 
-def _query_rrset(server, fqdn, qtype):
-    response = _query(server, fqdn, qtype)
+def _query_rrset(server, fqdn, qtype, tsig=None):
+    response = _query(server, fqdn, qtype, tsig=tsig)
     assert response.rcode() == dns.rcode.NOERROR
 
     rrs = []
@@ -957,46 +964,46 @@ def _query_rrset(server, fqdn, qtype):
     return rrs, rrsigs
 
 
-def check_apex(server, zone, ksks, zsks):
+def check_apex(server, zone, ksks, zsks, tsig=None):
     # Test the apex of a zone. This checks that the SOA and DNSKEY RRsets
     # are signed correctly and with the appropriate keys.
     fqdn = f"{zone}."
 
     # test dnskey query
-    dnskeys, rrsigs = _query_rrset(server, fqdn, dns.rdatatype.DNSKEY)
+    dnskeys, rrsigs = _query_rrset(server, fqdn, dns.rdatatype.DNSKEY, tsig=tsig)
     assert len(dnskeys) > 0
     check_dnskeys(dnskeys, ksks, zsks)
     assert len(rrsigs) > 0
     check_signatures(rrsigs, dns.rdatatype.DNSKEY, fqdn, ksks, zsks)
 
     # test soa query
-    soa, rrsigs = _query_rrset(server, fqdn, dns.rdatatype.SOA)
+    soa, rrsigs = _query_rrset(server, fqdn, dns.rdatatype.SOA, tsig=tsig)
     assert len(soa) == 1
     assert f"{zone}. {DEFAULT_TTL} IN SOA" in soa[0].to_text()
     assert len(rrsigs) > 0
     check_signatures(rrsigs, dns.rdatatype.SOA, fqdn, ksks, zsks)
 
     # test cdnskey query
-    cdnskeys, rrsigs = _query_rrset(server, fqdn, dns.rdatatype.CDNSKEY)
+    cdnskeys, rrsigs = _query_rrset(server, fqdn, dns.rdatatype.CDNSKEY, tsig=tsig)
     check_dnskeys(cdnskeys, ksks, zsks, cdnskey=True)
     if len(cdnskeys) > 0:
         assert len(rrsigs) > 0
         check_signatures(rrsigs, dns.rdatatype.CDNSKEY, fqdn, ksks, zsks)
 
     # test cds query
-    cds, rrsigs = _query_rrset(server, fqdn, dns.rdatatype.CDS)
+    cds, rrsigs = _query_rrset(server, fqdn, dns.rdatatype.CDS, tsig=tsig)
     check_cds(cds, ksks)
     if len(cds) > 0:
         assert len(rrsigs) > 0
         check_signatures(rrsigs, dns.rdatatype.CDS, fqdn, ksks, zsks)
 
 
-def check_subdomain(server, zone, ksks, zsks):
+def check_subdomain(server, zone, ksks, zsks, tsig=None):
     # Test an RRset below the apex and verify it is signed correctly.
     fqdn = f"{zone}."
     qname = f"a.{zone}."
     qtype = dns.rdatatype.A
-    response = _query(server, qname, qtype)
+    response = _query(server, qname, qtype, tsig=tsig)
     assert response.rcode() == dns.rcode.NOERROR
 
     match = f"{qname} {DEFAULT_TTL} IN A 10.0.0.1"