return digest_fromfile == digest_fromwire
+ def is_metadata_consistent(self, key, metadata, checkval=True):
+ """
+ If 'key' exists in 'metadata' then it must also exist in the state
+ meta data. Otherwise, it must not exist in the state meta data.
+ If 'checkval' is True, the meta data values must also match.
+ """
+ if key in metadata:
+ if checkval:
+ value = self.get_metadata(key)
+ if value != f"{metadata[key]}":
+ isctest.log.debug(
+ f"{self.name} {key} METADATA MISMATCH: {value} - {metadata[key]}"
+ )
+ return value == f"{metadata[key]}"
+
+ return self.get_metadata(key) != "undefined"
+
+ value = self.get_metadata(key, must_exist=False)
+ if value != "undefined":
+ isctest.log.debug(f"{self.name} {key} METADATA UNEXPECTED: {value}")
+ return value == "undefined"
+
+ def is_timing_consistent(self, key, timing, file, comment=False):
+ """
+ If 'key' exists in 'timing' then it must match the value in the state
+ timing data. Otherwise, it must also not exist in the state timing data.
+ """
+ if key in timing:
+ value = self.get_metadata(key, file=file, comment=comment)
+ if value != str(timing[key]):
+ isctest.log.debug(
+ f"{self.name} {key} TIMING MISMATCH: {value} - {timing[key]}"
+ )
+ return value == str(timing[key])
+
+ value = self.get_metadata(key, file=file, comment=comment, must_exist=False)
+ if value != "undefined":
+ isctest.log.debug(f"{self.name} {key} TIMING UNEXPECTED: {value}")
+ return value == "undefined"
+
+ def match_properties(self, zone, properties):
+ """
+ Check the key with given properties.
+ """
+ if not properties.properties["expect"]:
+ return False
+
+ # Check file existence.
+ # Noop. If file is missing then the get_metadata calls will fail.
+
+ # Check the public key file.
+ role = properties.properties["role_full"]
+ comment = f"This is a {role} key, keyid {self.tag}, for {zone}."
+ if not isctest.util.file_contents_contain(self.keyfile, comment):
+ isctest.log.debug(f"{self.name} COMMENT MISMATCH: expected '{comment}'")
+ return False
+
+ ttl = properties.properties["dnskey_ttl"]
+ flags = properties.properties["flags"]
+ alg = properties.metadata["Algorithm"]
+ dnskey = f"{zone}. {ttl} IN DNSKEY {flags} 3 {alg}"
+ if not isctest.util.file_contents_contain(self.keyfile, dnskey):
+ isctest.log.debug(f"{self.name} DNSKEY MISMATCH: expected '{dnskey}'")
+ return False
+
+ # Now check the private key file.
+ if properties.properties["private"]:
+ # Retrieve creation date.
+ created = self.get_metadata("Generated")
+
+ pval = self.get_metadata("Created", file=self.privatefile)
+ if pval != created:
+ isctest.log.debug(
+ f"{self.name} Created METADATA MISMATCH: {pval} - {created}"
+ )
+ return False
+ pval = self.get_metadata("Private-key-format", file=self.privatefile)
+ if pval != "v1.3":
+ isctest.log.debug(
+ f"{self.name} Private-key-format METADATA MISMATCH: {pval} - v1.3"
+ )
+ return False
+ pval = self.get_metadata("Algorithm", file=self.privatefile)
+ if pval != f"{alg}":
+ isctest.log.debug(
+ f"{self.name} Algorithm METADATA MISMATCH: {pval} - {alg}"
+ )
+ return False
+
+ # Now check the key state file.
+ if properties.properties["legacy"]:
+ return True
+
+ comment = f"This is the state of key {self.tag}, for {zone}."
+ if not isctest.util.file_contents_contain(self.statefile, comment):
+ isctest.log.debug(f"{self.name} COMMENT MISMATCH: expected '{comment}'")
+ return False
+
+ attributes = [
+ "Lifetime",
+ "Algorithm",
+ "Length",
+ "KSK",
+ "ZSK",
+ "GoalState",
+ "DNSKEYState",
+ "KRRSIGState",
+ "ZRRSIGState",
+ "DSState",
+ ]
+ for key in attributes:
+ if not self.is_metadata_consistent(key, properties.metadata):
+ return False
+
+ # A match is found.
+ return True
+
+ def match_timingmetadata(self, timings, file=None, comment=False):
+ if file is None:
+ file = self.statefile
+
+ attributes = [
+ "Generated",
+ "Created",
+ "Published",
+ "Publish",
+ "PublishCDS",
+ "SyncPublish",
+ "Active",
+ "Activate",
+ "Retired",
+ "Inactive",
+ "Revoked",
+ "Removed",
+ "Delete",
+ ]
+ for key in attributes:
+ if not self.is_timing_consistent(key, timings, file, comment=comment):
+ isctest.log.debug(f"{self.name} TIMING METADATA MISMATCH: {key}")
+ return False
+
+ return True
+
def __lt__(self, other: "Key"):
return self.name < other.name
assert signed
+def verify_keys(zone, keys, expected):
+ """
+ Checks keys for a configured zone. This verifies:
+ 1. The expected number of keys exist in 'keys'.
+ 2. The keys match the expected properties.
+ """
+
+ def _verify_keys():
+ # check number of keys matches expected.
+ if len(keys) != len(expected):
+ return False
+
+ if len(keys) == 0:
+ return True
+
+ for expect in expected:
+ expect.key = None
+
+ for key in keys:
+ found = False
+ i = 0
+ while not found and i < len(expected):
+ if expected[i].key is None:
+ found = key.match_properties(zone, expected[i])
+ if found:
+ key.external = expected[i].properties["legacy"]
+ expected[i].key = key
+ i += 1
+ if not found:
+ return False
+
+ return True
+
+ isctest.run.retry_with_timeout(_verify_keys, timeout=10)
+
+
+def check_keytimes(keys, expected):
+ """
+ Check the key timing metadata for all keys in 'keys'.
+ """
+ assert len(keys) == len(expected)
+
+ if len(keys) == 0:
+ return
+
+ for key in keys:
+ for expect in expected:
+ if expect.properties["legacy"]:
+ continue
+
+ if not key is expect.key:
+ continue
+
+ synonyms = {}
+ if "Generated" in expect.timing:
+ synonyms["Created"] = expect.timing["Generated"]
+ if "Published" in expect.timing:
+ synonyms["Publish"] = expect.timing["Published"]
+ if "PublishCDS" in expect.timing:
+ synonyms["SyncPublish"] = expect.timing["PublishCDS"]
+ if "Active" in expect.timing:
+ synonyms["Activate"] = expect.timing["Active"]
+ if "Retired" in expect.timing:
+ synonyms["Inactive"] = expect.timing["Retired"]
+ if "DeleteCDS" in expect.timing:
+ synonyms["SyncDelete"] = expect.timing["DeleteCDS"]
+ if "Revoked" in expect.timing:
+ synonyms["Revoked"] = expect.timing["Revoked"]
+ if "Removed" in expect.timing:
+ synonyms["Delete"] = expect.timing["Removed"]
+
+ assert key.match_timingmetadata(synonyms, file=key.keyfile, comment=True)
+ if expect.properties["private"]:
+ assert key.match_timingmetadata(synonyms, file=key.privatefile)
+ if not expect.properties["legacy"]:
+ assert key.match_timingmetadata(expect.timing)
+
+ state_changes = [
+ "DNSKEYChange",
+ "KRRSIGChange",
+ "ZRRSIGChange",
+ "DSChange",
+ ]
+ for change in state_changes:
+ assert key.is_metadata_consistent(
+ change, expect.timing, checkval=False
+ )
+
+
+def check_keyrelationships(keys, expected):
+ """
+ Check the key relationships (Successor and Predecessor metadata).
+ """
+ for key in keys:
+ for expect in expected:
+ if expect.properties["legacy"]:
+ continue
+
+ if not key is expect.key:
+ continue
+
+ relationship_status = ["Predecessor", "Successor"]
+ for status in relationship_status:
+ assert key.is_metadata_consistent(status, expect.metadata)
+
+
def check_dnssec_verify(server, zone):
# Check if zone if DNSSEC valid with dnssec-verify.
fqdn = f"{zone}."