timeout: int = QUERY_TIMEOUT,
attempts: int = 10,
expected_rcode: dns_rcode = None,
+ verify: bool = False,
) -> Any:
if port is None:
- port = int(os.environ["PORT"])
+ if query_func.__name__ == "tls":
+ port = int(os.environ["TLSPORT"])
+ else:
+ port = int(os.environ["PORT"])
+
+ query_args = {
+ "q": message,
+ "where": ip,
+ "timeout": timeout,
+ "port": port,
+ "source": source,
+ }
+ if query_func.__name__ == "tls":
+ query_args["verify"] = verify
+
res = None
for attempt in range(attempts):
+ isctest.log.debug(
+ f"{query_func.__name__}(): ip={ip}, port={port}, source={source}, "
+ f"timeout={timeout}, attempts left={attempts-attempt}"
+ )
try:
- isctest.log.debug(
- f"{query_func.__name__}(): ip={ip}, port={port}, source={source}, "
- f"timeout={timeout}, attempts left={attempts-attempt}"
- )
- res = query_func(message, ip, timeout, port=port, source=source)
+ res = query_func(**query_args)
+ except (dns.exception.Timeout, ConnectionRefusedError) as e:
+ isctest.log.debug(f"{query_func.__name__}(): the '{e}' exception raised")
+ else:
if res.rcode() == expected_rcode or expected_rcode is None:
return res
- except (dns.exception.Timeout, ConnectionRefusedError) as e:
- isctest.log.debug(f"{query_func.__name__}(): the '{e}' exceptio raised")
time.sleep(1)
+
if expected_rcode is not None:
last_rcode = dns_rcode.to_text(res.rcode()) if res else None
isctest.log.debug(
def tcp(*args, **kwargs) -> Any:
return generic_query(dns.query.tcp, *args, **kwargs)
+
+
+def tls(*args, **kwargs) -> Any:
+ try:
+ return generic_query(dns.query.tls, *args, **kwargs)
+ except TypeError as e:
+ raise RuntimeError(
+ "dnspython 2.5.0 or newer is required for isctest.query.tls()"
+ ) from e