From: Štěpán Balážik Date: Sat, 25 Apr 2026 14:03:37 +0000 (+0200) Subject: Port TCP high-water checks to Python X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=236fb2cb3bc5316678c578776d46e2d599864512;p=thirdparty%2Fbind9.git Port TCP high-water checks to Python Add Python helpers for inspecting `rndc status`, opening probe connections, and waiting for counter changes, then use them to port the TCP and recursive high-water checks from the shell script. Connections are now managed by the test script directly removing the need for the ans6 server. This also removes the need for the send.pl script and the respective shell test helper as they were used to control said server. --- diff --git a/bin/tests/system/conf.sh b/bin/tests/system/conf.sh index c4b90a4bbb3..5a2aaf8dbf9 100644 --- a/bin/tests/system/conf.sh +++ b/bin/tests/system/conf.sh @@ -95,10 +95,6 @@ stop_server() { $PERL "$TOP_SRCDIR/bin/tests/system/stop.pl" "$SYSTESTDIR" "$@" } -send() { - $PERL "$TOP_SRCDIR/bin/tests/system/send.pl" "$@" -} - # # Useful functions in test scripts # diff --git a/bin/tests/system/send.pl b/bin/tests/system/send.pl deleted file mode 100644 index 62b4f7ac48c..00000000000 --- a/bin/tests/system/send.pl +++ /dev/null @@ -1,33 +0,0 @@ -#!/usr/bin/perl - -# Copyright (C) Internet Systems Consortium, Inc. ("ISC") -# -# SPDX-License-Identifier: MPL-2.0 -# -# This Source Code Form is subject to the terms of the Mozilla Public -# License, v. 2.0. If a copy of the MPL was not distributed with this -# file, you can obtain one at https://mozilla.org/MPL/2.0/. -# -# See the COPYRIGHT file distributed with this work for additional -# information regarding copyright ownership. - -# -# Send a file to a given address and port using TCP. Used for -# configuring the test server in ans.pl. -# - -use IO::File; -use IO::Socket; - -@ARGV == 2 or die "usage: send.pl host port [file ...]\n"; - -my $host = shift @ARGV; -my $port = shift @ARGV; - -my $sock = IO::Socket::INET->new(PeerAddr => $host, PeerPort => $port, - Proto => "tcp",) or die "$!"; -while (<>) { - $sock->syswrite($_, length $_); -} - -$sock->close; diff --git a/bin/tests/system/tcp/ans6/ans.py b/bin/tests/system/tcp/ans6/ans.py deleted file mode 100644 index 210ae16c98a..00000000000 --- a/bin/tests/system/tcp/ans6/ans.py +++ /dev/null @@ -1,156 +0,0 @@ -# Copyright (C) Internet Systems Consortium, Inc. ("ISC") -# -# SPDX-License-Identifier: MPL-2.0 -# -# This Source Code Form is subject to the terms of the Mozilla Public -# License, v. 2.0. If a copy of the MPL was not distributed with this -# file, you can obtain one at https://mozilla.org/MPL/2.0/. -# -# See the COPYRIGHT file distributed with this work for additional -# information regarding copyright ownership. - -############################################################################ -# -# This tool allows an arbitrary number of TCP connections to be made to the -# specified service and to keep them open until told otherwise. It is -# controlled by writing text commands to a TCP socket (default port: 5309). -# -# Currently supported commands: -# -# - open -# -# Opens TCP connections to : and keeps them open. -# must be an IP address (IPv4 or IPv6). -# -# - close -# -# Close the oldest previously established connections. -# -############################################################################ - -from __future__ import print_function - -import datetime -import errno -import os -import select -import signal -import socket -import sys -import time - -# Timeout for establishing all connections requested by a single 'open' command. -OPEN_TIMEOUT = 2 -VERSION_QUERY = b"\x00\x1e\xaf\xb8\x01\x00\x00\x01\x00\x00\x00\x00\x00\x00\x07version\x04bind\x00\x00\x10\x00\x03" - - -def log(msg): - print(datetime.datetime.now().strftime("%d-%b-%Y %H:%M:%S.%f ") + msg) - - -def open_connections(active_conns, count, host, port): - queued = [] - errors = [] - - try: - socket.inet_aton(host) - family = socket.AF_INET - except socket.error: - family = socket.AF_INET6 - - log(f"Opening {count} connections...") - - for _ in range(count): - sock = socket.socket(family, socket.SOCK_STREAM) - sock.setblocking(0) - err = sock.connect_ex((host, port)) - if err not in (0, errno.EINPROGRESS): - log(f"{errno.errorcode[err]} on connect for socket {sock}") - errors.append(sock) - else: - queued.append(sock) - - start = time.monotonic() - while queued: - now = time.monotonic() - time_left = OPEN_TIMEOUT - (now - start) - if time_left <= 0: - break - _, wsocks, _ = select.select([], queued, [], time_left) - for sock in wsocks: - queued.remove(sock) - err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR) - if err: - log(f"{errno.errorcode[err]} for socket {sock}") - errors.append(sock) - else: - sock.send(VERSION_QUERY) - active_conns.append(sock) - - if errors: - log(f"result=FAIL: {len(errors)} connection(s) failed") - elif queued: - log(f"result=FAIL: Timed out, aborting {len(queued)} pending connections") - for sock in queued: - sock.close() - else: - log(f"result=OK: Successfully opened {count} connections") - - -def close_connections(active_conns, count): - log(f"Closing {'all' if count == 0 else count} connections...") - if count == 0: - count = len(active_conns) - for _ in range(count): - sock = active_conns.pop(0) - sock.close() - log(f"result=OK: Successfully closed {count} connections") - - -def sigterm(*_): - log("SIGTERM received, shutting down") - os.remove("ans.pid") - sys.exit(0) - - -def main(): - active_conns = [] - - signal.signal(signal.SIGTERM, sigterm) - - with open("ans.pid", "w", encoding="utf-8") as pidfile: - print(os.getpid(), file=pidfile) - - listenip = "10.53.0.6" - try: - port = int(os.environ["CONTROLPORT"]) - except KeyError: - port = 5309 - - log(f"Listening on {listenip}:{port}") - - ctlsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - ctlsock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - ctlsock.bind((listenip, port)) - ctlsock.listen(1) - - while True: - clientsock, _ = ctlsock.accept() - log(f"Accepted control connection from {clientsock}") - cmdline = clientsock.recv(512).decode("ascii").strip() - if cmdline: - log(f"Received command: {cmdline}") - cmd = cmdline.split() - if cmd[0] == "open": - count, host, port = cmd[1:] - open_connections(active_conns, int(count), host, int(port)) - elif cmd[0] == "close": - (count,) = cmd[1:] - close_connections(active_conns, int(count)) - else: - log("result=FAIL: Unknown command") - clientsock.close() - - -if __name__ == "__main__": - main() diff --git a/bin/tests/system/tcp/tests.sh b/bin/tests/system/tcp/tests.sh index 3e0f203c5e3..fc829f77f48 100644 --- a/bin/tests/system/tcp/tests.sh +++ b/bin/tests/system/tcp/tests.sh @@ -27,145 +27,6 @@ rndccmd() { status=0 n=0 -# -------- TCP high-water tests ---------- -refresh_tcp_stats() { - rndccmd 10.53.0.5 status >rndc.out.$n || ret=1 - TCP_CUR="$(sed -n "s/^tcp clients: \([0-9][0-9]*\).*/\1/p" rndc.out.$n)" - TCP_LIMIT="$(sed -n "s/^tcp clients: .*\/\([0-9][0-9]*\)/\1/p" rndc.out.$n)" - TCP_HIGH="$(sed -n "s/^TCP high-water: \([0-9][0-9]*\)/\1/p" rndc.out.$n)" - REC_HIGH="$(sed -n "s/^recursive high-water: \([0-9][0-9]*\)/\1/p" rndc.out.$n)" -} - -# Send a command to the tool script listening on 10.53.0.6. -send_command() { - nextpart ans6/ans.run >/dev/null - echo "$*" | send 10.53.0.6 "${CONTROLPORT}" - wait_for_log_peek 10 "result=" ans6/ans.run || ret=1 - if ! nextpartpeek ans6/ans.run | grep -qF "result=OK"; then - return 1 - fi -} - -# Instructs ans6 to open $1 TCP connections to 10.53.0.5. -open_connections() { - send_command "open" "${1}" 10.53.0.5 "${PORT}" || return 1 -} - -# Instructs ans6 to close $1 TCP connections to 10.53.0.5. -close_connections() { - send_command "close" "${1}" || return 1 -} - -# Check TCP connections are working normally before opening -# multiple connections -n=$((n + 1)) -echo_i "checking TCP query repsonse ($n)" -ret=0 -dig_with_opts +tcp @10.53.0.5 txt.example >dig.out.test$n -grep "status: NXDOMAIN" dig.out.test$n >/dev/null || ret=1 -if [ $ret != 0 ]; then echo_i "failed"; fi -status=$((status + ret)) - -# Check TCP statistics after server startup before using them as a baseline for -# subsequent checks. -n=$((n + 1)) -echo_i "TCP and recursive high-water: check initial statistics ($n)" -ret=0 -refresh_tcp_stats -assert_int_equal "${TCP_CUR}" 0 "current TCP clients count" || ret=1 -# We compare initial tcp-highwater value with 1 because as part of the -# system test startup, the script start.pl executes dig to check if target -# named is running, and that increments tcp-quota by one. -assert_int_equal "${TCP_HIGH}" 1 "tcp-highwater count" || ret=1 -assert_int_equal "${REC_HIGH}" 1 "recursive-highwater count" || ret=1 -if [ $ret != 0 ]; then echo_i "failed"; fi -status=$((status + ret)) - -# Reset TCP high-water statistics -n=$((n + 1)) -echo_i "TCP and recursive high-water: reset ($n)" -ret=0 -rndccmd 10.53.0.5 reset-stats tcp-high-water recursive-high-water || ret=1 -if [ $ret != 0 ]; then echo_i "failed"; fi -status=$((status + ret)) - -# Check TCP statistics after reset -n=$((n + 1)) -echo_i "TCP and recursive high-water: check statistics after reset ($n)" -ret=0 -refresh_tcp_stats -assert_int_equal "${TCP_CUR}" 0 "current TCP clients count" || ret=1 -assert_int_equal "${TCP_HIGH}" 0 "tcp-highwater count" || ret=1 -assert_int_equal "${REC_HIGH}" 0 "recursive-highwater count" || ret=1 -if [ $ret != 0 ]; then echo_i "failed"; fi -status=$((status + ret)) - -# Ensure the TCP high-water statistic gets updated after some TCP connections -# are established, and recursive high-water is unchanged. -n=$((n + 1)) -echo_i "TCP and recursive high-water: check values after some TCP and UDP connections are established ($n)" -ret=0 -OLD_TCP_CUR="${TCP_CUR}" -OLD_REC_HIGH="${REC_HIGH}" -TCP_ADDED=9 -REC_ADDED=1 -dig_with_opts +udp @10.53.0.5 recurse.example >dig.out.test$n -open_connections "${TCP_ADDED}" || ret=1 -check_stats_added() { - refresh_tcp_stats - assert_int_equal "${TCP_CUR}" $((OLD_TCP_CUR + TCP_ADDED)) "current TCP clients count" || return 1 - assert_int_equal "${TCP_HIGH}" $((OLD_TCP_CUR + TCP_ADDED)) "TCP high-water value" || return 1 - assert_int_equal "${REC_HIGH}" $((OLD_REC_HIGH + REC_ADDED)) "recursive high-water value" || return 1 -} -retry 2 check_stats_added || ret=1 -if [ $ret != 0 ]; then echo_i "failed"; fi -status=$((status + ret)) - -# Ensure the TCP high-water statistic remains unchanged after some TCP -# connections are closed. -n=$((n + 1)) -echo_i "TCP high-water: check value after some TCP connections are closed ($n)" -ret=0 -OLD_TCP_CUR="${TCP_CUR}" -OLD_TCP_HIGH="${TCP_HIGH}" -TCP_REMOVED=5 -close_connections "${TCP_REMOVED}" || ret=1 -check_stats_removed() { - refresh_tcp_stats - assert_int_equal "${TCP_CUR}" $((OLD_TCP_CUR - TCP_REMOVED)) "current TCP clients count" || return 1 - assert_int_equal "${TCP_HIGH}" "${OLD_TCP_HIGH}" "TCP high-water value" || return 1 -} -retry 2 check_stats_removed || ret=1 -if [ $ret != 0 ]; then echo_i "failed"; fi -status=$((status + ret)) - -# Ensure the TCP high-water statistic never exceeds the configured TCP clients -# limit. -n=$((n + 1)) -echo_i "TCP high-water: ensure tcp-clients is an upper bound ($n)" -ret=0 -open_connections $((TCP_LIMIT + 1)) || ret=1 -check_stats_limit() { - refresh_tcp_stats - assert_int_equal "${TCP_CUR}" "${TCP_LIMIT}" "current TCP clients count" || return 1 - assert_int_equal "${TCP_HIGH}" "${TCP_LIMIT}" "TCP high-water value" || return 1 -} -retry 2 check_stats_limit || ret=1 -if [ $ret != 0 ]; then echo_i "failed"; fi -status=$((status + ret)) - -# Check TCP connections are working normally before opening -# multiple connections -n=$((n + 1)) -echo_i "checking TCP response recovery ($n)" -ret=0 -# "0" closes all connections -close_connections 0 || ret=1 -dig_with_opts +tcp @10.53.0.5 txt.example >dig.out.test$n || ret=1 -grep "status: NXDOMAIN" dig.out.test$n >/dev/null || ret=1 -if [ $ret != 0 ]; then echo_i "failed"; fi -status=$((status + ret)) - #################################################### # NOTE: The next test resets the debug level to 1. # #################################################### diff --git a/bin/tests/system/tcp/tests_sh_tcp.py b/bin/tests/system/tcp/tests_sh_tcp.py index f2430db9eee..8aba3c456c0 100644 --- a/bin/tests/system/tcp/tests_sh_tcp.py +++ b/bin/tests/system/tcp/tests_sh_tcp.py @@ -14,9 +14,6 @@ import pytest pytestmark = pytest.mark.extra_artifacts( [ "dig.out.*", - "rndc.out.*", - "ans*/ans.run", - "ans*/ans.run.prev", ] ) diff --git a/bin/tests/system/tcp/tests_tcp.py b/bin/tests/system/tcp/tests_tcp.py index bcf6a60dc53..c4353657429 100644 --- a/bin/tests/system/tcp/tests_tcp.py +++ b/bin/tests/system/tcp/tests_tcp.py @@ -11,6 +11,11 @@ # See the COPYRIGHT file distributed with this work for additional # information regarding copyright ownership. +from collections.abc import Iterable +from types import TracebackType +from typing import NamedTuple + +import asyncio import socket import struct import time @@ -30,6 +35,101 @@ pytestmark = pytest.mark.extra_artifacts(["ans*/ans.run", "ns*/named.stats"]) TIMEOUT: int = 10 +class TcpStatus(NamedTuple): + current: int + limit: int + high_water: int + recursive_high_water: int | None + + def check( + self, + *, + current: int | None = None, + limit: int | None = None, + high_water: int | None = None, + recursive_high_water: int | None = None, + ) -> None: + if current is not None: + assert ( + self.current == current + ), f"current TCP clients count: expected {current}, got {self.current}" + if limit is not None: + assert ( + self.limit == limit + ), f"TCP clients limit: expected {limit}, got {self.limit}" + if high_water is not None: + assert ( + self.high_water == high_water + ), f"TCP high-water value: expected {high_water}, got {self.high_water}" + if recursive_high_water is not None: + assert self.recursive_high_water == recursive_high_water, ( + "recursive high-water value: " + f"expected {recursive_high_water}, got {self.recursive_high_water}" + ) + + +class TcpConnectionPool: + OPEN_TIMEOUT = 2 + + def __init__(self) -> None: + self.connections: list[socket.socket] = [] + + async def __aenter__(self) -> "TcpConnectionPool": + return self + + async def __aexit__( + self, + _exc_type: type[BaseException] | None, + _exc: BaseException | None, + _tb: TracebackType | None, + ) -> None: + await self.close() + + async def open(self, count: int, host: str, port: int) -> None: + tasks = [ + asyncio.create_task(open_tcp_query_connection(host, port)) + for _ in range(count) + ] + + try: + results = await asyncio.wait_for( + asyncio.gather(*tasks, return_exceptions=True), + timeout=self.OPEN_TIMEOUT, + ) + except asyncio.TimeoutError as exc: + for task in tasks: + task.cancel() + results = await asyncio.gather(*tasks, return_exceptions=True) + await close_tcp_connections( + result for result in results if isinstance(result, socket.socket) + ) + raise AssertionError(f"timed out opening {count} TCP connections") from exc + + connections = [ + result for result in results if isinstance(result, socket.socket) + ] + errors = [result for result in results if isinstance(result, BaseException)] + if errors: + await close_tcp_connections(connections) + raise AssertionError( + f"{len(errors)} TCP connection(s) failed: {errors[0]!r}" + ) + + self.connections.extend(connections) + + async def close(self, count: int = 0) -> None: + if count == 0: + count = len(self.connections) + + assert count <= len( + self.connections + ), f"cannot close {count} of {len(self.connections)} active connection(s)" + + closing = self.connections[:count] + del self.connections[:count] + await close_tcp_connections(closing) + + def create_socket(host: str, port: int) -> socket.socket: sock = socket.create_connection((host, port), timeout=10) sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, True) @@ -50,6 +150,32 @@ def tcp_requests_received(ns: NamedInstance) -> int: return last_count +def tcp_status(ns: NamedInstance) -> TcpStatus: + status = ns.rndc("status").out + + def value(label: str) -> str: + matches = status.grep(f"{label}:") + assert matches, f"'{label}' not found in rndc status:\n{status}" + line = matches[0].string.strip() + _, _, result = line.partition(":") + return result.strip() + + current, limit = value("tcp clients").split("/", maxsplit=1) + + return TcpStatus( + current=int(current), + limit=int(limit), + high_water=int(value("TCP high-water")), + recursive_high_water=int(value("recursive high-water")), + ) + + +def check_tcp_response(server_ip: str) -> None: + msg = isctest.query.create("txt.example.", "A") + response = isctest.query.tcp(msg, server_ip) + isctest.check.nxdomain(response) + + def tcp_round_trip( sock: socket.socket, msg: dns.message.Message ) -> dns.message.Message: @@ -59,6 +185,64 @@ def tcp_round_trip( return response +async def open_tcp_query_connection(host: str, port: int) -> socket.socket: + try: + socket.inet_pton(socket.AF_INET, host) + family = socket.AF_INET + except OSError: + family = socket.AF_INET6 + + sock = socket.socket(family, socket.SOCK_STREAM) + sock.setblocking(False) + + try: + loop = asyncio.get_running_loop() + await loop.sock_connect(sock, (host, port)) + msg = isctest.query.create( + "version.bind.", "TXT", "CH", dnssec=False, use_edns=False, ad=False + ) + await loop.sock_sendall(sock, msg.to_wire(prepend_length=True)) + except OSError: + sock.close() + raise + + return sock + + +async def close_tcp_connections(connections: Iterable[socket.socket]) -> None: + for sock in connections: + sock.close() + await asyncio.sleep(0) + + +def wait_for_tcp_status( + ns: NamedInstance, + *, + current: int | None = None, + limit: int | None = None, + high_water: int | None = None, + recursive_high_water: int | None = None, + timeout: int = 2, + delay: int = 1, +) -> TcpStatus: + status: TcpStatus | None = None + + def check() -> bool: + nonlocal status + status = tcp_status(ns) + status.check( + current=current, + limit=limit, + high_water=high_water, + recursive_high_water=recursive_high_water, + ) + return True + + isctest.run.retry_with_timeout(check, timeout=timeout, delay=delay) + assert status is not None + return status + + def test_tcp_garbage(named_port: int) -> None: with create_socket("10.53.0.7", named_port) as sock: msg = isctest.query.create( @@ -175,3 +359,68 @@ def test_tcp_request_statistics( ns2_tcp_after_forwarder = tcp_requests_received(ns2) assert ns1_tcp_after_resolver == ns1_tcp_after_forwarder assert ns2_tcp_after_resolver < ns2_tcp_after_forwarder + + +def test_tcp_high_water(named_port: int, ns5: NamedInstance) -> None: + async def run() -> None: + async with TcpConnectionPool() as pool: + isctest.log.info("checking TCP query response") + check_tcp_response(ns5.ip) + + isctest.log.info("TCP and recursive high-water: check initial statistics") + status = tcp_status(ns5) + # start.pl checks whether each named is running using dig over TCP, + # which increments both high-water counters once before tests run. + status.check(current=0, high_water=1, recursive_high_water=1) + + isctest.log.info("TCP and recursive high-water: reset") + ns5.rndc("reset-stats tcp-high-water recursive-high-water") + + isctest.log.info( + "TCP and recursive high-water: check statistics after reset" + ) + status = tcp_status(ns5) + status.check(current=0, high_water=0, recursive_high_water=0) + + isctest.log.info( + "TCP and recursive high-water: check values after some TCP " + "and UDP connections are established" + ) + old_status = status + tcp_added = 9 + rec_added = 1 + msg = isctest.query.create("recurse.example.", "A") + isctest.query.udp(msg, ns5.ip) + await pool.open(tcp_added, ns5.ip, named_port) + status = wait_for_tcp_status( + ns5, + current=old_status.current + tcp_added, + high_water=old_status.current + tcp_added, + recursive_high_water=old_status.recursive_high_water + rec_added, + ) + + isctest.log.info( + "TCP high-water: check value after some TCP connections are closed" + ) + old_status = status + tcp_removed = 5 + await pool.close(tcp_removed) + status = wait_for_tcp_status( + ns5, + current=old_status.current - tcp_removed, + high_water=old_status.high_water, + ) + + isctest.log.info("TCP high-water: ensure tcp-clients is an upper bound") + await pool.open(status.limit + 1, ns5.ip, named_port) + wait_for_tcp_status( + ns5, + current=status.limit, + high_water=status.limit, + ) + + isctest.log.info("checking TCP response recovery") + await pool.close() + check_tcp_response(ns5.ip) + + asyncio.run(run())