num += 1
+def check_key_bundle(bundle_keys, bundle_lines, cdnskey=False):
+ count = 0
+ for key in bundle_keys:
+ found = False
+ for line in bundle_lines:
+ if key.dnskey_equals(line, cdnskey):
+ found = True
+ count += 1
+ assert found
+
+ assert count == len(bundle_keys)
+ assert count == len(bundle_lines)
+
+
+def check_cds_bundle(bundle_keys, bundle_lines, expected_cds):
+ count = 0
+ for key in bundle_keys:
+ found = False
+ # the cds of this ksk must be in the ksr
+ for line in bundle_lines:
+ for alg in expected_cds:
+ if key.cds_equals(line, alg.strip()):
+ found = True
+ count += 1
+ assert found
+
+ assert count == len(expected_cds) * len(bundle_keys)
+ assert count == len(bundle_lines)
+
+
+def check_rrsig_bundle(bundle_keys, bundle_lines, zone, rrtype, sigend, sigstart):
+ count = 0
+ for key in bundle_keys:
+ found = False
+ alg = key.get_metadata("Algorithm")
+ expect = f"{zone}. 3600 IN RRSIG {rrtype} {alg} 2 3600 {sigend} {sigstart} {key.tag} {zone}."
+ # there must be a signature of this ksk
+ for line in bundle_lines:
+ rrsig = " ".join(line.split())
+ if expect in rrsig:
+ found = True
+ count += 1
+ assert found
+
+ assert count == len(bundle_keys)
+ assert count == len(bundle_lines)
+
+
def check_keysigningrequest(out, zsks, start, end):
lines = out.split("\n")
line_no = 0
# expect bundle header
assert f";; KeySigningRequest 1.0 {inception}" in lines[line_no]
line_no += 1
+ bundle_keys = []
+ bundle_lines = []
# expect zsks
- for key in sorted(zsks):
+ for key in zsks:
published = key.get_timing("Publish")
if between(published, inception, next_bundle):
next_bundle = published
if removed is not None and inception >= removed:
continue
- # this zsk must be in the ksr
- assert key.dnskey_equals(lines[line_no])
+ # collect keys that should be in this bundle
+ # collect lines that should be in this bundle
+ bundle_keys.append(key)
+ bundle_lines.append(lines[line_no])
line_no += 1
+ check_key_bundle(bundle_keys, bundle_lines)
+
inception = next_bundle
# ksr footer
line_no += 1
# expect ksks
- for key in sorted(ksks):
+ bundle_keys = []
+ bundle_lines = []
+ for key in ksks:
published = key.get_timing("Publish")
if between(published, inception, next_bundle):
next_bundle = published
if between(removed, inception, next_bundle):
next_bundle = removed
- # this ksk must be in the ksr
- assert key.dnskey_equals(lines[line_no])
+ # collect keys that should be in this bundle
+ # collect lines that should be in this bundle
+ bundle_keys.append(key)
+ bundle_lines.append(lines[line_no])
line_no += 1
+ check_key_bundle(bundle_keys, bundle_lines)
+
# expect zsks
- for key in sorted(zsks):
+ bundle_keys = []
+ bundle_lines = []
+ for key in zsks:
published = key.get_timing("Publish")
if between(published, inception, next_bundle):
next_bundle = published
if removed is not None and inception >= removed:
continue
- # this zsk must be in the ksr
- assert key.dnskey_equals(lines[line_no])
+ # collect keys that should be in this bundle
+ # collect lines that should be in this bundle
+ bundle_keys.append(key)
+ bundle_lines.append(lines[line_no])
line_no += 1
+ check_key_bundle(bundle_keys, bundle_lines)
+
# expect rrsig(dnskey)
- for key in sorted(ksks):
+ bundle_keys = []
+ bundle_lines = []
+ for key in ksks:
active = key.get_timing("Activate")
inactive = key.get_timing("Inactive", must_exist=False)
if active > inception:
if inactive is not None and inception >= inactive:
continue
- # there must be a signature of this ksk
- alg = key.get_metadata("Algorithm")
- expect = f"{zone}. 3600 IN RRSIG DNSKEY {alg} 2 3600 {sigend} {sigstart} {key.tag} {zone}."
- rrsig = " ".join(lines[line_no].split())
- assert expect in rrsig
+ # collect keys that should be in this bundle
+ # collect lines that should be in this bundle
+ bundle_keys.append(key)
+ bundle_lines.append(lines[line_no])
line_no += 1
+ check_rrsig_bundle(bundle_keys, bundle_lines, zone, "DNSKEY", sigend, sigstart)
+
# expect cdnskey
have_cdnskey = False
if cdnskey:
- for key in sorted(ksks):
+ bundle_keys = []
+ bundle_lines = []
+ for key in ksks:
published = key.get_timing("SyncPublish")
if between(published, inception, next_bundle):
next_bundle = published
if removed is not None and inception >= removed:
continue
- # the cdnskey of this ksk must be in the ksr
- assert key.dnskey_equals(lines[line_no], cdnskey=True)
+ # collect keys that should be in this bundle
+ # collect lines that should be in this bundle
+ bundle_keys.append(key)
+ bundle_lines.append(lines[line_no])
line_no += 1
have_cdnskey = True
+ check_key_bundle(bundle_keys, bundle_lines, cdnskey=True)
+
if have_cdnskey:
# expect rrsig(cdnskey)
- for key in sorted(ksks):
+ bundle_keys = []
+ bundle_lines = []
+ for key in ksks:
active = key.get_timing("Activate")
inactive = key.get_timing("Inactive", must_exist=False)
if active > inception:
if inactive is not None and inception >= inactive:
continue
- # there must be a signature of this ksk
- alg = key.get_metadata("Algorithm")
- expect = f"{zone}. 3600 IN RRSIG CDNSKEY {alg} 2 3600 {sigend} {sigstart} {key.tag} {zone}."
- rrsig = " ".join(lines[line_no].split())
- assert expect in rrsig
+ # collect keys that should be in this bundle
+ # collect lines that should be in this bundle
+ bundle_keys.append(key)
+ bundle_lines.append(lines[line_no])
line_no += 1
+ check_rrsig_bundle(
+ bundle_keys, bundle_lines, zone, "CDNSKEY", sigend, sigstart
+ )
+
# expect cds
have_cds = False
if cds != "":
- for key in sorted(ksks):
+ bundle_keys = []
+ bundle_lines = []
+ expected_cds = cds.split(",")
+ for key in ksks:
published = key.get_timing("SyncPublish")
if between(published, inception, next_bundle):
next_bundle = published
if removed is not None and inception >= removed:
continue
- # the cds of this ksk must be in the ksr
- expected_cds = cds.split(",")
- for alg in expected_cds:
- assert key.cds_equals(lines[line_no], alg.strip())
+ # collect keys that should be in this bundle
+ # collect lines that should be in this bundle
+ bundle_keys.append(key)
+ # pylint: disable=unused-variable
+ for _arg in expected_cds:
+ bundle_lines.append(lines[line_no])
line_no += 1
have_cds = True
+ check_cds_bundle(bundle_keys, bundle_lines, expected_cds)
+
if have_cds:
# expect rrsig(cds)
- for key in sorted(ksks):
+ bundle_keys = []
+ bundle_lines = []
+ for key in ksks:
active = key.get_timing("Activate")
inactive = key.get_timing("Inactive", must_exist=False)
if active > inception:
if inactive is not None and inception >= inactive:
continue
- # there must be a signature of this ksk
- alg = key.get_metadata("Algorithm")
- expect = f"{zone}. 3600 IN RRSIG CDS {alg} 2 3600 {sigend} {sigstart} {key.tag} {zone}."
- rrsig = " ".join(lines[line_no].split())
- assert expect in rrsig
+ # collect keys that should be in this bundle
+ # collect lines that should be in this bundle
+ bundle_keys.append(key)
+ bundle_lines.append(lines[line_no])
line_no += 1
+ check_rrsig_bundle(bundle_keys, bundle_lines, zone, "CDS", sigend, sigstart)
+
inception = next_bundle
# skr footer