import os
from pathlib import Path
import re
+import subprocess
import time
from typing import Optional, Union
import dns
import isctest.log
-
+import isctest.query
DEFAULT_TTL = 300
def _query(server, qname, qtype):
query = dns.message.make_query(qname, qtype, use_edns=True, want_dnssec=True)
try:
- response = dns.query.tcp(query, server.ip, port=server.ports.dns, timeout=3)
+ response = isctest.query.tcp(query, server.ip, server.ports.dns, timeout=3)
except dns.exception.Timeout:
isctest.log.debug(f"query timeout for query {qname} {qtype} to {server.ip}")
return None
def check_dnssec_verify(server, zone):
# Check if zone if DNSSEC valid with dnssec-verify.
fqdn = f"{zone}."
- transfer = _query(server, fqdn, dns.rdatatype.AXFR)
- if not isinstance(transfer, dns.message.Message):
- isctest.log.debug(f"no response for {fqdn} AXFR from {server.ip}")
- elif transfer.rcode() != dns.rcode.NOERROR:
- rcode = dns.rcode.to_text(transfer.rcode())
- isctest.log.debug(f"{rcode} response for {fqdn} AXFR from {server.ip}")
- else:
- zonefile = f"{zone}.axfr"
- with open(zonefile, "w", encoding="utf-8") as file:
- for rr in transfer.answer:
- file.write(rr.to_text())
- file.write("\n")
-
- verify_command = [os.environ.get("VERIFY"), "-z", "-o", zone, zonefile]
- isctest.run.cmd(verify_command)
+
+ verified = False
+ for _ in range(10):
+ transfer = _query(server, fqdn, dns.rdatatype.AXFR)
+ if not isinstance(transfer, dns.message.Message):
+ isctest.log.debug(f"no response for {fqdn} AXFR from {server.ip}")
+ elif transfer.rcode() != dns.rcode.NOERROR:
+ rcode = dns.rcode.to_text(transfer.rcode())
+ isctest.log.debug(f"{rcode} response for {fqdn} AXFR from {server.ip}")
+ else:
+ zonefile = f"{zone}.axfr"
+ with open(zonefile, "w", encoding="utf-8") as file:
+ for rr in transfer.answer:
+ file.write(rr.to_text())
+ file.write("\n")
+
+ try:
+ verify_command = [os.environ.get("VERIFY"), "-z", "-o", zone, zonefile]
+ verified = isctest.run.cmd(verify_command)
+ except subprocess.CalledProcessError:
+ pass
+
+ if verified:
+ break
+
+ time.sleep(1)
+
+ assert verified
def check_dnssecstatus(server, zone, keys, policy=None, view=None):