if wantsigs:
r.answer.append(sigs[-1])
else:
- for (i, sig) in rrs:
+ for i, sig in rrs:
if sig and not wantsigs:
continue
elif sig:
from dns.rdatatype import *
from dns.tsig import *
+
# Log query to file
def logquery(type, qname):
with open("qlog", "a") as f:
dopass2 = False
+
############################################################################
#
# This server will serve valid and spoofed answers. A spoofed answer will
from dns.rcode import *
from dns.name import *
+
# Log query to file
def logquery(type, qname):
with open("qlog", "a") as f:
from dns.rcode import *
from dns.name import *
+
# Log query to file
def logquery(type, qname):
with open("qlog", "a") as f:
def test_unsigned_serial_number():
-
"""
Check whether all signed zone files in the "ns8" subdirectory contain the
serial number of the unsigned version of the zone in the raw-format header.
# We're going to execute queries in parallel by means of a thread pool.
# dnspython functions block, so we need to circunvent that.
with ThreadPoolExecutor(n_workers + 1) as executor:
-
# Helper dict, where keys=Future objects and values are tags used
# to process results later.
futures = {}
def zone_mtime(zonedir, name):
-
try:
si = os.stat(os.path.join(zonedir, "{}.db".format(name)))
except FileNotFoundError:
def test_zone_timers_primary(fetch_zones, load_timers, **kwargs):
-
statsip = kwargs["statsip"]
statsport = kwargs["statsport"]
zonedir = kwargs["zonedir"]
def test_zone_timers_secondary(fetch_zones, load_timers, **kwargs):
-
statsip = kwargs["statsip"]
statsport = kwargs["statsport"]
zonedir = kwargs["zonedir"]
def test_zone_with_many_keys(fetch_zones, load_zone, **kwargs):
-
statsip = kwargs["statsip"]
statsport = kwargs["statsport"]
def udp_query(ip, port, msg):
-
ans = dns.query.udp(msg, ip, TIMEOUT, port=port)
assert ans.rcode() == dns.rcode.NOERROR
def tcp_query(ip, port, msg):
-
ans = dns.query.tcp(msg, ip, TIMEOUT, port=port)
assert ans.rcode() == dns.rcode.NOERROR
def test_traffic(fetch_traffic, **kwargs):
-
statsip = kwargs["statsip"]
statsport = kwargs["statsport"]
port = kwargs["port"]
# JSON helper functions
def fetch_zones_json(statsip, statsport):
-
r = requests.get(
"http://{}:{}/json/v1/zones".format(statsip, statsport), timeout=600
)
def fetch_traffic_json(statsip, statsport):
-
r = requests.get(
"http://{}:{}/json/v1/traffic".format(statsip, statsport), timeout=600
)
def load_timers_json(zone, primary=True):
-
name = zone["name"]
# Check if the primary zone timer exists
# XML helper functions
def fetch_zones_xml(statsip, statsport):
-
r = requests.get(
"http://{}:{}/xml/v3/zones".format(statsip, statsport), timeout=600
)
def load_timers_xml(zone, primary=True):
-
name = zone.attrib["name"]
loaded_el = zone.find("loaded")
def test_tcp_garbage(named_port):
with create_socket("10.53.0.7", named_port) as sock:
-
msg = create_msg("a.example.", "A")
(sbytes, stime) = dns.query.send_tcp(sock, msg, timeout())
(response, rtime) = dns.query.receive_tcp(sock, timeout())
def test_tcp_garbage_response(named_port):
with create_socket("10.53.0.7", named_port) as sock:
-
msg = create_msg("a.example.", "A")
(sbytes, stime) = dns.query.send_tcp(sock, msg, timeout())
(response, rtime) = dns.query.receive_tcp(sock, timeout())
# Regression test for CVE-2022-0396
def test_close_wait(named_port):
with create_socket("10.53.0.7", named_port) as sock:
-
msg = create_msg("a.example.", "A")
(sbytes, stime) = dns.query.send_tcp(sock, msg, timeout())
(response, rtime) = dns.query.receive_tcp(sock, timeout())
# request. If it gets stuck in CLOSE_WAIT state, there is no connection
# available for the query below and it will time out.
with create_socket("10.53.0.7", named_port) as sock:
-
msg = create_msg("a.example.", "A")
(sbytes, stime) = dns.query.send_tcp(sock, msg, timeout())
(response, rtime) = dns.query.receive_tcp(sock, timeout())