]> git.ipfire.org Git - thirdparty/bind9.git/commitdiff
Introduce pytest check_next_key_event, get_keyids
authorMatthijs Mekking <matthijs@isc.org>
Fri, 14 Mar 2025 10:10:22 +0000 (11:10 +0100)
committerMatthijs Mekking <matthijs@isc.org>
Thu, 10 Apr 2025 20:44:31 +0000 (15:44 -0500)
For the kasp tests we need a new utility that can retrieve a list of
Keys from a given directory, belonging to a specific zone. This is
'keydir_to_keylist' and is the replacement of 'kasp.sh:get_keyids()'.

'next_key_event_eqauls' is a method to check when the next key event is
scheduled, needed for the rollover tests, and is the equivalent of shell
script 'check_next_key_event'.

bin/tests/system/isctest/kasp.py

index 1cab813896143099b8774fca8b3582aaa5472ec9..fa3c297f787ff3336ce155ba1255190a50789aa4 100644 (file)
@@ -10,6 +10,7 @@
 # information regarding copyright ownership.
 
 from functools import total_ordering
+import glob
 import os
 from pathlib import Path
 import re
@@ -25,6 +26,8 @@ import isctest.query
 
 DEFAULT_TTL = 300
 
+NEXT_KEY_EVENT_THRESHOLD = 100
+
 
 def _query(server, qname, qtype):
     query = dns.message.make_query(qname, qtype, use_edns=True, want_dnssec=True)
@@ -1010,6 +1013,43 @@ def check_subdomain(server, zone, ksks, zsks):
     check_signatures(rrsigs, qtype, fqdn, ksks, zsks)
 
 
+def next_key_event_equals(server, zone, next_event):
+    if next_event is None:
+        # No next key event check.
+        return True
+
+    val = int(next_event.total_seconds())
+    if val == 3600:
+        waitfor = rf".*zone {zone}.*: next key event in (.*) seconds"
+    else:
+        # Don't want default loadkeys interval.
+        waitfor = rf".*zone {zone}.*: next key event in (?!3600$)(.*) seconds"
+
+    with server.watch_log_from_start() as watcher:
+        watcher.wait_for_line(re.compile(waitfor))
+
+    # WMM: The with code below is extracting the line the watcher was
+    # waiting for. If WatchLog.wait_for_line()` returned the matched string,
+    # we can use it directly on `re.match`.
+    next_found = False
+    minval = val - NEXT_KEY_EVENT_THRESHOLD
+    maxval = val + NEXT_KEY_EVENT_THRESHOLD
+    with open(f"{server.identifier}/named.run", "r", encoding="utf-8") as fp:
+        for line in fp:
+            match = re.match(waitfor, line)
+            if match is not None:
+                nextval = int(match.group(1))
+                if minval <= nextval <= maxval:
+                    next_found = True
+                    break
+
+                isctest.log.debug(
+                    f"check next key event: expected {val} in: {line.strip()}"
+                )
+
+    return next_found
+
+
 def keydir_to_keylist(
     zone: Optional[str], keydir: Optional[str] = None, in_use: bool = False
 ) -> List[Key]: