# information regarding copyright ownership.
from functools import total_ordering
+import glob
import os
from pathlib import Path
import re
DEFAULT_TTL = 300
+NEXT_KEY_EVENT_THRESHOLD = 100
+
def _query(server, qname, qtype):
query = dns.message.make_query(qname, qtype, use_edns=True, want_dnssec=True)
check_signatures(rrsigs, qtype, fqdn, ksks, zsks)
+def next_key_event_equals(server, zone, next_event):
+ if next_event is None:
+ # No next key event check.
+ return True
+
+ val = int(next_event.total_seconds())
+ if val == 3600:
+ waitfor = rf".*zone {zone}.*: next key event in (.*) seconds"
+ else:
+ # Don't want default loadkeys interval.
+ waitfor = rf".*zone {zone}.*: next key event in (?!3600$)(.*) seconds"
+
+ with server.watch_log_from_start() as watcher:
+ watcher.wait_for_line(re.compile(waitfor))
+
+ # WMM: The with code below is extracting the line the watcher was
+ # waiting for. If WatchLog.wait_for_line()` returned the matched string,
+ # we can use it directly on `re.match`.
+ next_found = False
+ minval = val - NEXT_KEY_EVENT_THRESHOLD
+ maxval = val + NEXT_KEY_EVENT_THRESHOLD
+ with open(f"{server.identifier}/named.run", "r", encoding="utf-8") as fp:
+ for line in fp:
+ match = re.match(waitfor, line)
+ if match is not None:
+ nextval = int(match.group(1))
+ if minval <= nextval <= maxval:
+ next_found = True
+ break
+
+ isctest.log.debug(
+ f"check next key event: expected {val} in: {line.strip()}"
+ )
+
+ return next_found
+
+
def keydir_to_keylist(
zone: Optional[str], keydir: Optional[str] = None, in_use: bool = False
) -> List[Key]: