]> git.ipfire.org Git - thirdparty/bind9.git/commitdiff
Refactor the existing Python TCP system tests
authorŠtěpán Balážik <stepan@isc.org>
Sat, 25 Apr 2026 13:58:03 +0000 (15:58 +0200)
committerŠtěpán Balážik <stepan@isc.org>
Sat, 25 Apr 2026 18:06:35 +0000 (20:06 +0200)
Use isctest.query.create() and a shared round-trip helper in
bin/tests/system/tcp/tests_tcp.py, add type hints, and reorganize the
existing tests to follow current style.

bin/tests/system/isctest/query.py
bin/tests/system/tcp/tests_tcp.py

index a7e862b7f6c95fb5422dd84b45bb3f16ec5fbb91..8937e21f02f8e10625cbab257d0d66bb5b06793a 100644 (file)
@@ -136,13 +136,22 @@ def create(
     qtype,
     qclass=dns.rdataclass.IN,
     dnssec: bool = True,
+    use_edns: int | bool = True,
+    payload: int = 1232,
     rd: bool = True,
     cd: bool = False,
     ad: bool = True,
+    message_id: int | None = None,
 ) -> dns.message.Message:
     """Create DNS query with defaults suitable for our tests."""
     msg = dns.message.make_query(
-        qname, qtype, qclass, use_edns=True, want_dnssec=dnssec
+        qname,
+        qtype,
+        qclass,
+        use_edns=use_edns,
+        want_dnssec=dnssec,
+        payload=payload,
+        id=message_id,
     )
     msg.flags = 0
     if rd:
index 46c391c245569c18bc507232789e6b725d7eb255..6a05bc202a95f66f623aae0cbe9149d99621abe3 100644 (file)
@@ -15,45 +15,40 @@ import socket
 import struct
 import time
 
-import dns.flags
 import dns.message
 import dns.name
 import dns.query
 import dns.rrset
 import pytest
 
-pytestmark = pytest.mark.extra_artifacts(
-    [
-        "ans*/ans.run",
-    ]
-)
+import isctest
 
-TIMEOUT = 10
+pytestmark = pytest.mark.extra_artifacts(["ans*/ans.run"])
 
+TIMEOUT: int = 10
 
-def create_msg(qname, qtype, edns=-1):
-    msg = dns.message.make_query(qname, qtype, use_edns=edns)
-    return msg
 
-
-def timeout():
-    return time.time() + TIMEOUT
-
-
-def create_socket(host, port):
+def create_socket(host: str, port: int) -> socket.socket:
     sock = socket.create_connection((host, port), timeout=10)
     sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, True)
     return sock
 
 
-def test_tcp_garbage(named_port):
-    with create_socket("10.53.0.7", named_port) as sock:
-        msg = create_msg("a.example.", "A")
-        dns.query.send_tcp(sock, msg, timeout())
-        dns.query.receive_tcp(sock, timeout())
+def tcp_round_trip(
+    sock: socket.socket, msg: dns.message.Message
+) -> dns.message.Message:
+    expiration = time.time() + TIMEOUT
+    dns.query.send_tcp(sock, msg, expiration)
+    response, _ = dns.query.receive_tcp(sock, expiration)
+    return response
 
-        wire = msg.to_wire()
-        assert len(wire) > 0
+
+def test_tcp_garbage(named_port: int) -> None:
+    with create_socket("10.53.0.7", named_port) as sock:
+        msg = isctest.query.create(
+            "a.example.", "A", dnssec=False, use_edns=-1, ad=False
+        )
+        tcp_round_trip(sock, msg)
 
         # Send DNS message shorter than DNS message header (12),
         # this should cause the connection to be terminated
@@ -62,43 +57,42 @@ def test_tcp_garbage(named_port):
 
         with pytest.raises(EOFError):
             try:
