$PERL "$TOP_SRCDIR/bin/tests/system/stop.pl" "$SYSTESTDIR" "$@"
}
-send() {
- $PERL "$TOP_SRCDIR/bin/tests/system/send.pl" "$@"
-}
-
#
# Useful functions in test scripts
#
+++ /dev/null
-#!/usr/bin/perl
-
-# Copyright (C) Internet Systems Consortium, Inc. ("ISC")
-#
-# SPDX-License-Identifier: MPL-2.0
-#
-# This Source Code Form is subject to the terms of the Mozilla Public
-# License, v. 2.0. If a copy of the MPL was not distributed with this
-# file, you can obtain one at https://mozilla.org/MPL/2.0/.
-#
-# See the COPYRIGHT file distributed with this work for additional
-# information regarding copyright ownership.
-
-#
-# Send a file to a given address and port using TCP. Used for
-# configuring the test server in ans.pl.
-#
-
-use IO::File;
-use IO::Socket;
-
-@ARGV == 2 or die "usage: send.pl host port [file ...]\n";
-
-my $host = shift @ARGV;
-my $port = shift @ARGV;
-
-my $sock = IO::Socket::INET->new(PeerAddr => $host, PeerPort => $port,
- Proto => "tcp",) or die "$!";
-while (<>) {
- $sock->syswrite($_, length $_);
-}
-
-$sock->close;
+++ /dev/null
-# Copyright (C) Internet Systems Consortium, Inc. ("ISC")
-#
-# SPDX-License-Identifier: MPL-2.0
-#
-# This Source Code Form is subject to the terms of the Mozilla Public
-# License, v. 2.0. If a copy of the MPL was not distributed with this
-# file, you can obtain one at https://mozilla.org/MPL/2.0/.
-#
-# See the COPYRIGHT file distributed with this work for additional
-# information regarding copyright ownership.
-
-############################################################################
-#
-# This tool allows an arbitrary number of TCP connections to be made to the
-# specified service and to keep them open until told otherwise. It is
-# controlled by writing text commands to a TCP socket (default port: 5309).
-#
-# Currently supported commands:
-#
-# - open <COUNT> <HOST> <PORT>
-#
-# Opens <COUNT> TCP connections to <HOST>:<PORT> and keeps them open.
-# <HOST> must be an IP address (IPv4 or IPv6).
-#
-# - close <COUNT>
-#
-# Close the oldest <COUNT> previously established connections.
-#
-############################################################################
-
-from __future__ import print_function
-
-import datetime
-import errno
-import os
-import select
-import signal
-import socket
-import sys
-import time
-
-# Timeout for establishing all connections requested by a single 'open' command.
-OPEN_TIMEOUT = 2
-VERSION_QUERY = b"\x00\x1e\xaf\xb8\x01\x00\x00\x01\x00\x00\x00\x00\x00\x00\x07version\x04bind\x00\x00\x10\x00\x03"
-
-
-def log(msg):
- print(datetime.datetime.now().strftime("%d-%b-%Y %H:%M:%S.%f ") + msg)
-
-
-def open_connections(active_conns, count, host, port):
- queued = []
- errors = []
-
- try:
- socket.inet_aton(host)
- family = socket.AF_INET
- except socket.error:
- family = socket.AF_INET6
-
- log(f"Opening {count} connections...")
-
- for _ in range(count):
- sock = socket.socket(family, socket.SOCK_STREAM)
- sock.setblocking(0)
- err = sock.connect_ex((host, port))
- if err not in (0, errno.EINPROGRESS):
- log(f"{errno.errorcode[err]} on connect for socket {sock}")
- errors.append(sock)
- else:
- queued.append(sock)
-
- start = time.monotonic()
- while queued:
- now = time.monotonic()
- time_left = OPEN_TIMEOUT - (now - start)
- if time_left <= 0:
- break
- _, wsocks, _ = select.select([], queued, [], time_left)
- for sock in wsocks:
- queued.remove(sock)
- err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
- if err:
- log(f"{errno.errorcode[err]} for socket {sock}")
- errors.append(sock)
- else:
- sock.send(VERSION_QUERY)
- active_conns.append(sock)
-
- if errors:
- log(f"result=FAIL: {len(errors)} connection(s) failed")
- elif queued:
- log(f"result=FAIL: Timed out, aborting {len(queued)} pending connections")
- for sock in queued:
- sock.close()
- else:
- log(f"result=OK: Successfully opened {count} connections")
-
-
-def close_connections(active_conns, count):
- log(f"Closing {'all' if count == 0 else count} connections...")
- if count == 0:
- count = len(active_conns)
- for _ in range(count):
- sock = active_conns.pop(0)
- sock.close()
- log(f"result=OK: Successfully closed {count} connections")
-
-
-def sigterm(*_):
- log("SIGTERM received, shutting down")
- os.remove("ans.pid")
- sys.exit(0)
-
-
-def main():
- active_conns = []
-
- signal.signal(signal.SIGTERM, sigterm)
-
- with open("ans.pid", "w", encoding="utf-8") as pidfile:
- print(os.getpid(), file=pidfile)
-
- listenip = "10.53.0.6"
- try:
- port = int(os.environ["CONTROLPORT"])
- except KeyError:
- port = 5309
-
- log(f"Listening on {listenip}:{port}")
-
- ctlsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- ctlsock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- ctlsock.bind((listenip, port))
- ctlsock.listen(1)
-
- while True:
- clientsock, _ = ctlsock.accept()
- log(f"Accepted control connection from {clientsock}")
- cmdline = clientsock.recv(512).decode("ascii").strip()
- if cmdline:
- log(f"Received command: {cmdline}")
- cmd = cmdline.split()
- if cmd[0] == "open":
- count, host, port = cmd[1:]
- open_connections(active_conns, int(count), host, int(port))
- elif cmd[0] == "close":
- (count,) = cmd[1:]
- close_connections(active_conns, int(count))
- else:
- log("result=FAIL: Unknown command")
- clientsock.close()
-
-
-if __name__ == "__main__":
- main()
status=0
n=0
-# -------- TCP high-water tests ----------
-refresh_tcp_stats() {
- rndccmd 10.53.0.5 status >rndc.out.$n || ret=1
- TCP_CUR="$(sed -n "s/^tcp clients: \([0-9][0-9]*\).*/\1/p" rndc.out.$n)"
- TCP_LIMIT="$(sed -n "s/^tcp clients: .*\/\([0-9][0-9]*\)/\1/p" rndc.out.$n)"
- TCP_HIGH="$(sed -n "s/^TCP high-water: \([0-9][0-9]*\)/\1/p" rndc.out.$n)"
- REC_HIGH="$(sed -n "s/^recursive high-water: \([0-9][0-9]*\)/\1/p" rndc.out.$n)"
-}
-
-# Send a command to the tool script listening on 10.53.0.6.
-send_command() {
- nextpart ans6/ans.run >/dev/null
- echo "$*" | send 10.53.0.6 "${CONTROLPORT}"
- wait_for_log_peek 10 "result=" ans6/ans.run || ret=1
- if ! nextpartpeek ans6/ans.run | grep -qF "result=OK"; then
- return 1
- fi
-}
-
-# Instructs ans6 to open $1 TCP connections to 10.53.0.5.
-open_connections() {
- send_command "open" "${1}" 10.53.0.5 "${PORT}" || return 1
-}
-
-# Instructs ans6 to close $1 TCP connections to 10.53.0.5.
-close_connections() {
- send_command "close" "${1}" || return 1
-}
-
-# Check TCP connections are working normally before opening
-# multiple connections
-n=$((n + 1))
-echo_i "checking TCP query repsonse ($n)"
-ret=0
-dig_with_opts +tcp @10.53.0.5 txt.example >dig.out.test$n
-grep "status: NXDOMAIN" dig.out.test$n >/dev/null || ret=1
-if [ $ret != 0 ]; then echo_i "failed"; fi
-status=$((status + ret))
-
-# Check TCP statistics after server startup before using them as a baseline for
-# subsequent checks.
-n=$((n + 1))
-echo_i "TCP and recursive high-water: check initial statistics ($n)"
-ret=0
-refresh_tcp_stats
-assert_int_equal "${TCP_CUR}" 0 "current TCP clients count" || ret=1
-# We compare initial tcp-highwater value with 1 because as part of the
-# system test startup, the script start.pl executes dig to check if target
-# named is running, and that increments tcp-quota by one.
-assert_int_equal "${TCP_HIGH}" 1 "tcp-highwater count" || ret=1
-assert_int_equal "${REC_HIGH}" 1 "recursive-highwater count" || ret=1
-if [ $ret != 0 ]; then echo_i "failed"; fi
-status=$((status + ret))
-
-# Reset TCP high-water statistics
-n=$((n + 1))
-echo_i "TCP and recursive high-water: reset ($n)"
-ret=0
-rndccmd 10.53.0.5 reset-stats tcp-high-water recursive-high-water || ret=1
-if [ $ret != 0 ]; then echo_i "failed"; fi
-status=$((status + ret))
-
-# Check TCP statistics after reset
-n=$((n + 1))
-echo_i "TCP and recursive high-water: check statistics after reset ($n)"
-ret=0
-refresh_tcp_stats
-assert_int_equal "${TCP_CUR}" 0 "current TCP clients count" || ret=1
-assert_int_equal "${TCP_HIGH}" 0 "tcp-highwater count" || ret=1
-assert_int_equal "${REC_HIGH}" 0 "recursive-highwater count" || ret=1
-if [ $ret != 0 ]; then echo_i "failed"; fi
-status=$((status + ret))
-
-# Ensure the TCP high-water statistic gets updated after some TCP connections
-# are established, and recursive high-water is unchanged.
-n=$((n + 1))
-echo_i "TCP and recursive high-water: check values after some TCP and UDP connections are established ($n)"
-ret=0
-OLD_TCP_CUR="${TCP_CUR}"
-OLD_REC_HIGH="${REC_HIGH}"
-TCP_ADDED=9
-REC_ADDED=1
-dig_with_opts +udp @10.53.0.5 recurse.example >dig.out.test$n
-open_connections "${TCP_ADDED}" || ret=1
-check_stats_added() {
- refresh_tcp_stats
- assert_int_equal "${TCP_CUR}" $((OLD_TCP_CUR + TCP_ADDED)) "current TCP clients count" || return 1
- assert_int_equal "${TCP_HIGH}" $((OLD_TCP_CUR + TCP_ADDED)) "TCP high-water value" || return 1
- assert_int_equal "${REC_HIGH}" $((OLD_REC_HIGH + REC_ADDED)) "recursive high-water value" || return 1
-}
-retry 2 check_stats_added || ret=1
-if [ $ret != 0 ]; then echo_i "failed"; fi
-status=$((status + ret))
-
-# Ensure the TCP high-water statistic remains unchanged after some TCP
-# connections are closed.
-n=$((n + 1))
-echo_i "TCP high-water: check value after some TCP connections are closed ($n)"
-ret=0
-OLD_TCP_CUR="${TCP_CUR}"
-OLD_TCP_HIGH="${TCP_HIGH}"
-TCP_REMOVED=5
-close_connections "${TCP_REMOVED}" || ret=1
-check_stats_removed() {
- refresh_tcp_stats
- assert_int_equal "${TCP_CUR}" $((OLD_TCP_CUR - TCP_REMOVED)) "current TCP clients count" || return 1
- assert_int_equal "${TCP_HIGH}" "${OLD_TCP_HIGH}" "TCP high-water value" || return 1
-}
-retry 2 check_stats_removed || ret=1
-if [ $ret != 0 ]; then echo_i "failed"; fi
-status=$((status + ret))
-
-# Ensure the TCP high-water statistic never exceeds the configured TCP clients
-# limit.
-n=$((n + 1))
-echo_i "TCP high-water: ensure tcp-clients is an upper bound ($n)"
-ret=0
-open_connections $((TCP_LIMIT + 1)) || ret=1
-check_stats_limit() {
- refresh_tcp_stats
- assert_int_equal "${TCP_CUR}" "${TCP_LIMIT}" "current TCP clients count" || return 1
- assert_int_equal "${TCP_HIGH}" "${TCP_LIMIT}" "TCP high-water value" || return 1
-}
-retry 2 check_stats_limit || ret=1
-if [ $ret != 0 ]; then echo_i "failed"; fi
-status=$((status + ret))
-
-# Check TCP connections are working normally before opening
-# multiple connections
-n=$((n + 1))
-echo_i "checking TCP response recovery ($n)"
-ret=0
-# "0" closes all connections
-close_connections 0 || ret=1
-dig_with_opts +tcp @10.53.0.5 txt.example >dig.out.test$n || ret=1
-grep "status: NXDOMAIN" dig.out.test$n >/dev/null || ret=1
-if [ $ret != 0 ]; then echo_i "failed"; fi
-status=$((status + ret))
-
####################################################
# NOTE: The next test resets the debug level to 1. #
####################################################
pytestmark = pytest.mark.extra_artifacts(
[
"dig.out.*",
- "rndc.out.*",
- "ans*/ans.run",
- "ans*/ans.run.prev",
]
)
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
+from collections.abc import Iterable
+from types import TracebackType
+from typing import NamedTuple
+
+import asyncio
import socket
import struct
import time
TIMEOUT: int = 10
+class TcpStatus(NamedTuple):
+ current: int
+ limit: int
+ high_water: int
+ recursive_high_water: int | None
+
+ def check(
+ self,
+ *,
+ current: int | None = None,
+ limit: int | None = None,
+ high_water: int | None = None,
+ recursive_high_water: int | None = None,
+ ) -> None:
+ if current is not None:
+ assert (
+ self.current == current
+ ), f"current TCP clients count: expected {current}, got {self.current}"
+ if limit is not None:
+ assert (
+ self.limit == limit
+ ), f"TCP clients limit: expected {limit}, got {self.limit}"
+ if high_water is not None:
+ assert (
+ self.high_water == high_water
+ ), f"TCP high-water value: expected {high_water}, got {self.high_water}"
+ if recursive_high_water is not None:
+ assert self.recursive_high_water == recursive_high_water, (
+ "recursive high-water value: "
+ f"expected {recursive_high_water}, got {self.recursive_high_water}"
+ )
+
+
+class TcpConnectionPool:
+ OPEN_TIMEOUT = 2
+
+ def __init__(self) -> None:
+ self.connections: list[socket.socket] = []
+
+ async def __aenter__(self) -> "TcpConnectionPool":
+ return self
+
+ async def __aexit__(
+ self,
+ _exc_type: type[BaseException] | None,
+ _exc: BaseException | None,
+ _tb: TracebackType | None,
+ ) -> None:
+ await self.close()
+
+ async def open(self, count: int, host: str, port: int) -> None:
+ tasks = [
+ asyncio.create_task(open_tcp_query_connection(host, port))
+ for _ in range(count)
+ ]
+
+ try:
+ results = await asyncio.wait_for(
+ asyncio.gather(*tasks, return_exceptions=True),
+ timeout=self.OPEN_TIMEOUT,
+ )
+ except asyncio.TimeoutError as exc:
+ for task in tasks:
+ task.cancel()
+ results = await asyncio.gather(*tasks, return_exceptions=True)
+ await close_tcp_connections(
+ result for result in results if isinstance(result, socket.socket)
+ )
+ raise AssertionError(f"timed out opening {count} TCP connections") from exc
+
+ connections = [
+ result for result in results if isinstance(result, socket.socket)
+ ]
+ errors = [result for result in results if isinstance(result, BaseException)]
+ if errors:
+ await close_tcp_connections(connections)
+ raise AssertionError(
+ f"{len(errors)} TCP connection(s) failed: {errors[0]!r}"
+ )
+
+ self.connections.extend(connections)
+
+ async def close(self, count: int = 0) -> None:
+ if count == 0:
+ count = len(self.connections)
+
+ assert count <= len(
+ self.connections
+ ), f"cannot close {count} of {len(self.connections)} active connection(s)"
+
+ closing = self.connections[:count]
+ del self.connections[:count]
+ await close_tcp_connections(closing)
+
+
def create_socket(host: str, port: int) -> socket.socket:
sock = socket.create_connection((host, port), timeout=10)
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, True)
return last_count
+def tcp_status(ns: NamedInstance) -> TcpStatus:
+ status = ns.rndc("status").out
+
+ def value(label: str) -> str:
+ matches = status.grep(f"{label}:")
+ assert matches, f"'{label}' not found in rndc status:\n{status}"
+ line = matches[0].string.strip()
+ _, _, result = line.partition(":")
+ return result.strip()
+
+ current, limit = value("tcp clients").split("/", maxsplit=1)
+
+ return TcpStatus(
+ current=int(current),
+ limit=int(limit),
+ high_water=int(value("TCP high-water")),
+ recursive_high_water=int(value("recursive high-water")),
+ )
+
+
+def check_tcp_response(server_ip: str) -> None:
+ msg = isctest.query.create("txt.example.", "A")
+ response = isctest.query.tcp(msg, server_ip)
+ isctest.check.nxdomain(response)
+
+
def tcp_round_trip(
sock: socket.socket, msg: dns.message.Message
) -> dns.message.Message:
return response
+async def open_tcp_query_connection(host: str, port: int) -> socket.socket:
+ try:
+ socket.inet_pton(socket.AF_INET, host)
+ family = socket.AF_INET
+ except OSError:
+ family = socket.AF_INET6
+
+ sock = socket.socket(family, socket.SOCK_STREAM)
+ sock.setblocking(False)
+
+ try:
+ loop = asyncio.get_running_loop()
+ await loop.sock_connect(sock, (host, port))
+ msg = isctest.query.create(
+ "version.bind.", "TXT", "CH", dnssec=False, use_edns=False, ad=False
+ )
+ await loop.sock_sendall(sock, msg.to_wire(prepend_length=True))
+ except OSError:
+ sock.close()
+ raise
+
+ return sock
+
+
+async def close_tcp_connections(connections: Iterable[socket.socket]) -> None:
+ for sock in connections:
+ sock.close()
+ await asyncio.sleep(0)
+
+
+def wait_for_tcp_status(
+ ns: NamedInstance,
+ *,
+ current: int | None = None,
+ limit: int | None = None,
+ high_water: int | None = None,
+ recursive_high_water: int | None = None,
+ timeout: int = 2,
+ delay: int = 1,
+) -> TcpStatus:
+ status: TcpStatus | None = None
+
+ def check() -> bool:
+ nonlocal status
+ status = tcp_status(ns)
+ status.check(
+ current=current,
+ limit=limit,
+ high_water=high_water,
+ recursive_high_water=recursive_high_water,
+ )
+ return True
+
+ isctest.run.retry_with_timeout(check, timeout=timeout, delay=delay)
+ assert status is not None
+ return status
+
+
def test_tcp_garbage(named_port: int) -> None:
with create_socket("10.53.0.7", named_port) as sock:
msg = isctest.query.create(
ns2_tcp_after_forwarder = tcp_requests_received(ns2)
assert ns1_tcp_after_resolver == ns1_tcp_after_forwarder
assert ns2_tcp_after_resolver < ns2_tcp_after_forwarder
+
+
+def test_tcp_high_water(named_port: int, ns5: NamedInstance) -> None:
+ async def run() -> None:
+ async with TcpConnectionPool() as pool:
+ isctest.log.info("checking TCP query response")
+ check_tcp_response(ns5.ip)
+
+ isctest.log.info("TCP and recursive high-water: check initial statistics")
+ status = tcp_status(ns5)
+ # start.pl checks whether each named is running using dig over TCP,
+ # which increments both high-water counters once before tests run.
+ status.check(current=0, high_water=1, recursive_high_water=1)
+
+ isctest.log.info("TCP and recursive high-water: reset")
+ ns5.rndc("reset-stats tcp-high-water recursive-high-water")
+
+ isctest.log.info(
+ "TCP and recursive high-water: check statistics after reset"
+ )
+ status = tcp_status(ns5)
+ status.check(current=0, high_water=0, recursive_high_water=0)
+
+ isctest.log.info(
+ "TCP and recursive high-water: check values after some TCP "
+ "and UDP connections are established"
+ )
+ old_status = status
+ tcp_added = 9
+ rec_added = 1
+ msg = isctest.query.create("recurse.example.", "A")
+ isctest.query.udp(msg, ns5.ip)
+ await pool.open(tcp_added, ns5.ip, named_port)
+ status = wait_for_tcp_status(
+ ns5,
+ current=old_status.current + tcp_added,
+ high_water=old_status.current + tcp_added,
+ recursive_high_water=old_status.recursive_high_water + rec_added,
+ )
+
+ isctest.log.info(
+ "TCP high-water: check value after some TCP connections are closed"
+ )
+ old_status = status
+ tcp_removed = 5
+ await pool.close(tcp_removed)
+ status = wait_for_tcp_status(
+ ns5,
+ current=old_status.current - tcp_removed,
+ high_water=old_status.high_water,
+ )
+
+ isctest.log.info("TCP high-water: ensure tcp-clients is an upper bound")
+ await pool.open(status.limit + 1, ns5.ip, named_port)
+ wait_for_tcp_status(
+ ns5,
+ current=status.limit,
+ high_water=status.limit,
+ )
+
+ isctest.log.info("checking TCP response recovery")
+ await pool.close()
+ check_tcp_response(ns5.ip)
+
+ asyncio.run(run())