From: Štěpán Balážik Date: Sat, 25 Apr 2026 13:58:03 +0000 (+0200) Subject: Refactor the existing Python TCP system tests X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=10af6bb970dba4955d96d2237f783cbb84d98c97;p=thirdparty%2Fbind9.git Refactor the existing Python TCP system tests Use isctest.query.create() and a shared round-trip helper in bin/tests/system/tcp/tests_tcp.py, add type hints, and reorganize the existing tests to follow current style. --- diff --git a/bin/tests/system/isctest/query.py b/bin/tests/system/isctest/query.py index 650c1fc637e..19086df1a81 100644 --- a/bin/tests/system/isctest/query.py +++ b/bin/tests/system/isctest/query.py @@ -147,13 +147,22 @@ def create( qtype, qclass=dns.rdataclass.IN, dnssec: bool = True, + use_edns: int | bool = True, + payload: int = 1232, rd: bool = True, cd: bool = False, ad: bool = True, + message_id: int | None = None, ) -> dns.message.Message: """Create DNS query with defaults suitable for our tests.""" msg = dns.message.make_query( - qname, qtype, qclass, use_edns=True, want_dnssec=dnssec + qname, + qtype, + qclass, + use_edns=use_edns, + want_dnssec=dnssec, + payload=payload, + id=message_id, ) msg.flags = 0 if rd: diff --git a/bin/tests/system/tcp/tests_tcp.py b/bin/tests/system/tcp/tests_tcp.py index 46c391c2455..940b89f7c74 100644 --- a/bin/tests/system/tcp/tests_tcp.py +++ b/bin/tests/system/tcp/tests_tcp.py @@ -15,45 +15,42 @@ import socket import struct import time -import dns.flags import dns.message import dns.name import dns.query import dns.rrset import pytest -pytestmark = pytest.mark.extra_artifacts( - [ - "ans*/ans.run", - ] -) +from isctest.instance import NamedInstance -TIMEOUT = 10 +import isctest +pytestmark = pytest.mark.extra_artifacts(["ans*/ans.run"]) -def create_msg(qname, qtype, edns=-1): - msg = dns.message.make_query(qname, qtype, use_edns=edns) - return msg +TIMEOUT: int = 10 -def timeout(): - return time.time() + TIMEOUT - - -def create_socket(host, port): +def create_socket(host: str, port: int) -> socket.socket: sock = socket.create_connection((host, port), timeout=10) sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, True) return sock -def test_tcp_garbage(named_port): - with create_socket("10.53.0.7", named_port) as sock: - msg = create_msg("a.example.", "A") - dns.query.send_tcp(sock, msg, timeout()) - dns.query.receive_tcp(sock, timeout()) +def tcp_round_trip( + sock: socket.socket, msg: dns.message.Message +) -> dns.message.Message: + expiration = time.time() + TIMEOUT + dns.query.send_tcp(sock, msg, expiration) + response, _ = dns.query.receive_tcp(sock, expiration) + return response + - wire = msg.to_wire() - assert len(wire) > 0 +def test_tcp_garbage(ns7: NamedInstance, named_port: int) -> None: + with create_socket(ns7.ip, named_port) as sock: + msg = isctest.query.create( + "a.example.", "A", dnssec=False, use_edns=-1, ad=False + ) + tcp_round_trip(sock, msg) # Send DNS message shorter than DNS message header (12), # this should cause the connection to be terminated @@ -62,43 +59,42 @@ def test_tcp_garbage(named_port): with pytest.raises(EOFError): try: - dns.query.send_tcp(sock, msg, timeout()) - dns.query.receive_tcp(sock, timeout()) + tcp_round_trip(sock, msg) except ConnectionError as e: raise EOFError from e -def test_tcp_garbage_response(named_port): - with create_socket("10.53.0.7", named_port) as sock: - msg = create_msg("a.example.", "A") - dns.query.send_tcp(sock, msg, timeout()) - dns.query.receive_tcp(sock, timeout()) - - wire = msg.to_wire() - assert len(wire) > 0 +def test_tcp_garbage_response(ns7: NamedInstance, named_port: int) -> None: + with create_socket(ns7.ip, named_port) as sock: + msg = isctest.query.create( + "a.example.", "A", dnssec=False, use_edns=-1, ad=False + ) + tcp_round_trip(sock, msg) # Send DNS response instead of DNS query, this should cause # the connection to be terminated rmsg = dns.message.make_response(msg) - dns.query.send_tcp(sock, rmsg, timeout()) with pytest.raises(EOFError): try: - dns.query.receive_tcp(sock, timeout()) + tcp_round_trip(sock, rmsg) except ConnectionError as e: raise EOFError from e # Regression test for CVE-2022-0396 -def test_close_wait(named_port): - with create_socket("10.53.0.7", named_port) as sock: - msg = create_msg("a.example.", "A") - dns.query.send_tcp(sock, msg, timeout()) - dns.query.receive_tcp(sock, timeout()) +def test_close_wait(ns7: NamedInstance, named_port: int) -> None: + with create_socket(ns7.ip, named_port) as sock: + msg = isctest.query.create( + "a.example.", "A", dnssec=False, use_edns=-1, ad=False + ) + tcp_round_trip(sock, msg) - msg = dns.message.make_query("a.example.", "A", use_edns=0, payload=1232) - dns.query.send_tcp(sock, msg, timeout()) + msg = isctest.query.create( + "a.example.", "A", dnssec=False, use_edns=0, payload=1232, ad=False + ) + dns.query.send_tcp(sock, msg, time.time() + TIMEOUT) # Shutdown the socket, but ignore the other side closing the socket # first because we sent DNS message with EDNS0 @@ -114,25 +110,26 @@ def test_close_wait(named_port): # ns7/named.dropedns and close the socket, making room for the next # request. If it gets stuck in CLOSE_WAIT state, there is no connection # available for the query below and it will time out. - with create_socket("10.53.0.7", named_port) as sock: - msg = create_msg("a.example.", "A") - dns.query.send_tcp(sock, msg, timeout()) - dns.query.receive_tcp(sock, timeout()) + with create_socket(ns7.ip, named_port) as sock: + msg = isctest.query.create( + "a.example.", "A", dnssec=False, use_edns=-1, ad=False + ) + tcp_round_trip(sock, msg) # GL #4273 -def test_tcp_big(named_port): - with create_socket("10.53.0.7", named_port) as sock: - msg = dns.message.Message(id=0) - msg.flags = dns.flags.RD - msg.question.append(dns.rrset.from_text(dns.name.root, 0, 1, "URI")) +def test_tcp_big(ns7: NamedInstance, named_port: int) -> None: + with create_socket(ns7.ip, named_port) as sock: + msg = isctest.query.create( + dns.name.root, "URI", dnssec=False, use_edns=-1, ad=False, message_id=0 + ) msg.additional.append( dns.rrset.from_text(dns.name.root, 0, 1, "URI", "0 0 " + "b" * 65503) ) - dns.query.send_tcp(sock, msg, timeout()) - dns.query.receive_tcp(sock, timeout()) + tcp_round_trip(sock, msg) # Now check that the server is alive and well - msg = create_msg("a.example.", "A") - dns.query.send_tcp(sock, msg, timeout()) - dns.query.receive_tcp(sock, timeout()) + msg = isctest.query.create( + "a.example.", "A", dnssec=False, use_edns=-1, ad=False + ) + tcp_round_trip(sock, msg)