-                dns.query.send_tcp(sock, msg, timeout())
-                dns.query.receive_tcp(sock, timeout())
+                tcp_round_trip(sock, msg)
             except ConnectionError as e:
                 raise EOFError from e
 
 
-def test_tcp_garbage_response(named_port):
+def test_tcp_garbage_response(named_port: int) -> None:
     with create_socket("10.53.0.7", named_port) as sock:
-        msg = create_msg("a.example.", "A")
-        dns.query.send_tcp(sock, msg, timeout())
-        dns.query.receive_tcp(sock, timeout())
-
-        wire = msg.to_wire()
-        assert len(wire) > 0
+        msg = isctest.query.create(
+            "a.example.", "A", dnssec=False, use_edns=-1, ad=False
+        )
+        tcp_round_trip(sock, msg)
 
         # Send DNS response instead of DNS query, this should cause
         # the connection to be terminated
 
         rmsg = dns.message.make_response(msg)
-        dns.query.send_tcp(sock, rmsg, timeout())
 
         with pytest.raises(EOFError):
             try:
-                dns.query.receive_tcp(sock, timeout())
+                tcp_round_trip(sock, rmsg)
             except ConnectionError as e:
                 raise EOFError from e
 
 
 # Regression test for CVE-2022-0396
-def test_close_wait(named_port):
+def test_close_wait(named_port: int) -> None:
     with create_socket("10.53.0.7", named_port) as sock:
-        msg = create_msg("a.example.", "A")
-        dns.query.send_tcp(sock, msg, timeout())
-        dns.query.receive_tcp(sock, timeout())
+        msg = isctest.query.create(
+            "a.example.", "A", dnssec=False, use_edns=-1, ad=False
+        )
+        tcp_round_trip(sock, msg)
 
-        msg = dns.message.make_query("a.example.", "A", use_edns=0, payload=1232)
-        dns.query.send_tcp(sock, msg, timeout())
+        msg = isctest.query.create(
+            "a.example.", "A", dnssec=False, use_edns=0, payload=1232, ad=False
+        )
+        dns.query.send_tcp(sock, msg, time.time() + TIMEOUT)
 
         # Shutdown the socket, but ignore the other side closing the socket
         # first because we sent DNS message with EDNS0
@@ -115,24 +109,25 @@ def test_close_wait(named_port):
     # request. If it gets stuck in CLOSE_WAIT state, there is no connection
     # available for the query below and it will time out.
     with create_socket("10.53.0.7", named_port) as sock:
-        msg = create_msg("a.example.", "A")
-        dns.query.send_tcp(sock, msg, timeout())
-        dns.query.receive_tcp(sock, timeout())
+        msg = isctest.query.create(
+            "a.example.", "A", dnssec=False, use_edns=-1, ad=False
+        )
+        tcp_round_trip(sock, msg)
 
 
 # GL #4273
-def test_tcp_big(named_port):
+def test_tcp_big(named_port: int) -> None:
     with create_socket("10.53.0.7", named_port) as sock:
-        msg = dns.message.Message(id=0)
-        msg.flags = dns.flags.RD
-        msg.question.append(dns.rrset.from_text(dns.name.root, 0, 1, "URI"))
+        msg = isctest.query.create(
+            dns.name.root, "URI", dnssec=False, use_edns=-1, ad=False, message_id=0
+        )
         msg.additional.append(
             dns.rrset.from_text(dns.name.root, 0, 1, "URI", "0 0 " + "b" * 65503)
         )
-        dns.query.send_tcp(sock, msg, timeout())
-        dns.query.receive_tcp(sock, timeout())
+        tcp_round_trip(sock, msg)
 
         # Now check that the server is alive and well
-        msg = create_msg("a.example.", "A")
-        dns.query.send_tcp(sock, msg, timeout())
-        dns.query.receive_tcp(sock, timeout())
+        msg = isctest.query.create(
+            "a.example.", "A", dnssec=False, use_edns=-1, ad=False
+        )
+        tcp_round_trip(sock, msg)