From: Štěpán Balážik Date: Tue, 10 Jun 2025 22:33:39 +0000 (+0200) Subject: Use isctest.asyncserver in the "tsig" test X-Git-Tag: v9.21.11~47^2 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=e34e831cabe655c49151e846f427a2ad7293be87;p=thirdparty%2Fbind9.git Use isctest.asyncserver in the "tsig" test Replace the custom DNS server used in the "tsig" system test with new code based on the isctest.asyncserver module. --- diff --git a/bin/tests/system/tsig/ans2/ans.pl b/bin/tests/system/tsig/ans2/ans.pl deleted file mode 100644 index 09ab29b949c..00000000000 --- a/bin/tests/system/tsig/ans2/ans.pl +++ /dev/null @@ -1,52 +0,0 @@ -# Copyright (C) Internet Systems Consortium, Inc. ("ISC") -# -# SPDX-License-Identifier: MPL-2.0 -# -# This Source Code Form is subject to the terms of the Mozilla Public -# License, v. 2.0. If a copy of the MPL was not distributed with this -# file, you can obtain one at https://mozilla.org/MPL/2.0/. -# -# See the COPYRIGHT file distributed with this work for additional -# information regarding copyright ownership. - -# -# An adhoc server that returns a TC=1 response with the final byte -# removed to generate UNEXPECTEDEND form dns_message_parse. -# - -use IO::File; -use IO::Socket; - -my $localport = int($ENV{'PORT'}); -if (!$localport) { $localport = 5300; } -printf "localport %u\n", $localport; - -my $sock = IO::Socket::INET->new(LocalAddr => "10.53.0.2", - LocalPort => $localport, Proto => "udp") or die "$!"; - -my $pidf = new IO::File "ans.pid", "w" or die "cannot open pid file: $!"; -print $pidf "$$\n" or die "cannot write pid file: $!"; -$pidf->close or die "cannot close pid file: $!"; -sub rmpid { unlink "ans.pid"; exit 1; }; - -$SIG{INT} = \&rmpid; -$SIG{TERM} = \&rmpid; - -sub arraystring { - my $string = join("", @_); - return $string; -} - -for (;;) { - $from = $sock->recv($buf, 512); - ($port, $ip_address) = unpack_sockaddr_in($from); - $l = length($buf); - printf "received %u bytes from %s#%u\n", $l, inet_ntoa($ip_address), $port; - @up = unpack("C[$l]", $buf); - $up[2] |= 0x80; # QR - $up[2] |= 0x02; # TC - $up[3] |= 0x80; # RA - $l -= 1; # truncate the response 1 byte - $replydata = pack("C[$l]", @up); - printf "sent %u bytes\n", $sock->send($replydata); -} diff --git a/bin/tests/system/tsig/ans2/ans.py b/bin/tests/system/tsig/ans2/ans.py new file mode 100644 index 00000000000..65548e69ef3 --- /dev/null +++ b/bin/tests/system/tsig/ans2/ans.py @@ -0,0 +1,49 @@ +""" +Copyright (C) Internet Systems Consortium, Inc. ("ISC") + +SPDX-License-Identifier: MPL-2.0 + +This Source Code Form is subject to the terms of the Mozilla Public +License, v. 2.0. If a copy of the MPL was not distributed with this +file, you can obtain one at https://mozilla.org/MPL/2.0/. + +See the COPYRIGHT file distributed with this work for additional +information regarding copyright ownership. +""" + +from typing import AsyncGenerator + +import dns.flags + +from isctest.asyncserver import ( + AsyncDnsServer, + BytesResponseSend, + QueryContext, + ResponseHandler, +) + + +class TruncatedWithLastByteDroppedHandler(ResponseHandler): + """ + Return a TC=1 response with the final byte removed to make + dns_message_parse() return ISC_R_UNEXPECTEDEND. + """ + + async def get_responses( + self, qctx: QueryContext + ) -> AsyncGenerator[BytesResponseSend, None]: + tc_response = qctx.query + tc_response.flags |= dns.flags.QR + tc_response.flags |= dns.flags.TC + tc_response.flags |= dns.flags.RA + yield BytesResponseSend(tc_response.to_wire()[:-1]) + + +def main() -> None: + server = AsyncDnsServer(acknowledge_tsig_dnspython_hacks=True) + server.install_response_handler(TruncatedWithLastByteDroppedHandler()) + server.run() + + +if __name__ == "__main__": + main() diff --git a/bin/tests/system/tsig/tests_sh_tsig.py b/bin/tests/system/tsig/tests_sh_tsig.py index 4c0a800bb59..23dad0ada48 100644 --- a/bin/tests/system/tsig/tests_sh_tsig.py +++ b/bin/tests/system/tsig/tests_sh_tsig.py @@ -21,6 +21,9 @@ pytestmark = pytest.mark.extra_artifacts( ] ) +# TSIG queries not supported by isctest.asyncserver with old dnspython +pytest.importorskip("dns", minversion="2.0.0") + def test_tsig(run_tests_sh): run_tests_sh()