def test_dnstap_dispatch_socket_addresses():
# Send some query to ns3 so that it records something in its dnstap file.
msg = dns.message.make_query("mail.example.", "A")
- res = isctest.query.tcp(msg, "10.53.0.2")
+ res = isctest.query.tcp(msg, "10.53.0.2", expected_rcode=dns.rcode.NOERROR)
assert res.answer == [
dns.rrset.from_text("mail.example.", 300, "IN", "A", "10.0.0.2")
]
# information regarding copyright ownership.
import os
-from typing import Optional
+import time
+from typing import Any, Callable, Optional
import dns.query
import dns.message
+import isctest.log
+
+# compatiblity with dnspython<2.0.0
+try:
+ # In dnspython>=2.0.0, dns.rcode.Rcode class is available
+ # pylint: disable=invalid-name
+ dns_rcode = dns.rcode.Rcode # type: Any
+except AttributeError:
+ # In dnspython<2.0.0, selected rcodes are available as integers directly
+ # from dns.rcode
+ dns_rcode = dns.rcode
QUERY_TIMEOUT = 10
-def udp(
+# pylint: disable=too-many-arguments
+def generic_query(
+ query_func: Callable[..., Any],
message: dns.message.Message,
ip: str,
port: Optional[int] = None,
source: Optional[str] = None,
timeout: int = QUERY_TIMEOUT,
-) -> dns.message.Message:
+ attempts: int = 10,
+ expected_rcode: dns_rcode = None,
+) -> Any:
if port is None:
port = int(os.environ["PORT"])
- return dns.query.udp(message, ip, timeout, port=port, source=source)
+ res = None
+ for attempt in range(attempts):
+ try:
+ isctest.log.debug(
+ f"{generic_query.__name__}(): ip={ip}, port={port}, source={source}, "
+ f"timeout={timeout}, attempts left={attempts-attempt}"
+ )
+ res = query_func(message, ip, timeout, port=port, source=source)
+ if res.rcode() == expected_rcode or expected_rcode is None:
+ return res
+ except (dns.exception.Timeout, ConnectionRefusedError) as e:
+ isctest.log.debug(f"{generic_query.__name__}(): the '{e}' exceptio raised")
+ time.sleep(1)
+ raise dns.exception.Timeout
-def tcp(
- message: dns.message.Message,
- ip: str,
- port: Optional[int] = None,
- source: Optional[str] = None,
- timeout: int = QUERY_TIMEOUT,
-) -> dns.message.Message:
- if port is None:
- port = int(os.environ["PORT"])
- return dns.query.tcp(message, ip, timeout, port=port, source=source)
+def udp(*args, **kwargs) -> Any:
+ return generic_query(dns.query.udp, *args, **kwargs)
+
+
+def tcp(*args, **kwargs) -> Any:
+ return generic_query(dns.query.tcp, *args, **kwargs)
import dns.message
-def wait_for_transfer(ip, port, client_ip, name, rrtype):
- msg = dns.message.make_query(name, rrtype)
- for _ in range(10):
- try:
- res = isctest.query.udp(msg, ip, source=client_ip)
- if res.rcode() == dns.rcode.NOERROR:
- break
- except dns.exception.Timeout:
- pass
- else:
- raise RuntimeError(
- "zone transfer failed: "
- f"client: {client_ip}, name: {name}, rrtype: {rrtype} from @{ip}:{port}"
- )
+# compatiblity with dnspython<2.0.0
+try:
+ # In dnspython>=2.0.0, dns.rcode.Rcode class is available
+ # pylint: disable=invalid-name
+ dns_rcode = dns.rcode.Rcode # type: Any
+except AttributeError:
+ # In dnspython<2.0.0, selected rcodes are available as integers directly
+ # from dns.rcode
+ dns_rcode = dns.rcode
@pytest.mark.parametrize(
("allowed.", "10.53.0.5", dns.rcode.NXDOMAIN),
],
)
-def test_rpz_multiple_views(qname, source, rcode, named_port):
- wait_for_transfer("10.53.0.3", named_port, "10.53.0.2", "rpz-external.local", "SOA")
- wait_for_transfer("10.53.0.3", named_port, "10.53.0.5", "rpz-external.local", "SOA")
+def test_rpz_multiple_views(qname, source, rcode):
+ # Wait for the rpz-external.local zone transfer
+ msg = dns.message.make_query("rpz-external.local", "SOA")
+ isctest.query.tcp(
+ msg,
+ ip="10.53.0.3",
+ source="10.53.0.2",
+ expected_rcode=dns_rcode.NOERROR,
+ )
+ isctest.query.tcp(
+ msg,
+ ip="10.53.0.3",
+ source="10.53.0.5",
+ expected_rcode=dns_rcode.NOERROR,
+ )
msg = dns.message.make_query(qname, "A")
- res = isctest.query.udp(msg, "10.53.0.3", source=source)
- assert res.rcode() == rcode
+ res = isctest.query.udp(msg, "10.53.0.3", source=source, expected_rcode=rcode)
if rcode == dns.rcode.NOERROR:
assert res.answer == [dns.rrset.from_text(qname, 300, "IN", "A", "10.53.0.2")]
# Should generate a log entry into rpz_passthru.txt
msg_allowed = dns.message.make_query("allowed.", "A")
- res_allowed = isctest.query.udp(msg_allowed, resolver_ip, source="10.53.0.1")
+ res_allowed = isctest.query.udp(
+ msg_allowed, resolver_ip, source="10.53.0.1", expected_rcode=dns.rcode.NOERROR
+ )
assert res_allowed.answer == [
dns.rrset.from_text("allowed.", 300, "IN", "A", "10.53.0.2")
]
# Should generate a log entry into rpz.txt
msg_not_allowed = dns.message.make_query("baddomain.", "A")
res_not_allowed = isctest.query.udp(
- msg_not_allowed, resolver_ip, source="10.53.0.1"
+ msg_not_allowed,
+ resolver_ip,
+ source="10.53.0.1",
+ expected_rcode=dns.rcode.NXDOMAIN,
)
isctest.check.nxdomain(res_not_allowed)
assert ret_code == 0
-def wait_for_named_loaded(resolver_ip, retries=10):
- msg = dns.message.make_query("version.bind", "TXT", "CH")
- for _ in range(retries):
- try:
- res = isctest.query.udp(msg, resolver_ip)
- if res.rcode() == dns.rcode.NOERROR:
- return True
- except dns.exception.Timeout:
- time.sleep(1)
- return False
-
-
def wait_for_proc_termination(proc, max_timeout=10):
for _ in range(max_timeout):
if proc.poll() is not None:
) as named_proc:
try:
assert named_proc.poll() is None, "named isn't running"
- assert wait_for_named_loaded(resolver_ip)
+ msg = dns.message.make_query("version.bind", "TXT", "CH")
+ res = isctest.query.tcp(msg, resolver_ip)
+ isctest.check.noerror(res)
do_work(
named_proc,
resolver_ip,
msg = create_msg("short.example.", "TXT")
update_expected(exp, "dns-udp-requests-sizes-received-ipv4", msg)
- ans = isctest.query.udp(msg, statsip)
+ ans = isctest.query.udp(msg, statsip, attempts=1)
isctest.check.noerror(ans)
update_expected(exp, "dns-udp-responses-sizes-sent-ipv4", ans)
data = fetch_traffic(statsip, statsport)
msg = create_msg("long.example.", "TXT")
update_expected(exp, "dns-udp-requests-sizes-received-ipv4", msg)
- ans = isctest.query.udp(msg, statsip)
+ ans = isctest.query.udp(msg, statsip, attempts=1)
isctest.check.noerror(ans)
update_expected(exp, "dns-udp-responses-sizes-sent-ipv4", ans)
data = fetch_traffic(statsip, statsport)
msg = create_msg("short.example.", "TXT")
update_expected(exp, "dns-tcp-requests-sizes-received-ipv4", msg)
- ans = isctest.query.tcp(msg, statsip)
+ ans = isctest.query.tcp(msg, statsip, attempts=1)
isctest.check.noerror(ans)
update_expected(exp, "dns-tcp-responses-sizes-sent-ipv4", ans)
data = fetch_traffic(statsip, statsport)
msg = create_msg("long.example.", "TXT")
update_expected(exp, "dns-tcp-requests-sizes-received-ipv4", msg)
- ans = isctest.query.tcp(msg, statsip)
+ ans = isctest.query.tcp(msg, statsip, attempts=1)
isctest.check.noerror(ans)
update_expected(exp, "dns-tcp-responses-sizes-sent-ipv4", ans)
data = fetch_traffic(statsip, statsport)