import re
import subprocess
import time
-from typing import List, Optional, Union
+from typing import Dict, List, Optional, Union
from datetime import datetime, timedelta, timezone
return result
+class KeyProperties:
+ """
+ Represent the (expected) properties a key should have.
+ """
+
+ def __init__(
+ self,
+ name: str,
+ properties: dict,
+ metadata: dict,
+ timing: Dict[str, KeyTimingMetadata],
+ ):
+ self.name = name
+ self.key = None
+ self.properties = properties
+ self.metadata = metadata
+ self.timing = timing
+
+ def __repr__(self):
+ return self.name
+
+ def __str__(self) -> str:
+ return self.name
+
+ @staticmethod
+ def default(with_state=True) -> "KeyProperties":
+ properties = {
+ "expect": True,
+ "private": True,
+ "legacy": False,
+ "role": "csk",
+ "role_full": "key-signing",
+ "dnskey_ttl": 3600,
+ "flags": 257,
+ }
+ metadata = {
+ "Algorithm": isctest.vars.algorithms.ECDSAP256SHA256.number,
+ "Length": 256,
+ "Lifetime": 0,
+ "KSK": "yes",
+ "ZSK": "yes",
+ }
+ timing: Dict[str, KeyTimingMetadata] = {}
+
+ result = KeyProperties(
+ name="DEFAULT", properties=properties, metadata=metadata, timing=timing
+ )
+ result.name = "DEFAULT"
+ result.key = None
+ if with_state:
+ result.metadata["GoalState"] = "omnipresent"
+ result.metadata["DNSKEYState"] = "rumoured"
+ result.metadata["KRRSIGState"] = "rumoured"
+ result.metadata["ZRRSIGState"] = "rumoured"
+ result.metadata["DSState"] = "hidden"
+
+ return result
+
+ def Ipub(self, config):
+ ipub = timedelta(0)
+
+ if self.key.get_metadata("Predecessor", must_exist=False) != "undefined":
+ # Ipub = Dprp + TTLkey
+ ipub = (
+ config["dnskey-ttl"]
+ + config["zone-propagation-delay"]
+ + config["publish-safety"]
+ )
+
+ self.timing["Active"] = self.timing["Published"] + ipub
+
+ def IpubC(self, config):
+ if not self.key.is_ksk():
+ return
+
+ ttl1 = config["dnskey-ttl"] + config["publish-safety"]
+ ttl2 = timedelta(0)
+
+ if self.key.get_metadata("Predecessor", must_exist=False) == "undefined":
+ # If this is the first key, we also need to wait until the zone
+ # signatures are omnipresent. Use max-zone-ttl instead of
+ # dnskey-ttl, and no publish-safety (because we are looking at
+ # signatures here, not the public key).
+ ttl2 = config["max-zone-ttl"]
+
+ # IpubC = DprpC + TTLkey
+ ipubc = config["zone-propagation-delay"] + max(ttl1, ttl2)
+
+ self.timing["PublishCDS"] = self.timing["Published"] + ipubc
+
+ if self.metadata["Lifetime"] != 0:
+ self.timing["DeleteCDS"] = (
+ self.timing["PublishCDS"] + self.metadata["Lifetime"]
+ )
+
+ def Iret(self, config):
+ if self.metadata["Lifetime"] == 0:
+ return
+
+ sign_delay = config["signatures-validity"] - config["signatures-refresh"]
+ safety_interval = config["retire-safety"]
+
+ iretKSK = timedelta(0)
+ iretZSK = timedelta(0)
+ if self.key.is_ksk():
+ # Iret = DprpP + TTLds
+ iretKSK = (
+ config["parent-propagation-delay"] + config["ds-ttl"] + safety_interval
+ )
+ if self.key.is_zsk():
+ # Iret = Dsgn + Dprp + TTLsig
+ iretZSK = (
+ sign_delay
+ + config["zone-propagation-delay"]
+ + config["max-zone-ttl"]
+ + safety_interval
+ )
+
+ self.timing["Removed"] = self.timing["Retired"] + max(iretKSK, iretZSK)
+
+ def set_expected_keytimes(self, config, offset=None, pregenerated=False):
+ if self.key is None:
+ raise ValueError("KeyProperties must be attached to a Key")
+
+ if self.properties["legacy"]:
+ return
+
+ if offset is None:
+ offset = self.properties["offset"]
+
+ self.timing["Generated"] = self.key.get_timing("Created")
+
+ self.timing["Published"] = self.timing["Generated"]
+ if pregenerated:
+ self.timing["Published"] = self.key.get_timing("Publish")
+ self.timing["Published"] = self.timing["Published"] + offset
+ self.Ipub(config)
+
+ # Set Retired timing metadata if key has lifetime.
+ if self.metadata["Lifetime"] != 0:
+ self.timing["Retired"] = self.timing["Active"] + self.metadata["Lifetime"]
+
+ self.IpubC(config)
+ self.Iret(config)
+
+ # Key state change times must exist, but since we cannot reliably tell
+ # when named made the actual state change, we don't care what the
+ # value is. Set it to None will verify that the metadata exists, but
+ # without actual checking the value.
+ self.timing["DNSKEYChange"] = None
+
+ if self.key.is_ksk():
+ self.timing["DSChange"] = None
+ self.timing["KRRSIGChange"] = None
+
+ if self.key.is_zsk():
+ self.timing["ZRRSIGChange"] = None
+
+
@total_ordering
class Key:
"""
check_signatures(rrsigs, qtype, fqdn, ksks, zsks)
+def keydir_to_keylist(
+ zone: Optional[str], keydir: Optional[str] = None, in_use: bool = False
+) -> List[Key]:
+ """
+ Retrieve all keys from the key files in a directory. If 'zone' is None,
+ retrieve all keys in the directory, otherwise only those matching the
+ zone name. If 'keydir' is None, search the current directory.
+ """
+ if zone is None:
+ zone = ""
+
+ all_keys = []
+ if keydir is None:
+ regex = rf"(K{zone}\.\+.*\+.*)\.key"
+ for filename in glob.glob(f"K{zone}.+*+*.key"):
+ match = re.match(regex, filename)
+ if match is not None:
+ all_keys.append(Key(match.group(1)))
+ else:
+ regex = rf"{keydir}/(K{zone}\.\+.*\+.*)\.key"
+ for filename in glob.glob(f"{keydir}/K{zone}.+*+*.key"):
+ match = re.match(regex, filename)
+ if match is not None:
+ all_keys.append(Key(match.group(1), keydir))
+
+ states = ["GoalState", "DNSKEYState", "KRRSIGState", "ZRRSIGState", "DSState"]
+
+ def used(kk):
+ if not in_use:
+ return True
+
+ for state in states:
+ val = kk.get_metadata(state, must_exist=False)
+ if val not in ["undefined", "hidden"]:
+ isctest.log.debug(f"key {kk} in use")
+ return True
+
+ return False
+
+ return [k for k in all_keys if used(k)]
+
+
def keystr_to_keylist(keystr: str, keydir: Optional[str] = None) -> List[Key]:
return [Key(name, keydir) for name in keystr.split()]