]> git.ipfire.org Git - thirdparty/bind9.git/commitdiff
Add isctest.query.tls() function
authorMichal Nowak <mnowak@isc.org>
Wed, 17 Jan 2024 19:47:42 +0000 (20:47 +0100)
committerMichal Nowak <mnowak@isc.org>
Fri, 24 Jan 2025 08:56:36 +0000 (08:56 +0000)
When explicitly set to True, the "verify" argument lets dnspython verify
certificates used for the connection. As most certificates in the system
test will inevitably be self-signed, the "verify" argument defaults to
False.

The "verify" argument is present in dnspython since the version 2.5.0.

(cherry picked from commit df8c419058e1e43265928b59f4ae54293f9a32de)

bin/tests/system/isctest/query.py

index 4c389af2d9f9c6cc3025d19fe5b7afad5e4f943c..6e84c8188d209ae63727adcc58a057e1edb26280 100644 (file)
@@ -31,22 +31,39 @@ def generic_query(
     timeout: int = QUERY_TIMEOUT,
     attempts: int = 10,
     expected_rcode: dns_rcode = None,
+    verify: bool = False,
 ) -> Any:
     if port is None:
-        port = int(os.environ["PORT"])
+        if query_func.__name__ == "tls":
+            port = int(os.environ["TLSPORT"])
+        else:
+            port = int(os.environ["PORT"])
+
+    query_args = {
+        "q": message,
+        "where": ip,
+        "timeout": timeout,
+        "port": port,
+        "source": source,
+    }
+    if query_func.__name__ == "tls":
+        query_args["verify"] = verify
+
     res = None
     for attempt in range(attempts):
+        isctest.log.debug(
+            f"{query_func.__name__}(): ip={ip}, port={port}, source={source}, "
+            f"timeout={timeout}, attempts left={attempts-attempt}"
+        )
         try:
-            isctest.log.debug(
-                f"{query_func.__name__}(): ip={ip}, port={port}, source={source}, "
-                f"timeout={timeout}, attempts left={attempts-attempt}"
-            )
-            res = query_func(message, ip, timeout, port=port, source=source)
+            res = query_func(**query_args)
+        except (dns.exception.Timeout, ConnectionRefusedError) as e:
+            isctest.log.debug(f"{query_func.__name__}(): the '{e}' exception raised")
+        else:
             if res.rcode() == expected_rcode or expected_rcode is None:
                 return res
-        except (dns.exception.Timeout, ConnectionRefusedError) as e:
-            isctest.log.debug(f"{query_func.__name__}(): the '{e}' exceptio raised")
         time.sleep(1)
+
     if expected_rcode is not None:
         last_rcode = dns_rcode.to_text(res.rcode()) if res else None
         isctest.log.debug(
@@ -61,3 +78,12 @@ def udp(*args, **kwargs) -> Any:
 
 def tcp(*args, **kwargs) -> Any:
     return generic_query(dns.query.tcp, *args, **kwargs)
+
+
+def tls(*args, **kwargs) -> Any:
+    try:
+        return generic_query(dns.query.tls, *args, **kwargs)
+    except TypeError as e:
+        raise RuntimeError(
+            "dnspython 2.5.0 or newer is required for isctest.query.tls()"
+        ) from e