import pytest
pytest.importorskip('dns', minversion='2.0.0')
+import dns.edns
+import dns.message
+import dns.name
+import dns.query
+import dns.rdataclass
+import dns.rdatatype
TIMEOUT = 10
def create_msg(qname, qtype):
- import dns.message
msg = dns.message.make_query(qname, qtype, want_dnssec=True,
use_edns=0, payload=4096)
return msg
#
# The initial timeout is 2.5 seconds, so this should timeout
#
- import dns.query
-
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
sock.connect(("10.53.0.1", named_port))
#
# The idle timeout is 5 seconds, so the third message should fail
#
- import dns.rcode
-
msg = create_msg("example.", "A")
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
sock.connect(("10.53.0.1", named_port))
#
# Keepalive is 7 seconds, so the third message should succeed.
#
- import dns.rcode
-
msg = create_msg("example.", "A")
kopt = dns.edns.GenericOption(11, b'\x00')
msg.use_edns(edns=True, payload=4096, options=[kopt])
#
# The pipelining should only timeout after the last message is received
#
- import dns.query
-
msg = create_msg("example.", "A")
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
sock.connect(("10.53.0.1", named_port))
# The timers should not fire during AXFR, thus the connection should not
# close abruptly
#
- import dns.query
- import dns.rdataclass
- import dns.rdatatype
-
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
sock.connect(("10.53.0.1", named_port))
def test_send_timeout(named_port):
- import dns.query
-
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
sock.connect(("10.53.0.1", named_port))
@pytest.mark.long
def test_max_transfer_idle_out(named_port):
- import dns.query
- import dns.rdataclass
- import dns.rdatatype
-
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
sock.connect(("10.53.0.1", named_port))
@pytest.mark.long
def test_max_transfer_time_out(named_port):
- import dns.query
- import dns.rdataclass
- import dns.rdatatype
-
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
sock.connect(("10.53.0.1", named_port))