import struct
import time
-import dns.flags
import dns.message
import dns.name
import dns.query
import dns.rrset
import pytest
-pytestmark = pytest.mark.extra_artifacts(
- [
- "ans*/ans.run",
- ]
-)
+import isctest
-TIMEOUT = 10
+pytestmark = pytest.mark.extra_artifacts(["ans*/ans.run"])
+TIMEOUT: int = 10
-def create_msg(qname, qtype, edns=-1):
- msg = dns.message.make_query(qname, qtype, use_edns=edns)
- return msg
-
-def timeout():
- return time.time() + TIMEOUT
-
-
-def create_socket(host, port):
+def create_socket(host: str, port: int) -> socket.socket:
sock = socket.create_connection((host, port), timeout=10)
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, True)
return sock
-def test_tcp_garbage(named_port):
- with create_socket("10.53.0.7", named_port) as sock:
- msg = create_msg("a.example.", "A")
- dns.query.send_tcp(sock, msg, timeout())
- dns.query.receive_tcp(sock, timeout())
+def tcp_round_trip(
+ sock: socket.socket, msg: dns.message.Message
+) -> dns.message.Message:
+ expiration = time.time() + TIMEOUT
+ dns.query.send_tcp(sock, msg, expiration)
+ response, _ = dns.query.receive_tcp(sock, expiration)
+ return response
- wire = msg.to_wire()
- assert len(wire) > 0
+
+def test_tcp_garbage(named_port: int) -> None:
+ with create_socket("10.53.0.7", named_port) as sock:
+ msg = isctest.query.create(
+ "a.example.", "A", dnssec=False, use_edns=-1, ad=False
+ )
+ tcp_round_trip(sock, msg)
# Send DNS message shorter than DNS message header (12),
# this should cause the connection to be terminated
with pytest.raises(EOFError):
try:
- dns.query.send_tcp(sock, msg, timeout())
- dns.query.receive_tcp(sock, timeout())
+ tcp_round_trip(sock, msg)
except ConnectionError as e:
raise EOFError from e
-def test_tcp_garbage_response(named_port):
+def test_tcp_garbage_response(named_port: int) -> None:
with create_socket("10.53.0.7", named_port) as sock:
- msg = create_msg("a.example.", "A")
- dns.query.send_tcp(sock, msg, timeout())
- dns.query.receive_tcp(sock, timeout())
-
- wire = msg.to_wire()
- assert len(wire) > 0
+ msg = isctest.query.create(
+ "a.example.", "A", dnssec=False, use_edns=-1, ad=False
+ )
+ tcp_round_trip(sock, msg)
# Send DNS response instead of DNS query, this should cause
# the connection to be terminated
rmsg = dns.message.make_response(msg)
- dns.query.send_tcp(sock, rmsg, timeout())
with pytest.raises(EOFError):
try:
- dns.query.receive_tcp(sock, timeout())
+ tcp_round_trip(sock, rmsg)
except ConnectionError as e:
raise EOFError from e
# Regression test for CVE-2022-0396
-def test_close_wait(named_port):
+def test_close_wait(named_port: int) -> None:
with create_socket("10.53.0.7", named_port) as sock:
- msg = create_msg("a.example.", "A")
- dns.query.send_tcp(sock, msg, timeout())
- dns.query.receive_tcp(sock, timeout())
+ msg = isctest.query.create(
+ "a.example.", "A", dnssec=False, use_edns=-1, ad=False
+ )
+ tcp_round_trip(sock, msg)
- msg = dns.message.make_query("a.example.", "A", use_edns=0, payload=1232)
- dns.query.send_tcp(sock, msg, timeout())
+ msg = isctest.query.create(
+ "a.example.", "A", dnssec=False, use_edns=0, payload=1232, ad=False
+ )
+ dns.query.send_tcp(sock, msg, time.time() + TIMEOUT)
# Shutdown the socket, but ignore the other side closing the socket
# first because we sent DNS message with EDNS0
# request. If it gets stuck in CLOSE_WAIT state, there is no connection
# available for the query below and it will time out.
with create_socket("10.53.0.7", named_port) as sock:
- msg = create_msg("a.example.", "A")
- dns.query.send_tcp(sock, msg, timeout())
- dns.query.receive_tcp(sock, timeout())
+ msg = isctest.query.create(
+ "a.example.", "A", dnssec=False, use_edns=-1, ad=False
+ )
+ tcp_round_trip(sock, msg)
# GL #4273
-def test_tcp_big(named_port):
+def test_tcp_big(named_port: int) -> None:
with create_socket("10.53.0.7", named_port) as sock:
- msg = dns.message.Message(id=0)
- msg.flags = dns.flags.RD
- msg.question.append(dns.rrset.from_text(dns.name.root, 0, 1, "URI"))
+ msg = isctest.query.create(
+ dns.name.root, "URI", dnssec=False, use_edns=-1, ad=False, message_id=0
+ )
msg.additional.append(
dns.rrset.from_text(dns.name.root, 0, 1, "URI", "0 0 " + "b" * 65503)
)
- dns.query.send_tcp(sock, msg, timeout())
- dns.query.receive_tcp(sock, timeout())
+ tcp_round_trip(sock, msg)
# Now check that the server is alive and well
- msg = create_msg("a.example.", "A")
- dns.query.send_tcp(sock, msg, timeout())
- dns.query.receive_tcp(sock, timeout())
+ msg = isctest.query.create(
+ "a.example.", "A", dnssec=False, use_edns=-1, ad=False
+ )
+ tcp_round_trip(sock, msg)