# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
-import time
import os
-
import pytest
pytest.importorskip("dns", minversion="2.0.0")
-import dns.resolver
+import isctest
+import dns.message
-def wait_for_transfer(ip, port, client_ip, name, rrtype):
- resolver = dns.resolver.Resolver()
- resolver.nameservers = [ip]
- resolver.port = port
+def wait_for_transfer(ip, port, client_ip, name, rrtype):
+ msg = dns.message.make_query(name, rrtype)
for _ in range(10):
try:
- resolver.resolve(name, rrtype, source=client_ip)
- except dns.resolver.NoNameservers:
- time.sleep(1)
- else:
- break
+ res = isctest.query.udp(msg, ip, source=client_ip)
+ if res.rcode() == dns.rcode.NOERROR:
+ break
+ except dns.exception.Timeout:
+ pass
else:
raise RuntimeError(
"zone transfer failed: "
- f"client {client_ip} got NXDOMAIN for {name} {rrtype} from @{ip}:{port}"
+ f"client: {client_ip}, name: {name}, rrtype: {rrtype} from @{ip}:{port}"
)
-def test_rpz_multiple_views(named_port):
- resolver = dns.resolver.Resolver()
- resolver.nameservers = ["10.53.0.3"]
- resolver.port = named_port
-
+@pytest.mark.parametrize(
+ "qname,source,rcode",
+ [
+ # For 10.53.0.1 source IP:
+ # - baddomain.com isn't allowed (CNAME .), should return NXDOMAIN
+ # - gooddomain.com is allowed
+ # - allowed. is allowed
+ ("baddomain.", "10.53.0.1", dns.rcode.NXDOMAIN),
+ ("gooddomain.", "10.53.0.1", dns.rcode.NOERROR),
+ ("allowed.", "10.53.0.1", dns.rcode.NOERROR),
+ # For 10.53.0.2 source IP:
+ # - allowed.com isn't allowed (CNAME .), should return NXDOMAIN
+ # - baddomain.com is allowed
+ # - gooddomain.com is allowed
+ ("baddomain.", "10.53.0.2", dns.rcode.NOERROR),
+ ("gooddomain.", "10.53.0.2", dns.rcode.NOERROR),
+ ("allowed.", "10.53.0.2", dns.rcode.NXDOMAIN),
+ # For 10.53.0.3 source IP:
+ # - gooddomain.com is allowed
+ # - baddomain.com is allowed
+ # - allowed. is allowed
+ ("baddomain.", "10.53.0.3", dns.rcode.NOERROR),
+ ("gooddomain.", "10.53.0.3", dns.rcode.NOERROR),
+ ("allowed.", "10.53.0.3", dns.rcode.NOERROR),
+ # For 10.53.0.4 source IP:
+ # - gooddomain.com isn't allowed (CNAME .), should return NXDOMAIN
+ # - baddomain.com isn't allowed (CNAME .), should return NXDOMAIN
+ # - allowed. is allowed
+ ("baddomain.", "10.53.0.4", dns.rcode.NXDOMAIN),
+ ("gooddomain.", "10.53.0.4", dns.rcode.NXDOMAIN),
+ ("allowed.", "10.53.0.4", dns.rcode.NOERROR),
+ # For 10.53.0.5 (any) source IP:
+ # - baddomain.com is allowed
+ # - gooddomain.com isn't allowed (CNAME .), should return NXDOMAIN
+ # - allowed.com isn't allowed (CNAME .), should return NXDOMAIN
+ ("baddomain.", "10.53.0.5", dns.rcode.NOERROR),
+ ("gooddomain.", "10.53.0.5", dns.rcode.NXDOMAIN),
+ ("allowed.", "10.53.0.5", dns.rcode.NXDOMAIN),
+ ],
+)
+def test_rpz_multiple_views(qname, source, rcode, named_port):
wait_for_transfer("10.53.0.3", named_port, "10.53.0.2", "rpz-external.local", "SOA")
wait_for_transfer("10.53.0.3", named_port, "10.53.0.5", "rpz-external.local", "SOA")
- # For 10.53.0.1 source IP:
- # - baddomain.com isn't allowed (CNAME .), should return NXDOMAIN
- # - gooddomain.com is allowed
- # - allowed. is allowed
- with pytest.raises(dns.resolver.NXDOMAIN):
- resolver.resolve("baddomain.", "A", source="10.53.0.1")
-
- ans = resolver.resolve("gooddomain.", "A", source="10.53.0.1")
- assert ans[0].address == "10.53.0.2"
-
- ans = resolver.resolve("allowed.", "A", source="10.53.0.1")
- assert ans[0].address == "10.53.0.2"
-
- # For 10.53.0.2 source IP:
- # - allowed.com isn't allowed (CNAME .), should return NXDOMAIN
- # - baddomain.com is allowed
- # - gooddomain.com is allowed
- ans = resolver.resolve("baddomain.", "A", source="10.53.0.2")
- assert ans[0].address == "10.53.0.2"
-
- ans = resolver.resolve("gooddomain.", "A", source="10.53.0.2")
- assert ans[0].address == "10.53.0.2"
-
- with pytest.raises(dns.resolver.NXDOMAIN):
- resolver.resolve("allowed.", "A", source="10.53.0.2")
-
- # For 10.53.0.3 source IP:
- # - gooddomain.com is allowed
- # - baddomain.com is allowed
- # - allowed. is allowed
- ans = resolver.resolve("baddomain.", "A", source="10.53.0.3")
- assert ans[0].address == "10.53.0.2"
-
- ans = resolver.resolve("gooddomain.", "A", source="10.53.0.3")
- assert ans[0].address == "10.53.0.2"
-
- ans = resolver.resolve("allowed.", "A", source="10.53.0.3")
- assert ans[0].address == "10.53.0.2"
-
- # For 10.53.0.4 source IP:
- # - gooddomain.com isn't allowed (CNAME .), should return NXDOMAIN
- # - baddomain.com isn't allowed (CNAME .), should return NXDOMAIN
- # - allowed. is allowed
- with pytest.raises(dns.resolver.NXDOMAIN):
- resolver.resolve("baddomain.", "A", source="10.53.0.4")
-
- with pytest.raises(dns.resolver.NXDOMAIN):
- resolver.resolve("gooddomain.", "A", source="10.53.0.4")
-
- ans = resolver.resolve("allowed.", "A", source="10.53.0.4")
- assert ans[0].address == "10.53.0.2"
-
- # For 10.53.0.5 (any) source IP:
- # - baddomain.com is allowed
- # - gooddomain.com isn't allowed (CNAME .), should return NXDOMAIN
- # - allowed.com isn't allowed (CNAME .), should return NXDOMAIN
- ans = resolver.resolve("baddomain.", "A", source="10.53.0.5")
- assert ans[0].address == "10.53.0.2"
-
- with pytest.raises(dns.resolver.NXDOMAIN):
- resolver.resolve("gooddomain.", "A", source="10.53.0.5")
-
- with pytest.raises(dns.resolver.NXDOMAIN):
- resolver.resolve("allowed.", "A", source="10.53.0.5")
+ msg = dns.message.make_query(qname, "A")
+ res = isctest.query.udp(msg, "10.53.0.3", source=source)
+ assert res.rcode() == rcode
+ if rcode == dns.rcode.NOERROR:
+ assert res.answer == [dns.rrset.from_text(qname, 300, "IN", "A", "10.53.0.2")]
-def test_rpz_passthru_logging(named_port):
- resolver = dns.resolver.Resolver()
- resolver.nameservers = ["10.53.0.3"]
- resolver.port = named_port
+def test_rpz_passthru_logging():
+ resolver_ip = "10.53.0.3"
# Should generate a log entry into rpz_passthru.txt
- ans = resolver.resolve("allowed.", "A", source="10.53.0.1")
- assert ans[0].address == "10.53.0.2"
+ msg_allowed = dns.message.make_query("allowed.", "A")
+ res_allowed = isctest.query.udp(msg_allowed, resolver_ip, source="10.53.0.1")
+ assert res_allowed.answer == [
+ dns.rrset.from_text("allowed.", 300, "IN", "A", "10.53.0.2")
+ ]
# baddomain.com isn't allowed (CNAME .), should return NXDOMAIN
# Should generate a log entry into rpz.txt
- with pytest.raises(dns.resolver.NXDOMAIN):
- resolver.resolve("baddomain.", "A", source="10.53.0.1")
+ msg_not_allowed = dns.message.make_query("baddomain.", "A")
+ res_not_allowed = isctest.query.udp(
+ msg_not_allowed, resolver_ip, source="10.53.0.1"
+ )
+ isctest.check.nxdomain(res_not_allowed)
rpz_passthru_logfile = os.path.join("ns3", "rpz_passthru.txt")
rpz_logfile = os.path.join("ns3", "rpz.txt")
pytest.importorskip("dns", minversion="2.0.0")
import dns.exception
-import dns.resolver
import isctest
-def do_work(named_proc, resolver, instance, kill_method, n_workers, n_queries):
+def do_work(named_proc, resolver_ip, instance, kill_method, n_workers, n_queries):
"""Creates a number of A queries to run in parallel
in order simulate a slightly more realistic test scenario.
:param named_proc: named process instance
:type named_proc: subprocess.Popen
- :param resolver: target resolver
- :type resolver: dns.resolver.Resolver
+ :param resolver_ip: target resolver's IP address
+ :type resolver_ip: str
:param instance: the named instance to send RNDC commands to
:type instance: isctest.instance.NamedInstance
return -1
# We're going to execute queries in parallel by means of a thread pool.
- # dnspython functions block, so we need to circunvent that.
+ # dnspython functions block, so we need to circumvent that.
with ThreadPoolExecutor(n_workers + 1) as executor:
# Helper dict, where keys=Future objects and values are tags used
# to process results later.
# 50% of work will be A queries.
# 1 work will be rndc stop.
# Remaining work will be rndc status (so we test parallel control
- # connections that were crashing named).
+ # connections that were crashing named).
shutdown = True
for i in range(n_queries):
if i < (n_queries // 2):
)
qname = relname + ".test"
- futures[executor.submit(resolver.resolve, qname, "A")] = tag
+ msg = dns.message.make_query(qname, "A")
+ futures[executor.submit(isctest.query.udp, msg, resolver_ip)] = tag
elif shutdown: # We attempt to stop named in the middle
shutdown = False
if kill_method == "rndc":
# named process exited gracefully after SIGTERM signal.
if futures[future] == "stop":
ret_code = result
-
- except (
- dns.resolver.NXDOMAIN,
- dns.resolver.NoNameservers,
- dns.exception.Timeout,
- ):
+ except dns.exception.Timeout:
pass
if kill_method == "rndc":
assert ret_code == 0
-def wait_for_named_loaded(resolver, retries=10):
+def wait_for_named_loaded(resolver_ip, retries=10):
+ msg = dns.message.make_query("version.bind", "TXT", "CH")
for _ in range(retries):
try:
- resolver.resolve("version.bind", "TXT", "CH")
- return True
- except (dns.resolver.NoNameservers, dns.exception.Timeout):
+ res = isctest.query.udp(msg, resolver_ip)
+ if res.rcode() == dns.rcode.NOERROR:
+ return True
+ except dns.exception.Timeout:
time.sleep(1)
return False
named_ports = isctest.instance.NamedPorts.from_env()
instance = isctest.instance.NamedInstance("ns3", named_ports)
- # We create a resolver instance that will be used to send queries.
- resolver = dns.resolver.Resolver()
- resolver.nameservers = ["10.53.0.3"]
- resolver.port = named_ports.dns
-
+ resolver_ip = "10.53.0.3"
named_cmdline = [named, "-c", cfg_file, "-d", "99", "-g"]
with open(os.path.join(cfg_dir, "named.run"), "ab") as named_log:
with subprocess.Popen(
) as named_proc:
try:
assert named_proc.poll() is None, "named isn't running"
- assert wait_for_named_loaded(resolver)
+ assert wait_for_named_loaded(resolver_ip)
do_work(
named_proc,
- resolver,
+ resolver_ip,
instance,
kill_method,
n_workers=12,