# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
+from typing import NamedTuple, Tuple
+
import os
import subprocess
import sys
nextname = "a."
labelcount = zone.count(".") # zone is specified as FQDN
types = "NS SOA RRSIG NSEC DNSKEY"
- match = "{0} {1} IN NSEC {2}{0} {3}".format(zone, ttl, nextname, types)
- sig = "{0} {1} IN RRSIG NSEC 13 {2} 300".format(zone, ttl, labelcount)
+ match = f"{zone} {ttl} IN NSEC {nextname}{zone} {types}"
+ sig = f"{zone} {ttl} IN RRSIG NSEC 13 {labelcount} 300"
for rr in response.answer:
if match in rr.to_text():
verify = os.getenv("VERIFY")
assert verify is not None
- filename = "{}out".format(zone)
+ filename = f"{zone}out"
with open(filename, "w", encoding="utf-8") as file:
for rr in transfer.answer:
file.write(rr.to_text())
verifier = subprocess.run(verify_cmd, capture_output=True, check=True)
if verifier.returncode != 0:
- print("error: dnssec-verify {} failed".format(zone))
+ print(f"error: dnssec-verify {zone} failed")
sys.stderr.buffer.write(verifier.stderr)
return verifier.returncode == 0
response = do_query(server, zone, "DS", tcp=True)
if not isinstance(response, dns.message.Message):
- print("error: no response for {} DS from {}".format(zone, addr))
+ print(f"error: no response for {zone} DS from {addr}")
return {}
if response.rcode() == dns.rcode.NOERROR:
if count != 1:
print(
- "error: expected a single DS in response for {} from {},"
- "got {}".format(zone, addr, count)
+ f"error: expected a single DS in response for {zone} from {addr}, got {count}"
)
return {}
else:
- print(
- "error: {} response for {} DNSKEY from {}".format(
- dns.rcode.to_text(response.rcode()), zone, addr
- )
- )
+ rcode = dns.rcode.to_text(response.rcode())
+ print(f"error: {rcode} response for {zone} DNSKEY from {addr}")
return {}
- filename = "ns9/K{}+013+{:05d}.state".format(zone, keyid)
- print("read state file {}".format(filename))
+ filename = f"ns9/K{zone}+013+{keyid:05d}.state"
+ print(f"read state file {filename}")
try:
with open(filename, "r", encoding="utf-8") as file:
def zone_check(server, zone):
addr = server.ip
- fqdn = "{}.".format(zone)
+ fqdn = f"{zone}."
# wait until zone is fully signed.
signed = False
for _ in range(10):
response = do_query(server, fqdn, "NSEC")
if not isinstance(response, dns.message.Message):
- print("error: no response for {} NSEC from {}".format(fqdn, addr))
+ print(f"error: no response for {fqdn} NSEC from {addr}")
elif response.rcode() == dns.rcode.NOERROR:
signed = has_signed_apex_nsec(fqdn, response)
else:
- print(
- "error: {} response for {} NSEC from {}".format(
- dns.rcode.to_text(response.rcode()), fqdn, addr
- )
- )
+ rcode = dns.rcode.to_text(response.rcode())
+ print(f"error: {rcode} response for {fqdn} NSEC from {addr}")
if signed:
break
verified = False
transfer = do_query(server, fqdn, "AXFR", tcp=True)
if not isinstance(transfer, dns.message.Message):
- print("error: no response for {} AXFR from {}".format(fqdn, addr))
+ print(f"error: no response for {fqdn} AXFR from {addr}")
elif transfer.rcode() == dns.rcode.NOERROR:
verified = verify_zone(fqdn, transfer)
else:
- print(
- "error: {} response for {} AXFR from {}".format(
- dns.rcode.to_text(transfer.rcode()), fqdn, addr
- )
- )
+ rcode = dns.rcode.to_text(transfer.rcode())
+ print(f"error: {rcode} response for {fqdn} AXFR from {addr}")
assert verified
def keystate_check(server, zone, key):
- fqdn = "{}.".format(zone)
+ fqdn = f"{zone}."
val = 0
deny = False
controller = subprocess.run(rndc_cmd, capture_output=True, check=True)
if controller.returncode != 0:
- print("error: rndc loadkeys {} failed".format(zone))
+ print(f"error: rndc loadkeys {zone} failed")
sys.stderr.buffer.write(controller.stderr)
assert controller.returncode == 0
-def checkds_dspublished(named_port, servers, checkds, addr):
- #
- # 1.1.1: DS is correctly published in parent.
- # parental-agents: ns2
- #
-
- # The simple case.
- zone = "good.{}.dspublish.ns2".format(checkds)
- zone_check(servers["ns9"], zone)
- with servers["ns9"].watch_log_from_start() as watcher:
- line = f"zone {zone}/IN (signed): checkds: DS response from {addr}"
- watcher.wait_for_line(line)
- keystate_check(servers["ns2"], zone, "DSPublish")
-
- #
- # 1.1.2: DS is not published in parent.
- # parental-agents: ns5
- #
- zone = "not-yet.{}.dspublish.ns5".format(checkds)
- zone_check(servers["ns9"], zone)
- with servers["ns9"].watch_log_from_start() as watcher:
- line = f"zone {zone}/IN (signed): checkds: empty DS response from 10.53.0.5"
- watcher.wait_for_line(line)
- keystate_check(servers["ns2"], zone, "!DSPublish")
-
- #
- # 1.1.3: The parental agent is badly configured.
- # parental-agents: ns6
- #
- zone = "bad.{}.dspublish.ns6".format(checkds)
- zone_check(servers["ns9"], zone)
- if checkds == "explicit":
- with servers["ns9"].watch_log_from_start() as watcher:
- line = f"zone {zone}/IN (signed): checkds: bad DS response from 10.53.0.6"
- watcher.wait_for_line(line)
- elif checkds == "yes":
- with servers["ns9"].watch_log_from_start() as watcher:
- line = f"zone {zone}/IN (signed): checkds: error during parental-agents processing"
- watcher.wait_for_line(line)
- keystate_check(servers["ns2"], zone, "!DSPublish")
-
- #
- # 1.1.4: DS is published, but has bogus signature.
- #
- # TBD
-
- #
- # 1.2.1: DS is correctly published in all parents.
- # parental-agents: ns2, ns4
- #
- zone = "good.{}.dspublish.ns2-4".format(checkds)
- zone_check(servers["ns9"], zone)
- with servers["ns9"].watch_log_from_start() as watcher:
- line = f"zone {zone}/IN (signed): checkds: DS response from {addr}"
- watcher.wait_for_line(line)
- with servers["ns9"].watch_log_from_start() as watcher:
- line = f"zone {zone}/IN (signed): checkds: DS response from 10.53.0.4"
- watcher.wait_for_line(line)
- keystate_check(servers["ns2"], zone, "DSPublish")
-
- #
- # 1.2.2: DS is not published in some parents.
- # parental-agents: ns2, ns4, ns5
- #
- zone = "incomplete.{}.dspublish.ns2-4-5".format(checkds)
- zone_check(servers["ns9"], zone)
- with servers["ns9"].watch_log_from_start() as watcher:
- line = f"zone {zone}/IN (signed): checkds: DS response from {addr}"
- watcher.wait_for_line(line)
- with servers["ns9"].watch_log_from_start() as watcher:
- line = f"zone {zone}/IN (signed): checkds: DS response from 10.53.0.4"
- watcher.wait_for_line(line)
- with servers["ns9"].watch_log_from_start() as watcher:
- line = f"zone {zone}/IN (signed): checkds: empty DS response from 10.53.0.5"
- watcher.wait_for_line(line)
- keystate_check(servers["ns2"], zone, "!DSPublish")
-
- #
- # 1.2.3: One parental agent is badly configured.
- # parental-agents: ns2, ns4, ns6
- #
- zone = "bad.{}.dspublish.ns2-4-6".format(checkds)
- zone_check(servers["ns9"], zone)
- with servers["ns9"].watch_log_from_start() as watcher:
- line = f"zone {zone}/IN (signed): checkds: DS response from {addr}"
- watcher.wait_for_line(line)
- with servers["ns9"].watch_log_from_start() as watcher:
- line = f"zone {zone}/IN (signed): checkds: DS response from 10.53.0.4"
- watcher.wait_for_line(line)
- with servers["ns9"].watch_log_from_start() as watcher:
- line = f"zone {zone}/IN (signed): checkds: bad DS response from 10.53.0.6"
- watcher.wait_for_line(line)
- keystate_check(servers["ns2"], zone, "!DSPublish")
-
- #
- # 1.2.4: DS is completely published, bogus signature.
- #
- # TBD
-
- # TBD: Check with TSIG
- # TBD: Check with TLS
-
-
-def checkds_dswithdrawn(named_port, servers, checkds, addr):
- #
- # 2.1.1: DS correctly withdrawn from the parent.
- # parental-agents: ns5
- #
-
- # The simple case.
- zone = "good.{}.dsremoved.ns5".format(checkds)
- zone_check(servers["ns9"], zone)
- with servers["ns9"].watch_log_from_start() as watcher:
- line = f"zone {zone}/IN (signed): checkds: empty DS response from {addr}"
- watcher.wait_for_line(line)
- keystate_check(servers["ns2"], zone, "DSRemoved")
-
- #
- # 2.1.2: DS is published in the parent.
- # parental-agents: ns2
- #
- zone = "still-there.{}.dsremoved.ns2".format(checkds)
- zone_check(servers["ns9"], zone)
- with servers["ns9"].watch_log_from_start() as watcher:
- line = f"zone {zone}/IN (signed): checkds: DS response from 10.53.0.2"
- watcher.wait_for_line(line)
- keystate_check(servers["ns2"], zone, "!DSRemoved")
-
- #
- # 2.1.3: The parental agent is badly configured.
- # parental-agents: ns6
- #
- zone = "bad.{}.dsremoved.ns6".format(checkds)
- zone_check(servers["ns9"], zone)
- if checkds == "explicit":
- with servers["ns9"].watch_log_from_start() as watcher:
- line = f"zone {zone}/IN (signed): checkds: bad DS response from 10.53.0.6"
- watcher.wait_for_line(line)
- elif checkds == "yes":
- with servers["ns9"].watch_log_from_start() as watcher:
- line = f"zone {zone}/IN (signed): checkds: error during parental-agents processing"
- watcher.wait_for_line(line)
- keystate_check(servers["ns2"], zone, "!DSRemoved")
-
- #
- # 2.1.4: DS is withdrawn, but has bogus signature.
- #
- # TBD
-
- #
- # 2.2.1: DS is correctly withdrawn from all parents.
- # parental-agents: ns5, ns7
- #
- zone = "good.{}.dsremoved.ns5-7".format(checkds)
- zone_check(servers["ns9"], zone)
- with servers["ns9"].watch_log_from_start() as watcher:
- line = f"zone {zone}/IN (signed): checkds: empty DS response from {addr}"
- watcher.wait_for_line(line)
- with servers["ns9"].watch_log_from_start() as watcher:
- line = f"zone {zone}/IN (signed): checkds: empty DS response from 10.53.0.7"
- watcher.wait_for_line(line)
- keystate_check(servers["ns2"], zone, "DSRemoved")
-
- #
- # 2.2.2: DS is not withdrawn from some parents.
- # parental-agents: ns2, ns5, ns7
- #
- zone = "incomplete.{}.dsremoved.ns2-5-7".format(checkds)
- zone_check(servers["ns9"], zone)
- with servers["ns9"].watch_log_from_start() as watcher:
- line = f"zone {zone}/IN (signed): checkds: DS response from 10.53.0.2"
- watcher.wait_for_line(line)
- with servers["ns9"].watch_log_from_start() as watcher:
- line = f"zone {zone}/IN (signed): checkds: empty DS response from {addr}"
- watcher.wait_for_line(line)
- with servers["ns9"].watch_log_from_start() as watcher:
- line = f"zone {zone}/IN (signed): checkds: empty DS response from 10.53.0.7"
- watcher.wait_for_line(line)
- keystate_check(servers["ns2"], zone, "!DSRemoved")
-
- #
- # 2.2.3: One parental agent is badly configured.
- # parental-agents: ns5, ns6, ns7
- #
- zone = "bad.{}.dsremoved.ns5-6-7".format(checkds)
- zone_check(servers["ns9"], zone)
- with servers["ns9"].watch_log_from_start() as watcher:
- line = f"zone {zone}/IN (signed): checkds: empty DS response from {addr}"
- watcher.wait_for_line(line)
- with servers["ns9"].watch_log_from_start() as watcher:
- line = f"zone {zone}/IN (signed): checkds: empty DS response from 10.53.0.7"
- watcher.wait_for_line(line)
- with servers["ns9"].watch_log_from_start() as watcher:
- line = f"zone {zone}/IN (signed): checkds: bad DS response from 10.53.0.6"
- watcher.wait_for_line(line)
- keystate_check(servers["ns2"], zone, "!DSRemoved")
-
- #
- # 2.2.4:: DS is removed completely, bogus signature.
- #
- # TBD
-
-
-def test_checkds_reference(named_port, servers):
- # Using a reference to parental-agents.
- zone = "reference.explicit.dspublish.ns2"
- zone_check(servers["ns9"], zone)
- with servers["ns9"].watch_log_from_start() as watcher:
- line = f"zone {zone}/IN (signed): checkds: DS response from 10.53.0.8"
- watcher.wait_for_line(line)
- keystate_check(servers["ns2"], zone, "DSPublish")
+class CheckDSTest(NamedTuple):
+ zone: str
+ logs_to_wait_for: Tuple[str]
+ expected_parent_state: str
-def test_checkds_resolver(named_port, servers):
+parental_agents_tests = [
+ # Using a reference to parental-agents.
+ CheckDSTest(
+ zone="reference.explicit.dspublish.ns2",
+ logs_to_wait_for=("DS response from 10.53.0.8",),
+ expected_parent_state="DSPublish",
+ ),
# Using a resolver as parental-agent (ns3).
- zone = "resolver.explicit.dspublish.ns2"
- zone_check(servers["ns9"], zone)
- with servers["ns9"].watch_log_from_start() as watcher:
- line = f"zone {zone}/IN (signed): checkds: DS response from 10.53.0.3"
- watcher.wait_for_line(line)
- keystate_check(servers["ns2"], zone, "DSPublish")
-
+ CheckDSTest(
+ zone="resolver.explicit.dspublish.ns2",
+ logs_to_wait_for=("DS response from 10.53.0.3",),
+ expected_parent_state="DSPublish",
+ ),
# Using a resolver as parental-agent (ns3).
- zone = "resolver.explicit.dsremoved.ns5"
- zone_check(servers["ns9"], zone)
- with servers["ns9"].watch_log_from_start() as watcher:
- line = f"zone {zone}/IN (signed): checkds: empty DS response from 10.53.0.3"
- watcher.wait_for_line(line)
- keystate_check(servers["ns2"], zone, "DSRemoved")
-
-
-def test_checkds_no_ent(named_port, servers):
- zone = "no-ent.ns2"
- zone_check(servers["ns9"], zone)
- with servers["ns9"].watch_log_from_start() as watcher:
- line = f"zone {zone}/IN (signed): checkds: DS response from 10.53.0.2"
- watcher.wait_for_line(line)
- keystate_check(servers["ns2"], zone, "DSPublish")
-
- zone = "no-ent.ns5"
- zone_check(servers["ns9"], zone)
- with servers["ns9"].watch_log_from_start() as watcher:
- line = f"zone {zone}/IN (signed): checkds: DS response from 10.53.0.5"
- watcher.wait_for_line(line)
- keystate_check(servers["ns2"], zone, "DSRemoved")
-
-
-def test_checkds_dspublished(named_port, servers):
- checkds_dspublished(named_port, servers, "explicit", "10.53.0.8")
- checkds_dspublished(named_port, servers, "yes", "10.53.0.2")
+ CheckDSTest(
+ zone="resolver.explicit.dsremoved.ns5",
+ logs_to_wait_for=("empty DS response from 10.53.0.3",),
+ expected_parent_state="DSRemoved",
+ ),
+]
+
+no_ent_tests = [
+ CheckDSTest(
+ zone="no-ent.ns2",
+ logs_to_wait_for=("DS response from 10.53.0.2",),
+ expected_parent_state="DSPublish",
+ ),
+ CheckDSTest(
+ zone="no-ent.ns5",
+ logs_to_wait_for=("DS response from 10.53.0.5",),
+ expected_parent_state="DSRemoved",
+ ),
+]
+
+
+def dspublished_tests(checkds, addr):
+ return [
+ #
+ # 1.1.1: DS is correctly published in parent.
+ # parental-agents: ns2
+ #
+ # The simple case.
+ CheckDSTest(
+ zone=f"good.{checkds}.dspublish.ns2",
+ logs_to_wait_for=(f"DS response from {addr}",),
+ expected_parent_state="DSPublish",
+ ),
+ #
+ # 1.1.2: DS is not published in parent.
+ # parental-agents: ns5
+ #
+ CheckDSTest(
+ zone=f"not-yet.{checkds}.dspublish.ns5",
+ logs_to_wait_for=("empty DS response from 10.53.0.5",),
+ expected_parent_state="!DSPublish",
+ ),
+ #
+ # 1.1.3: The parental agent is badly configured.
+ # parental-agents: ns6
+ #
+ CheckDSTest(
+ zone=f"bad.{checkds}.dspublish.ns6",
+ logs_to_wait_for=(
+ "bad DS response from 10.53.0.6"
+ if checkds == "explicit"
+ else "error during parental-agents processing",
+ ),
+ expected_parent_state="!DSPublish",
+ ),
+ #
+ # 1.1.4: DS is published, but has bogus signature.
+ #
+ # TBD
+ #
+ # 1.2.1: DS is correctly published in all parents.
+ # parental-agents: ns2, ns4
+ #
+ CheckDSTest(
+ zone=f"good.{checkds}.dspublish.ns2-4",
+ logs_to_wait_for=(f"DS response from {addr}", "DS response from 10.53.0.4"),
+ expected_parent_state="DSPublish",
+ ),
+ #
+ # 1.2.2: DS is not published in some parents.
+ # parental-agents: ns2, ns4, ns5
+ #
+ CheckDSTest(
+ zone=f"incomplete.{checkds}.dspublish.ns2-4-5",
+ logs_to_wait_for=(
+ f"DS response from {addr}",
+ "DS response from 10.53.0.4",
+ "empty DS response from 10.53.0.5",
+ ),
+ expected_parent_state="!DSPublish",
+ ),
+ #
+ # 1.2.3: One parental agent is badly configured.
+ # parental-agents: ns2, ns4, ns6
+ #
+ CheckDSTest(
+ zone=f"bad.{checkds}.dspublish.ns2-4-6",
+ logs_to_wait_for=(
+ f"DS response from {addr}",
+ "DS response from 10.53.0.4",
+ "bad DS response from 10.53.0.6",
+ ),
+ expected_parent_state="!DSPublish",
+ ),
+ #
+ # 1.2.4: DS is completely published, bogus signature.
+ #
+ # TBD
+ # TBD: Check with TSIG
+ # TBD: Check with TLS
+ ]
-def test_checkds_dswithdrawn(named_port, servers):
- checkds_dswithdrawn(named_port, servers, "explicit", "10.53.0.10")
- checkds_dswithdrawn(named_port, servers, "yes", "10.53.0.5")
+def dswithdrawn_tests(checkds, addr):
+ return [
+ #
+ # 2.1.1: DS correctly withdrawn from the parent.
+ # parental-agents: ns5
+ #
+ # The simple case.
+ CheckDSTest(
+ zone=f"good.{checkds}.dsremoved.ns5",
+ logs_to_wait_for=(f"empty DS response from {addr}",),
+ expected_parent_state="DSRemoved",
+ ),
+ #
+ # 2.1.2: DS is published in the parent.
+ # parental-agents: ns2
+ #
+ CheckDSTest(
+ zone=f"still-there.{checkds}.dsremoved.ns2",
+ logs_to_wait_for=("DS response from 10.53.0.2",),
+ expected_parent_state="!DSRemoved",
+ ),
+ #
+ # 2.1.3: The parental agent is badly configured.
+ # parental-agents: ns6
+ #
+ CheckDSTest(
+ zone=f"bad.{checkds}.dsremoved.ns6",
+ logs_to_wait_for=(
+ "bad DS response from 10.53.0.6"
+ if checkds == "explicit"
+ else "error during parental-agents processing",
+ ),
+ expected_parent_state="!DSRemoved",
+ ),
+ #
+ # 2.1.4: DS is withdrawn, but has bogus signature.
+ #
+ # TBD
+ #
+ # 2.2.1: DS is correctly withdrawn from all parents.
+ # parental-agents: ns5, ns7
+ #
+ CheckDSTest(
+ zone=f"good.{checkds}.dsremoved.ns5-7",
+ logs_to_wait_for=(
+ f"empty DS response from {addr}",
+ "empty DS response from 10.53.0.7",
+ ),
+ expected_parent_state="DSRemoved",
+ ),
+ #
+ # 2.2.2: DS is not withdrawn from some parents.
+ # parental-agents: ns2, ns5, ns7
+ #
+ CheckDSTest(
+ zone=f"incomplete.{checkds}.dsremoved.ns2-5-7",
+ logs_to_wait_for=(
+ "DS response from 10.53.0.2",
+ f"empty DS response from {addr}",
+ "empty DS response from 10.53.0.7",
+ ),
+ expected_parent_state="!DSRemoved",
+ ),
+ #
+ # 2.2.3: One parental agent is badly configured.
+ # parental-agents: ns5, ns6, ns7
+ #
+ CheckDSTest(
+ zone=f"bad.{checkds}.dsremoved.ns5-6-7",
+ logs_to_wait_for=(
+ f"empty DS response from {addr}",
+ "empty DS response from 10.53.0.7",
+ "bad DS response from 10.53.0.6",
+ ),
+ expected_parent_state="!DSRemoved",
+ ),
+ #
+ # 2.2.4:: DS is removed completely, bogus signature.
+ #
+ # TBD
+ ]
-def test_checkds_no(named_port, servers):
- zone_check(servers["ns9"], "good.no.dspublish.ns2")
- keystate_check(servers["ns2"], "good.no.dspublish.ns2", "!DSPublish")
+checkds_no_tests = [
+ CheckDSTest(
+ zone="good.no.dspublish.ns2",
+ logs_to_wait_for=(),
+ expected_parent_state="!DSPublish",
+ ),
+ CheckDSTest(
+ zone="good.no.dspublish.ns2-4",
+ logs_to_wait_for=(),
+ expected_parent_state="!DSPublish",
+ ),
+ CheckDSTest(
+ zone="good.no.dsremoved.ns5",
+ logs_to_wait_for=(),
+ expected_parent_state="!DSRemoved",
+ ),
+ CheckDSTest(
+ zone="good.no.dsremoved.ns5-7",
+ logs_to_wait_for=(),
+ expected_parent_state="!DSRemoved",
+ ),
+]
+
+
+checkds_tests = (
+ parental_agents_tests
+ + no_ent_tests
+ + dspublished_tests("explicit", "10.53.0.8")
+ + dspublished_tests("yes", "10.53.0.2")
+ + dswithdrawn_tests("explicit", "10.53.0.10")
+ + dswithdrawn_tests("yes", "10.53.0.5")
+ + checkds_no_tests
+)
- zone_check(servers["ns9"], "good.no.dspublish.ns2-4")
- keystate_check(servers["ns2"], "good.no.dspublish.ns2-4", "!DSPublish")
- zone_check(servers["ns9"], "good.no.dsremoved.ns5")
- keystate_check(servers["ns2"], "good.no.dsremoved.ns5", "!DSRemoved")
+@pytest.mark.parametrize("params", checkds_tests, ids=lambda t: t.zone)
+def test_checkds(servers, params):
+ # Wait until the provided zone is signed and then verify its DNSSEC data.
+ zone_check(servers["ns9"], params.zone)
+
+ # Wait until all the expected log lines are found in the log file for the
+ # provided server.
+ for log_string in params.logs_to_wait_for:
+ for _ in range(10):
+ with servers["ns9"].watch_log_from_start() as watcher:
+ line = f"zone {params.zone}/IN (signed): checkds: {log_string}"
+ try:
+ watcher.wait_for_line(line, timeout=1)
+ except TimeoutError:
+ rekey(params.zone)
+ else:
+ break
+ else:
+ raise TimeoutError
- zone_check(servers["ns9"], "good.no.dsremoved.ns5-7")
- keystate_check(servers["ns2"], "good.no.dsremoved.ns5-7", "!DSRemoved")
+ # Check whether key states on the parent server provided match
+ # expectations.
+ keystate_check(servers["ns2"], params.zone, params.expected_parent_state)