]> git.ipfire.org Git - thirdparty/bind9.git/commitdiff
Reimplement the FORMERR system test in Python
authorŠtěpán Balážik <stepan@isc.org>
Wed, 29 Apr 2026 07:52:23 +0000 (09:52 +0200)
committerŠtěpán Balážik <stepan@isc.org>
Wed, 29 Apr 2026 08:36:45 +0000 (10:36 +0200)
Replace the shell and Perl based FORMERR system test with a Python
test that constructs the malformed DNS packets directly and checks
the responses.

Remove the legacy shell and Perl test script and the intermediate
packet files in hex, leaving the packet construction inline in
tests_formerr.py.

Preserve the same wire for all packets sent to the server, but
construct them in a more explicit and readable way.

bin/tests/system/formerr/formerr.pl [deleted file]
bin/tests/system/formerr/tests.sh [deleted file]
bin/tests/system/formerr/tests_formerr.py [new file with mode: 0644]
bin/tests/system/formerr/tests_sh_formerr.py [deleted file]

diff --git a/bin/tests/system/formerr/formerr.pl b/bin/tests/system/formerr/formerr.pl
deleted file mode 100644 (file)
index fd7d298..0000000
+++ /dev/null
@@ -1,97 +0,0 @@
-#!/usr/bin/perl
-
-# Copyright (C) Internet Systems Consortium, Inc. ("ISC")
-#
-# SPDX-License-Identifier: MPL-2.0
-#
-# This Source Code Form is subject to the terms of the Mozilla Public
-# License, v. 2.0.  If a copy of the MPL was not distributed with this
-# file, you can obtain one at https://mozilla.org/MPL/2.0/.
-#
-# See the COPYRIGHT file distributed with this work for additional
-# information regarding copyright ownership.
-
-# This is a tool for sending an arbitrary packet via UDP or TCP to an
-# arbitrary address and port.  The packet is specified in a file or on
-# the standard input, in the form of a series of bytes in hexadecimal.
-# Whitespace is ignored, as is anything following a '#' symbol.
-#
-# For example, the following input would generate normal query for
-# isc.org/NS/IN":
-#
-#     # QID:
-#     0c d8
-#     # header:
-#     01 00 00 01 00 00 00 00 00 00
-#     # qname isc.org:
-#     03 69 73 63 03 6f 72 67 00
-#     # qtype NS:
-#     00 02
-#     # qclass IN:
-#     00 01
-#
-# Note that we do not wait for a response for the server.  This is simply
-# a way of injecting arbitrary packets to test server resposnes.
-#
-# Usage: packet.pl [-a <address>] [-p <port>] [-t (udp|tcp)] [filename]
-#
-# If not specified, address defaults to 127.0.0.1, port to 53, protocol
-# to udp, and file to stdin.
-#
-# XXX: Doesn't support IPv6 yet
-
-require 5.006_001;
-
-use strict;
-use Getopt::Std;
-use IO::File;
-use IO::Socket;
-
-sub usage {
-    print ("Usage: packet.pl [-a address] [-p port] [file]\n");
-    exit 1;
-}
-
-my %options={};
-getopts("a:p:", \%options);
-
-my $addr = "127.0.0.1";
-$addr = $options{a} if defined $options{a};
-
-my $port = 53;
-$port = $options{p} if defined $options{p};
-
-my $file = "STDIN";
-if (@ARGV >= 1) {
-    my $filename = shift @ARGV;
-    open FH, "<$filename" or die "$filename: $!";
-    $file = "FH";
-}
-
-my $input = "";
-while (defined(my $line = <$file>) ) {
-    chomp $line;
-    $line =~ s/#.*$//;
-    $input .= $line;
-}
-
-$input =~ s/\s+//g;
-my $data = pack("H*", $input);
-my $len = length $data;
-
-my $output = unpack("H*", $data);
-print ("sending: $output\n");
-
-my $sock = IO::Socket::INET->new(PeerAddr => $addr, PeerPort => $port,
-                                Proto => "tcp") or die "$!";
-
-my $bytes;
-$bytes = $sock->syswrite(pack("n", $len), 2);
-$bytes = $sock->syswrite($data, $len);
-$bytes = $sock->sysread($data, 2);
-$len = unpack("n", $data);
-$bytes = $sock->sysread($data, $len);
-print "got: ", unpack("H*", $data). "\n";
-
-$sock->close;
-close $file;
diff --git a/bin/tests/system/formerr/tests.sh b/bin/tests/system/formerr/tests.sh
deleted file mode 100644 (file)
index a30198d..0000000
+++ /dev/null
@@ -1,177 +0,0 @@
-#!/bin/sh
-
-# Copyright (C) Internet Systems Consortium, Inc. ("ISC")
-#
-# SPDX-License-Identifier: MPL-2.0
-#
-# This Source Code Form is subject to the terms of the Mozilla Public
-# License, v. 2.0.  If a copy of the MPL was not distributed with this
-# file, you can obtain one at https://mozilla.org/MPL/2.0/.
-#
-# See the COPYRIGHT file distributed with this work for additional
-# information regarding copyright ownership.
-
-set -e
-
-. ../conf.sh
-
-status=0
-
-echo_i "test name too long"
-$PERL formerr.pl -a 10.53.0.1 -p ${PORT} nametoolong >nametoolong.out
-ans=$(grep got: nametoolong.out)
-if [ "${ans}" != "got: 000080010000000000000000" ]; then
-  echo_i "failed"
-  status=$((status + 1))
-fi
-
-echo_i "two question names"
-$PERL formerr.pl -a 10.53.0.1 -p ${PORT} twoquestionnames >twoquestionnames.out
-ans=$(grep got: twoquestionnames.out)
-if [ "${ans}" != "got: 000080010000000000000000" ]; then
-  echo_i "failed"
-  status=$((status + 1))
-fi
-
-echo_i "two question types"
-$PERL formerr.pl -a 10.53.0.1 -p ${PORT} twoquestiontypes >twoquestiontypes.out
-ans=$(grep got: twoquestiontypes.out)
-if [ "${ans}" != "got: 000080010000000000000000" ]; then
-  echo_i "failed"
-  status=$((status + 1))
-fi
-
-echo_i "duplicate questions"
-$PERL formerr.pl -a 10.53.0.1 -p ${PORT} dupquestion >dupquestion.out
-ans=$(grep got: dupquestion.out)
-if [ "${ans}" != "got: 000080010000000000000000" ]; then
-  echo_i "failed"
-  status=$(expr $status + 1)
-fi
-
-echo_i "duplicate answer"
-$PERL formerr.pl -a 10.53.0.1 -p ${PORT} dupans >dupans.out
-ans=$(grep got: dupans.out)
-if [ "${ans}" != "got: 0000800100010000000000000000060001" ]; then
-  echo_i "failed"
-  status=$(expr $status + 1)
-fi
-
-echo_i "question only type in answer"
-$PERL formerr.pl -a 10.53.0.1 -p ${PORT} qtypeasanswer >qtypeasanswer.out
-ans=$(grep got: qtypeasanswer.out)
-if [ "${ans}" != "got: 000080010000000000000000" ]; then
-  echo_i "failed"
-  status=$(expr $status + 1)
-fi
-
-# this would be NOERROR if it included a COOKIE option,
-# but is a FORMERR without one.
-echo_i "empty question section (and no COOKIE option)"
-$PERL formerr.pl -a 10.53.0.1 -p ${PORT} noquestions >noquestions.out
-ans=$(grep got: noquestions.out)
-if [ "${ans}" != "got: 000080010000000000000000" ]; then
-  echo_i "failed"
-  status=$((status + 1))
-fi
-
-echo_i "bad nsec3 owner"
-$PERL formerr.pl -a 10.53.0.1 -p ${PORT} badnsec3owner >badnsec3owner.out
-ans=$(grep got: badnsec3owner.out)
-# SERVFAIL (2) rather than FORMERR (1)
-if [ "${ans}" != "got: 0008800200010000000000000000010001" ]; then
-  echo_i "failed"
-  status=$(expr $status + 1)
-fi
-
-echo_i "short record before rdata "
-$PERL formerr.pl -a 10.53.0.1 -p ${PORT} shortrecord >shortrecord.out
-ans=$(grep got: shortrecord.out)
-if [ "${ans}" != "got: 000980010000000000000000" ]; then
-  echo_i "failed"
-  status=$(expr $status + 1)
-fi
-
-echo_i "short question"
-$PERL formerr.pl -a 10.53.0.1 -p ${PORT} shortquestion >shortquestion.out
-ans=$(grep got: shortquestion.out)
-if [ "${ans}" != "got: 000080010000000000000000" ]; then
-  echo_i "failed"
-  status=$(expr $status + 1)
-fi
-
-echo_i "mismatch classes in question section"
-$PERL formerr.pl -a 10.53.0.1 -p ${PORT} questionclass >questionclass.out
-ans=$(grep got: questionclass.out)
-if [ "${ans}" != "got: 000080010000000000000000" ]; then
-  echo_i "failed"
-  status=$(expr $status + 1)
-fi
-
-echo_i "bad record owner name"
-$PERL formerr.pl -a 10.53.0.1 -p ${PORT} badrecordname >badrecordname.out
-ans=$(grep got: badrecordname.out)
-if [ "${ans}" != "got: 000080010000000000000000" ]; then
-  echo_i "failed"
-  status=$(expr $status + 1)
-fi
-
-echo_i "mismatched class in record"
-$PERL formerr.pl -a 10.53.0.1 -p ${PORT} wrongclass >wrongclass.out
-ans=$(grep got: wrongclass.out)
-if [ "${ans}" != "got: 000080010000000000000000" ]; then
-  echo_i "failed"
-  status=$(expr $status + 1)
-fi
-
-echo_i "mismatched KEY class"
-$PERL formerr.pl -a 10.53.0.1 -p ${PORT} keyclass >keyclass.out
-ans=$(grep got: keyclass.out)
-if [ "${ans}" != "got: 0000800100010000000000000000010001" ]; then
-  echo_i "failed"
-  status=$(expr $status + 1)
-fi
-
-echo_i "OPT wrong owner name"
-$PERL formerr.pl -a 10.53.0.1 -p ${PORT} optwrongname >optwrongname.out
-ans=$(grep got: optwrongname.out)
-if [ "${ans}" != "got: 000080010000000000000000" ]; then
-  echo_i "failed"
-  status=$(expr $status + 1)
-fi
-
-echo_i "RRSIG invalid covers"
-$PERL formerr.pl -a 10.53.0.1 -p ${PORT} malformedrrsig >malformedrrsig.out
-ans=$(grep got: malformedrrsig.out)
-if [ "${ans}" != "got: 000080010000000000000000" ]; then
-  echo_i "failed"
-  status=$(expr $status + 1)
-fi
-
-echo_i "UPDATE malformed delete type"
-$PERL formerr.pl -a 10.53.0.1 -p ${PORT} malformeddeltype >malformeddeltype.out
-ans=$(grep got: malformeddeltype.out)
-if [ "${ans}" != "got: 0000a8010000000000000000" ]; then
-  echo_i "failed"
-  status=$(expr $status + 1)
-fi
-
-echo_i "TSIG wrong class"
-$PERL formerr.pl -a 10.53.0.1 -p ${PORT} tsigwrongclass >tsigwrongclass.out
-ans=$(grep got: tsigwrongclass.out)
-if [ "${ans}" != "got: 000080010000000000000000" ]; then
-  echo_i "failed"
-  status=$(expr $status + 1)
-fi
-
-echo_i "TSIG not last"
-$PERL formerr.pl -a 10.53.0.1 -p ${PORT} tsignotlast >tsignotlast.out
-ans=$(grep got: tsignotlast.out)
-if [ "${ans}" != "got: 000080010000000000000000" ]; then
-  echo_i "failed"
-  status=$(expr $status + 1)
-fi
-
-echo_i "exit status: $status"
-
-[ $status -eq 0 ] || exit 1
diff --git a/bin/tests/system/formerr/tests_formerr.py b/bin/tests/system/formerr/tests_formerr.py
new file mode 100644 (file)
index 0000000..5c41251
--- /dev/null
@@ -0,0 +1,556 @@
+# Copyright (C) Internet Systems Consortium, Inc. ("ISC")
+#
+# SPDX-License-Identifier: MPL-2.0
+#
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0.  If a copy of the MPL was not distributed with this
+# file, you can obtain one at https://mozilla.org/MPL/2.0/.
+#
+# See the COPYRIGHT file distributed with this work for additional
+# information regarding copyright ownership.
+
+from pathlib import Path
+from typing import SupportsInt
+
+import socket
+import time
+
+import dns.flags
+import dns.message
+import dns.name
+import dns.opcode
+import dns.query
+import dns.rcode
+import dns.rdataclass
+import dns.rdatatype
+import dns.rdtypes.ANY.NSEC3
+import dns.rdtypes.ANY.OPT
+import dns.rdtypes.ANY.RRSIG
+import dns.rdtypes.ANY.SOA
+import dns.rdtypes.ANY.TSIG
+import dns.rdtypes.IN.A
+import pytest
+
+
+def wire(*parts: bytes) -> bytes:
+    return b"".join(parts)
+
+
+def u16(value: SupportsInt) -> bytes:
+    return int(value).to_bytes(2, byteorder="big")
+
+
+def u32(value: int) -> bytes:
+    return value.to_bytes(4, byteorder="big")
+
+
+def name_wire(name: str) -> bytes:
+    return dns.name.from_text(name).to_wire()
+
+
+def root_wire() -> bytes:
+    return dns.name.root.to_wire()
+
+
+def dns_header(
+    message_id: int = 0,
+    flags: int = 0,
+    opcode: int = dns.opcode.QUERY,
+    rcode: dns.rcode.Rcode = dns.rcode.NOERROR,
+    qdcount: int = 0,
+    ancount: int = 0,
+    nscount: int = 0,
+    arcount: int = 0,
+) -> bytes:
+    return wire(
+        u16(message_id),
+        u16(flags | dns.opcode.to_flags(opcode) | int(rcode)),
+        u16(qdcount),
+        u16(ancount),
+        u16(nscount),
+        u16(arcount),
+    )
+
+
+def formerr_response_header(
+    message_id: int = 0,
+    opcode: int = dns.opcode.QUERY,
+    rcode: dns.rcode.Rcode = dns.rcode.FORMERR,
+    qdcount: int = 0,
+    ancount: int = 0,
+    nscount: int = 0,
+    arcount: int = 0,
+) -> bytes:
+    return dns_header(
+        message_id=message_id,
+        flags=dns.flags.QR,
+        opcode=opcode,
+        rcode=rcode,
+        qdcount=qdcount,
+        ancount=ancount,
+        nscount=nscount,
+        arcount=arcount,
+    )
+
+
+def question_wire(
+    qname_wire: bytes,
+    qtype: dns.rdatatype.RdataType = dns.rdatatype.RdataType.A,
+    qclass: dns.rdataclass.RdataClass = dns.rdataclass.RdataClass.IN,
+) -> bytes:
+    return wire(
+        qname_wire,
+        u16(qtype),
+        u16(qclass),
+    )
+
+
+def rr_wire(
+    owner: bytes,
+    rrtype: SupportsInt,
+    rrclass: SupportsInt,
+    *,
+    ttl: int,
+    rdata: bytes = b"",
+) -> bytes:
+    return wire(
+        question_wire(
+            owner, dns.rdatatype.RdataType(rrtype), dns.rdataclass.RdataClass(rrclass)
+        ),
+        u32(ttl),
+        u16(len(rdata)),
+        rdata,
+    )
+
+
+def oversized_name_wire() -> bytes:
+    labels = [bytes([15]) + b"A" * 15 for _ in range(15)]
+    labels.append(bytes([14]) + b"A" * 14 + root_wire())
+    return wire(*labels)
+
+
+def soa_rr(
+    *,
+    expire: int,
+    minimum: int,
+) -> bytes:
+    return rr_wire(
+        root_wire(),
+        dns.rdatatype.RdataType.SOA,
+        dns.rdataclass.RdataClass.IN,
+        ttl=1,
+        rdata=dns.rdtypes.ANY.SOA.SOA(
+            dns.rdataclass.RdataClass.IN,
+            dns.rdatatype.RdataType.SOA,
+            dns.name.root,
+            dns.name.root,
+            1,
+            2,
+            3,
+            expire,
+            minimum,
+        ).to_wire(),
+    )
+
+
+def nsec3_rr(
+    *,
+    owner: bytes,
+) -> bytes:
+    return rr_wire(
+        owner,
+        dns.rdatatype.RdataType.NSEC3,
+        dns.rdataclass.RdataClass.IN,
+        ttl=1,
+        rdata=dns.rdtypes.ANY.NSEC3.NSEC3(
+            dns.rdataclass.RdataClass.IN,
+            dns.rdatatype.RdataType.NSEC3,
+            algorithm=240,
+            flags=0,
+            iterations=0,
+            salt=b"",
+            next=b"\xff",
+            windows=[],
+        ).to_wire(),
+    )
+
+
+def key_rdata(
+    *,
+    flags: int,
+    protocol: int,
+    algorithm: int,
+    keydata: bytes,
+) -> bytes:
+    # No dns.rdtypes.ANY.KEY.KEY class, so construct the rdata manually
+    return wire(u16(flags), bytes([protocol, algorithm]), keydata)
+
+
+def key_rr(*, rdclass: dns.rdataclass.RdataClass) -> bytes:
+    return rr_wire(
+        root_wire(),
+        dns.rdatatype.RdataType.KEY,
+        rdclass,
+        ttl=1,
+        rdata=key_rdata(
+            flags=0,
+            protocol=0,
+            algorithm=248,
+            keydata=b"\x00",
+        ),
+    )
+
+
+def malformed_rrsig_rr() -> bytes:
+    return rr_wire(
+        root_wire(),
+        dns.rdatatype.RdataType.RRSIG,
+        dns.rdataclass.RdataClass.IN,
+        ttl=1,
+        rdata=dns.rdtypes.ANY.RRSIG.RRSIG(
+            dns.rdataclass.RdataClass.IN,
+            dns.rdatatype.RdataType.RRSIG,
+            0,
+            240,
+            0,
+            1,
+            2,
+            3,
+            0,
+            dns.name.root,
+            b"\x00",
+        ).to_wire(),
+    )
+
+
+def tsig_rr(
+    *,
+    owner: bytes = root_wire(),
+    rdclass: dns.rdataclass.RdataClass = dns.rdataclass.RdataClass.ANY,
+    algorithm: dns.name.Name = dns.name.root,
+    time_signed: int = 0x010203040506,
+    fudge: int = 0x0102,
+    mac: bytes = b"\x00",
+    original_id: int = 0,
+    error: int = 0,
+    other: bytes = b"",
+) -> bytes:
+    return rr_wire(
+        owner,
+        dns.rdatatype.RdataType.TSIG,
+        rdclass,
+        ttl=1,
+        rdata=dns.rdtypes.ANY.TSIG.TSIG(
+            rdclass,
+            dns.rdatatype.RdataType.TSIG,
+            algorithm,
+            time_signed,
+            fudge,
+            mac,
+            original_id,
+            error,
+            other,
+        ).to_wire(),
+    )
+
+
+def opt_rr(*, owner: bytes) -> bytes:
+    return rr_wire(
+        owner,
+        dns.rdatatype.RdataType.OPT,
+        dns.rdataclass.RdataClass.IN,
+        ttl=0,
+        rdata=dns.rdtypes.ANY.OPT.OPT(
+            dns.rdataclass.RdataClass.IN,
+            dns.rdatatype.RdataType.OPT,
+            [],
+        ).to_wire(),
+    )
+
+
+def a_rdata(ipv4_bytes: bytes = b"\x00\x00\x00\x00") -> bytes:
+    return dns.rdtypes.IN.A.A(
+        dns.rdataclass.RdataClass.IN,
+        dns.rdatatype.RdataType.A,
+        ipv4_bytes,
+    ).to_wire()
+
+
+def a_rr(owner: bytes = root_wire()) -> bytes:
+    return rr_wire(
+        owner,
+        dns.rdatatype.RdataType.A,
+        dns.rdataclass.RdataClass.IN,
+        ttl=1,
+        rdata=a_rdata(),
+    )
+
+
+def query_raw_tcp(host: str, port: int, packet_wire: bytes) -> dns.message.Message:
+    with socket.create_connection((host, port), timeout=10) as sock:
+        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, True)
+        sock.sendall(u16(len(packet_wire)) + packet_wire)
+        response, _ = dns.query.receive_tcp(sock, time.time() + 10)
+    return response
+
+
+PACKET_DIR = Path(__file__).parent
+
+
+def parse_packet_file(packet_name: str) -> bytes:
+    text = (PACKET_DIR / packet_name).read_text(encoding="utf-8")
+    hex_bytes = []
+    for line in text.splitlines():
+        line = line.split("#", 1)[0].strip()
+        if line:
+            hex_bytes.append(line)
+    return bytes.fromhex(" ".join(hex_bytes))
+
+
+def validate_packet(packet_name: str, generated_wire: bytes) -> None:
+    assert generated_wire == parse_packet_file(packet_name)
+
+
+@pytest.mark.parametrize(
+    "packet_name,query_wire,expected_wire",
+    [
+        pytest.param(
+            "nametoolong",
+            wire(
+                dns_header(qdcount=1),
+                question_wire(oversized_name_wire()),
+            ),
+            formerr_response_header(),
+            id="nametoolong",
+        ),
+        pytest.param(
+            "twoquestionnames",
+            wire(
+                dns_header(qdcount=2),
+                question_wire(name_wire("AAAAAAAAAAAAAA."), dns.rdatatype.RdataType.A),
+                # Two names concatenated in the QNAME field
+                question_wire(
+                    wire(name_wire("AAAAAAAAAAAAAA."), name_wire("AAAAAAAAAAAAAB.")),
+                    dns.rdatatype.RdataType.A,
+                ),
+            ),
+            formerr_response_header(),
+            id="twoquestionnames",
+        ),
+        pytest.param(
+            "twoquestiontypes",
+            wire(
+                dns_header(qdcount=2),
+                question_wire(name_wire("AAAAAAAAAAAAAA."), dns.rdatatype.RdataType.A),
+                question_wire(name_wire("AAAAAAAAAAAAAA."), dns.rdatatype.RdataType.NS),
+            ),
+            formerr_response_header(),
+            id="twoquestiontypes",
+        ),
+        pytest.param(
+            "dupquestion",
+            wire(
+                dns_header(qdcount=2),
+                question_wire(name_wire("AAAAAAAAAAAAAA."), dns.rdatatype.RdataType.A),
+                question_wire(name_wire("AAAAAAAAAAAAAA."), dns.rdatatype.RdataType.A),
+            ),
+            formerr_response_header(),
+            id="dupquestion",
+        ),
+        pytest.param(
+            "dupans",
+            wire(
+                dns_header(qdcount=1, ancount=2),
+                question_wire(root_wire(), dns.rdatatype.RdataType.SOA),
+                soa_rr(expire=4, minimum=5),
+                soa_rr(expire=4, minimum=6),
+            ),
+            wire(
+                formerr_response_header(qdcount=1),
+                question_wire(root_wire(), dns.rdatatype.RdataType.SOA),
+            ),
+            id="dupans",
+        ),
+        pytest.param(
+            "qtypeasanswer",
+            wire(
+                dns_header(ancount=1),
+                rr_wire(
+                    root_wire(),
+                    dns.rdatatype.RdataType.MAILB,
+                    dns.rdataclass.RdataClass.IN,
+                    ttl=1,
+                ),
+            ),
+            formerr_response_header(),
+            id="qtypeasanswer",
+        ),
+        pytest.param(
+            "noquestions",
+            dns_header(),
+            # This would be NOERROR if it included a COOKIE option,
+            # but is a FORMERR without one.
+            formerr_response_header(),
+            id="noquestions",
+        ),
+        pytest.param(
+            "badnsec3owner",
+            wire(
+                dns_header(message_id=8, qdcount=1, nscount=1),
+                question_wire(root_wire(), dns.rdatatype.RdataType.A),
+                # Bad NSEC3 owner: X. is not in the base32hex alphabet.
+                nsec3_rr(owner=name_wire("X.")),
+            ),
+            wire(
+                formerr_response_header(
+                    message_id=8, rcode=dns.rcode.SERVFAIL, qdcount=1
+                ),
+                question_wire(root_wire(), dns.rdatatype.RdataType.A),
+            ),
+            id="badnsec3owner",
+        ),
+        pytest.param(
+            "shortrecord",
+            wire(
+                dns_header(message_id=9, arcount=1),
+                # Truncated A record (no ttl, length or data)
+                question_wire(root_wire(), dns.rdatatype.RdataType.A),
+            ),
+            formerr_response_header(message_id=9),
+            id="shortrecord",
+        ),
+        pytest.param(
+            "shortquestion",
+            wire(
+                dns_header(qdcount=1),
+                # Truncated question (no class)
+                root_wire(),
+                u16(dns.rdatatype.RdataType.A),
+            ),
+            formerr_response_header(),
+            id="shortquestion",
+        ),
+        pytest.param(
+            "questionclass",
+            wire(
+                dns_header(qdcount=2),
+                question_wire(
+                    root_wire(),
+                    dns.rdatatype.RdataType.A,
+                    dns.rdataclass.RdataClass.IN,
+                ),
+                question_wire(
+                    root_wire(),
+                    dns.rdatatype.RdataType.A,
+                    dns.rdataclass.RdataClass(2),
+                ),
+            ),
+            formerr_response_header(),
+            id="questionclass",
+        ),
+        pytest.param(
+            "badrecordname",
+            wire(
+                dns_header(arcount=1),
+                a_rr(owner=oversized_name_wire()),
+            ),
+            formerr_response_header(),
+            id="badrecordname",
+        ),
+        pytest.param(
+            "wrongclass",
+            wire(
+                dns_header(arcount=2),
+                a_rr(),
+                rr_wire(
+                    root_wire(),
+                    dns.rdatatype.RdataType(65280),
+                    dns.rdataclass.RdataClass(256),
+                    ttl=33554433,
+                    rdata=a_rdata(),
+                ),
+            ),
+            formerr_response_header(),
+            id="wrongclass",
+        ),
+        pytest.param(
+            "keyclass",
+            wire(
+                dns_header(qdcount=1, arcount=1),
+                question_wire(root_wire(), dns.rdatatype.RdataType.A),
+                key_rr(rdclass=dns.rdataclass.RdataClass(2)),
+            ),
+            wire(
+                formerr_response_header(qdcount=1),
+                question_wire(root_wire(), dns.rdatatype.RdataType.A),
+            ),
+            id="keyclass",
+        ),
+        pytest.param(
+            "optwrongname",
+            wire(
+                dns_header(arcount=1),
+                # OPT owner should be root
+                opt_rr(owner=name_wire("A.")),
+            ),
+            formerr_response_header(),
+            id="optwrongname",
+        ),
+        pytest.param(
+            "malformedrrsig",
+            wire(
+                dns_header(arcount=1),
+                malformed_rrsig_rr(),
+            ),
+            formerr_response_header(),
+            id="malformedrrsig",
+        ),
+        pytest.param(
+            "malformeddeltype",
+            wire(
+                dns_header(opcode=dns.opcode.UPDATE, nscount=1),
+                rr_wire(
+                    root_wire(),
+                    dns.rdatatype.RdataType.A,
+                    dns.rdataclass.RdataClass.ANY,
+                    ttl=0,
+                    # Non-empty rdata for DELETE type
+                    rdata=b"\x00",
+                ),
+            ),
+            formerr_response_header(opcode=dns.opcode.UPDATE),
+            id="malformeddeltype",
+        ),
+        pytest.param(
+            "tsigwrongclass",
+            wire(
+                dns_header(arcount=1),
+                # Class should be ANY not IN
+                tsig_rr(rdclass=dns.rdataclass.RdataClass.IN),
+            ),
+            formerr_response_header(),
+            id="tsigwrongclass",
+        ),
+        pytest.param(
+            "tsignotlast",
+            wire(
+                dns_header(arcount=2),
+                tsig_rr(),
+                # TSIG should be the last record
+                a_rr(),
+            ),
+            formerr_response_header(),
+            id="tsignotlast",
+        ),
+    ],
+)
+def test_formerr(
+    packet_name: str,
+    query_wire: bytes,
+    expected_wire: bytes,
+    named_port: int,
+    ns1,
+) -> None:
+    validate_packet(packet_name, query_wire)
+    response = query_raw_tcp(ns1.ip, named_port, query_wire)
+    assert response.to_wire() == expected_wire
diff --git a/bin/tests/system/formerr/tests_sh_formerr.py b/bin/tests/system/formerr/tests_sh_formerr.py
deleted file mode 100644 (file)
index a091546..0000000
+++ /dev/null
@@ -1,40 +0,0 @@
-# Copyright (C) Internet Systems Consortium, Inc. ("ISC")
-#
-# SPDX-License-Identifier: MPL-2.0
-#
-# This Source Code Form is subject to the terms of the Mozilla Public
-# License, v. 2.0.  If a copy of the MPL was not distributed with this
-# file, you can obtain one at https://mozilla.org/MPL/2.0/.
-#
-# See the COPYRIGHT file distributed with this work for additional
-# information regarding copyright ownership.
-
-import pytest
-
-pytestmark = pytest.mark.extra_artifacts(
-    [
-        "badnsec3owner.out",
-        "badrecordname.out",
-        "dupans.out",
-        "dupquestion.out",
-        "keyclass.out",
-        "malformeddeltype.out",
-        "malformedrrsig.out",
-        "nametoolong.out",
-        "noquestions.out",
-        "optwrongname.out",
-        "qtypeasanswer.out",
-        "questionclass.out",
-        "shortquestion.out",
-        "shortrecord.out",
-        "tsignotlast.out",
-        "tsigwrongclass.out",
-        "twoquestionnames.out",
-        "twoquestiontypes.out",
-        "wrongclass.out",
-    ]
-)
-
-
-def test_formerr(run_tests_sh):
-    run_tests_sh()