+++ /dev/null
-#!/usr/bin/perl
-
-# Copyright (C) Internet Systems Consortium, Inc. ("ISC")
-#
-# SPDX-License-Identifier: MPL-2.0
-#
-# This Source Code Form is subject to the terms of the Mozilla Public
-# License, v. 2.0. If a copy of the MPL was not distributed with this
-# file, you can obtain one at https://mozilla.org/MPL/2.0/.
-#
-# See the COPYRIGHT file distributed with this work for additional
-# information regarding copyright ownership.
-
-# This is a tool for sending an arbitrary packet via UDP or TCP to an
-# arbitrary address and port. The packet is specified in a file or on
-# the standard input, in the form of a series of bytes in hexadecimal.
-# Whitespace is ignored, as is anything following a '#' symbol.
-#
-# For example, the following input would generate normal query for
-# isc.org/NS/IN":
-#
-# # QID:
-# 0c d8
-# # header:
-# 01 00 00 01 00 00 00 00 00 00
-# # qname isc.org:
-# 03 69 73 63 03 6f 72 67 00
-# # qtype NS:
-# 00 02
-# # qclass IN:
-# 00 01
-#
-# Note that we do not wait for a response for the server. This is simply
-# a way of injecting arbitrary packets to test server resposnes.
-#
-# Usage: packet.pl [-a <address>] [-p <port>] [-t (udp|tcp)] [filename]
-#
-# If not specified, address defaults to 127.0.0.1, port to 53, protocol
-# to udp, and file to stdin.
-#
-# XXX: Doesn't support IPv6 yet
-
-require 5.006_001;
-
-use strict;
-use Getopt::Std;
-use IO::File;
-use IO::Socket;
-
-sub usage {
- print ("Usage: packet.pl [-a address] [-p port] [file]\n");
- exit 1;
-}
-
-my %options={};
-getopts("a:p:", \%options);
-
-my $addr = "127.0.0.1";
-$addr = $options{a} if defined $options{a};
-
-my $port = 53;
-$port = $options{p} if defined $options{p};
-
-my $file = "STDIN";
-if (@ARGV >= 1) {
- my $filename = shift @ARGV;
- open FH, "<$filename" or die "$filename: $!";
- $file = "FH";
-}
-
-my $input = "";
-while (defined(my $line = <$file>) ) {
- chomp $line;
- $line =~ s/#.*$//;
- $input .= $line;
-}
-
-$input =~ s/\s+//g;
-my $data = pack("H*", $input);
-my $len = length $data;
-
-my $output = unpack("H*", $data);
-print ("sending: $output\n");
-
-my $sock = IO::Socket::INET->new(PeerAddr => $addr, PeerPort => $port,
- Proto => "tcp") or die "$!";
-
-my $bytes;
-$bytes = $sock->syswrite(pack("n", $len), 2);
-$bytes = $sock->syswrite($data, $len);
-$bytes = $sock->sysread($data, 2);
-$len = unpack("n", $data);
-$bytes = $sock->sysread($data, $len);
-print "got: ", unpack("H*", $data). "\n";
-
-$sock->close;
-close $file;
+++ /dev/null
-#!/bin/sh
-
-# Copyright (C) Internet Systems Consortium, Inc. ("ISC")
-#
-# SPDX-License-Identifier: MPL-2.0
-#
-# This Source Code Form is subject to the terms of the Mozilla Public
-# License, v. 2.0. If a copy of the MPL was not distributed with this
-# file, you can obtain one at https://mozilla.org/MPL/2.0/.
-#
-# See the COPYRIGHT file distributed with this work for additional
-# information regarding copyright ownership.
-
-set -e
-
-. ../conf.sh
-
-status=0
-
-echo_i "test name too long"
-$PERL formerr.pl -a 10.53.0.1 -p ${PORT} nametoolong >nametoolong.out
-ans=$(grep got: nametoolong.out)
-if [ "${ans}" != "got: 000080010000000000000000" ]; then
- echo_i "failed"
- status=$((status + 1))
-fi
-
-echo_i "two question names"
-$PERL formerr.pl -a 10.53.0.1 -p ${PORT} twoquestionnames >twoquestionnames.out
-ans=$(grep got: twoquestionnames.out)
-if [ "${ans}" != "got: 000080010000000000000000" ]; then
- echo_i "failed"
- status=$((status + 1))
-fi
-
-echo_i "two question types"
-$PERL formerr.pl -a 10.53.0.1 -p ${PORT} twoquestiontypes >twoquestiontypes.out
-ans=$(grep got: twoquestiontypes.out)
-if [ "${ans}" != "got: 000080010000000000000000" ]; then
- echo_i "failed"
- status=$((status + 1))
-fi
-
-echo_i "duplicate questions"
-$PERL formerr.pl -a 10.53.0.1 -p ${PORT} dupquestion >dupquestion.out
-ans=$(grep got: dupquestion.out)
-if [ "${ans}" != "got: 000080010000000000000000" ]; then
- echo_i "failed"
- status=$(expr $status + 1)
-fi
-
-echo_i "duplicate answer"
-$PERL formerr.pl -a 10.53.0.1 -p ${PORT} dupans >dupans.out
-ans=$(grep got: dupans.out)
-if [ "${ans}" != "got: 0000800100010000000000000000060001" ]; then
- echo_i "failed"
- status=$(expr $status + 1)
-fi
-
-echo_i "question only type in answer"
-$PERL formerr.pl -a 10.53.0.1 -p ${PORT} qtypeasanswer >qtypeasanswer.out
-ans=$(grep got: qtypeasanswer.out)
-if [ "${ans}" != "got: 000080010000000000000000" ]; then
- echo_i "failed"
- status=$(expr $status + 1)
-fi
-
-# this would be NOERROR if it included a COOKIE option,
-# but is a FORMERR without one.
-echo_i "empty question section (and no COOKIE option)"
-$PERL formerr.pl -a 10.53.0.1 -p ${PORT} noquestions >noquestions.out
-ans=$(grep got: noquestions.out)
-if [ "${ans}" != "got: 000080010000000000000000" ]; then
- echo_i "failed"
- status=$((status + 1))
-fi
-
-echo_i "bad nsec3 owner"
-$PERL formerr.pl -a 10.53.0.1 -p ${PORT} badnsec3owner >badnsec3owner.out
-ans=$(grep got: badnsec3owner.out)
-# SERVFAIL (2) rather than FORMERR (1)
-if [ "${ans}" != "got: 0008800200010000000000000000010001" ]; then
- echo_i "failed"
- status=$(expr $status + 1)
-fi
-
-echo_i "short record before rdata "
-$PERL formerr.pl -a 10.53.0.1 -p ${PORT} shortrecord >shortrecord.out
-ans=$(grep got: shortrecord.out)
-if [ "${ans}" != "got: 000980010000000000000000" ]; then
- echo_i "failed"
- status=$(expr $status + 1)
-fi
-
-echo_i "short question"
-$PERL formerr.pl -a 10.53.0.1 -p ${PORT} shortquestion >shortquestion.out
-ans=$(grep got: shortquestion.out)
-if [ "${ans}" != "got: 000080010000000000000000" ]; then
- echo_i "failed"
- status=$(expr $status + 1)
-fi
-
-echo_i "mismatch classes in question section"
-$PERL formerr.pl -a 10.53.0.1 -p ${PORT} questionclass >questionclass.out
-ans=$(grep got: questionclass.out)
-if [ "${ans}" != "got: 000080010000000000000000" ]; then
- echo_i "failed"
- status=$(expr $status + 1)
-fi
-
-echo_i "bad record owner name"
-$PERL formerr.pl -a 10.53.0.1 -p ${PORT} badrecordname >badrecordname.out
-ans=$(grep got: badrecordname.out)
-if [ "${ans}" != "got: 000080010000000000000000" ]; then
- echo_i "failed"
- status=$(expr $status + 1)
-fi
-
-echo_i "mismatched class in record"
-$PERL formerr.pl -a 10.53.0.1 -p ${PORT} wrongclass >wrongclass.out
-ans=$(grep got: wrongclass.out)
-if [ "${ans}" != "got: 000080010000000000000000" ]; then
- echo_i "failed"
- status=$(expr $status + 1)
-fi
-
-echo_i "mismatched KEY class"
-$PERL formerr.pl -a 10.53.0.1 -p ${PORT} keyclass >keyclass.out
-ans=$(grep got: keyclass.out)
-if [ "${ans}" != "got: 0000800100010000000000000000010001" ]; then
- echo_i "failed"
- status=$(expr $status + 1)
-fi
-
-echo_i "OPT wrong owner name"
-$PERL formerr.pl -a 10.53.0.1 -p ${PORT} optwrongname >optwrongname.out
-ans=$(grep got: optwrongname.out)
-if [ "${ans}" != "got: 000080010000000000000000" ]; then
- echo_i "failed"
- status=$(expr $status + 1)
-fi
-
-echo_i "RRSIG invalid covers"
-$PERL formerr.pl -a 10.53.0.1 -p ${PORT} malformedrrsig >malformedrrsig.out
-ans=$(grep got: malformedrrsig.out)
-if [ "${ans}" != "got: 000080010000000000000000" ]; then
- echo_i "failed"
- status=$(expr $status + 1)
-fi
-
-echo_i "UPDATE malformed delete type"
-$PERL formerr.pl -a 10.53.0.1 -p ${PORT} malformeddeltype >malformeddeltype.out
-ans=$(grep got: malformeddeltype.out)
-if [ "${ans}" != "got: 0000a8010000000000000000" ]; then
- echo_i "failed"
- status=$(expr $status + 1)
-fi
-
-echo_i "TSIG wrong class"
-$PERL formerr.pl -a 10.53.0.1 -p ${PORT} tsigwrongclass >tsigwrongclass.out
-ans=$(grep got: tsigwrongclass.out)
-if [ "${ans}" != "got: 000080010000000000000000" ]; then
- echo_i "failed"
- status=$(expr $status + 1)
-fi
-
-echo_i "TSIG not last"
-$PERL formerr.pl -a 10.53.0.1 -p ${PORT} tsignotlast >tsignotlast.out
-ans=$(grep got: tsignotlast.out)
-if [ "${ans}" != "got: 000080010000000000000000" ]; then
- echo_i "failed"
- status=$(expr $status + 1)
-fi
-
-echo_i "exit status: $status"
-
-[ $status -eq 0 ] || exit 1
--- /dev/null
+# Copyright (C) Internet Systems Consortium, Inc. ("ISC")
+#
+# SPDX-License-Identifier: MPL-2.0
+#
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this
+# file, you can obtain one at https://mozilla.org/MPL/2.0/.
+#
+# See the COPYRIGHT file distributed with this work for additional
+# information regarding copyright ownership.
+
+from pathlib import Path
+from typing import SupportsInt
+
+import socket
+import time
+
+import dns.flags
+import dns.message
+import dns.name
+import dns.opcode
+import dns.query
+import dns.rcode
+import dns.rdataclass
+import dns.rdatatype
+import dns.rdtypes.ANY.NSEC3
+import dns.rdtypes.ANY.OPT
+import dns.rdtypes.ANY.RRSIG
+import dns.rdtypes.ANY.SOA
+import dns.rdtypes.ANY.TSIG
+import dns.rdtypes.IN.A
+import pytest
+
+
+def wire(*parts: bytes) -> bytes:
+ return b"".join(parts)
+
+
+def u16(value: SupportsInt) -> bytes:
+ return int(value).to_bytes(2, byteorder="big")
+
+
+def u32(value: int) -> bytes:
+ return value.to_bytes(4, byteorder="big")
+
+
+def name_wire(name: str) -> bytes:
+ return dns.name.from_text(name).to_wire()
+
+
+def root_wire() -> bytes:
+ return dns.name.root.to_wire()
+
+
+def dns_header(
+ message_id: int = 0,
+ flags: int = 0,
+ opcode: int = dns.opcode.QUERY,
+ rcode: dns.rcode.Rcode = dns.rcode.NOERROR,
+ qdcount: int = 0,
+ ancount: int = 0,
+ nscount: int = 0,
+ arcount: int = 0,
+) -> bytes:
+ return wire(
+ u16(message_id),
+ u16(flags | dns.opcode.to_flags(opcode) | int(rcode)),
+ u16(qdcount),
+ u16(ancount),
+ u16(nscount),
+ u16(arcount),
+ )
+
+
+def formerr_response_header(
+ message_id: int = 0,
+ opcode: int = dns.opcode.QUERY,
+ rcode: dns.rcode.Rcode = dns.rcode.FORMERR,
+ qdcount: int = 0,
+ ancount: int = 0,
+ nscount: int = 0,
+ arcount: int = 0,
+) -> bytes:
+ return dns_header(
+ message_id=message_id,
+ flags=dns.flags.QR,
+ opcode=opcode,
+ rcode=rcode,
+ qdcount=qdcount,
+ ancount=ancount,
+ nscount=nscount,
+ arcount=arcount,
+ )
+
+
+def question_wire(
+ qname_wire: bytes,
+ qtype: dns.rdatatype.RdataType = dns.rdatatype.RdataType.A,
+ qclass: dns.rdataclass.RdataClass = dns.rdataclass.RdataClass.IN,
+) -> bytes:
+ return wire(
+ qname_wire,
+ u16(qtype),
+ u16(qclass),
+ )
+
+
+def rr_wire(
+ owner: bytes,
+ rrtype: SupportsInt,
+ rrclass: SupportsInt,
+ *,
+ ttl: int,
+ rdata: bytes = b"",
+) -> bytes:
+ return wire(
+ question_wire(
+ owner, dns.rdatatype.RdataType(rrtype), dns.rdataclass.RdataClass(rrclass)
+ ),
+ u32(ttl),
+ u16(len(rdata)),
+ rdata,
+ )
+
+
+def oversized_name_wire() -> bytes:
+ labels = [bytes([15]) + b"A" * 15 for _ in range(15)]
+ labels.append(bytes([14]) + b"A" * 14 + root_wire())
+ return wire(*labels)
+
+
+def soa_rr(
+ *,
+ expire: int,
+ minimum: int,
+) -> bytes:
+ return rr_wire(
+ root_wire(),
+ dns.rdatatype.RdataType.SOA,
+ dns.rdataclass.RdataClass.IN,
+ ttl=1,
+ rdata=dns.rdtypes.ANY.SOA.SOA(
+ dns.rdataclass.RdataClass.IN,
+ dns.rdatatype.RdataType.SOA,
+ dns.name.root,
+ dns.name.root,
+ 1,
+ 2,
+ 3,
+ expire,
+ minimum,
+ ).to_wire(),
+ )
+
+
+def nsec3_rr(
+ *,
+ owner: bytes,
+) -> bytes:
+ return rr_wire(
+ owner,
+ dns.rdatatype.RdataType.NSEC3,
+ dns.rdataclass.RdataClass.IN,
+ ttl=1,
+ rdata=dns.rdtypes.ANY.NSEC3.NSEC3(
+ dns.rdataclass.RdataClass.IN,
+ dns.rdatatype.RdataType.NSEC3,
+ algorithm=240,
+ flags=0,
+ iterations=0,
+ salt=b"",
+ next=b"\xff",
+ windows=[],
+ ).to_wire(),
+ )
+
+
+def key_rdata(
+ *,
+ flags: int,
+ protocol: int,
+ algorithm: int,
+ keydata: bytes,
+) -> bytes:
+ # No dns.rdtypes.ANY.KEY.KEY class, so construct the rdata manually
+ return wire(u16(flags), bytes([protocol, algorithm]), keydata)
+
+
+def key_rr(*, rdclass: dns.rdataclass.RdataClass) -> bytes:
+ return rr_wire(
+ root_wire(),
+ dns.rdatatype.RdataType.KEY,
+ rdclass,
+ ttl=1,
+ rdata=key_rdata(
+ flags=0,
+ protocol=0,
+ algorithm=248,
+ keydata=b"\x00",
+ ),
+ )
+
+
+def malformed_rrsig_rr() -> bytes:
+ return rr_wire(
+ root_wire(),
+ dns.rdatatype.RdataType.RRSIG,
+ dns.rdataclass.RdataClass.IN,
+ ttl=1,
+ rdata=dns.rdtypes.ANY.RRSIG.RRSIG(
+ dns.rdataclass.RdataClass.IN,
+ dns.rdatatype.RdataType.RRSIG,
+ 0,
+ 240,
+ 0,
+ 1,
+ 2,
+ 3,
+ 0,
+ dns.name.root,
+ b"\x00",
+ ).to_wire(),
+ )
+
+
+def tsig_rr(
+ *,
+ owner: bytes = root_wire(),
+ rdclass: dns.rdataclass.RdataClass = dns.rdataclass.RdataClass.ANY,
+ algorithm: dns.name.Name = dns.name.root,
+ time_signed: int = 0x010203040506,
+ fudge: int = 0x0102,
+ mac: bytes = b"\x00",
+ original_id: int = 0,
+ error: int = 0,
+ other: bytes = b"",
+) -> bytes:
+ return rr_wire(
+ owner,
+ dns.rdatatype.RdataType.TSIG,
+ rdclass,
+ ttl=1,
+ rdata=dns.rdtypes.ANY.TSIG.TSIG(
+ rdclass,
+ dns.rdatatype.RdataType.TSIG,
+ algorithm,
+ time_signed,
+ fudge,
+ mac,
+ original_id,
+ error,
+ other,
+ ).to_wire(),
+ )
+
+
+def opt_rr(*, owner: bytes) -> bytes:
+ return rr_wire(
+ owner,
+ dns.rdatatype.RdataType.OPT,
+ dns.rdataclass.RdataClass.IN,
+ ttl=0,
+ rdata=dns.rdtypes.ANY.OPT.OPT(
+ dns.rdataclass.RdataClass.IN,
+ dns.rdatatype.RdataType.OPT,
+ [],
+ ).to_wire(),
+ )
+
+
+def a_rdata(ipv4_bytes: bytes = b"\x00\x00\x00\x00") -> bytes:
+ return dns.rdtypes.IN.A.A(
+ dns.rdataclass.RdataClass.IN,
+ dns.rdatatype.RdataType.A,
+ ipv4_bytes,
+ ).to_wire()
+
+
+def a_rr(owner: bytes = root_wire()) -> bytes:
+ return rr_wire(
+ owner,
+ dns.rdatatype.RdataType.A,
+ dns.rdataclass.RdataClass.IN,
+ ttl=1,
+ rdata=a_rdata(),
+ )
+
+
+def query_raw_tcp(host: str, port: int, packet_wire: bytes) -> dns.message.Message:
+ with socket.create_connection((host, port), timeout=10) as sock:
+ sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, True)
+ sock.sendall(u16(len(packet_wire)) + packet_wire)
+ response, _ = dns.query.receive_tcp(sock, time.time() + 10)
+ return response
+
+
+PACKET_DIR = Path(__file__).parent
+
+
+def parse_packet_file(packet_name: str) -> bytes:
+ text = (PACKET_DIR / packet_name).read_text(encoding="utf-8")
+ hex_bytes = []
+ for line in text.splitlines():
+ line = line.split("#", 1)[0].strip()
+ if line:
+ hex_bytes.append(line)
+ return bytes.fromhex(" ".join(hex_bytes))
+
+
+def validate_packet(packet_name: str, generated_wire: bytes) -> None:
+ assert generated_wire == parse_packet_file(packet_name)
+
+
+@pytest.mark.parametrize(
+ "packet_name,query_wire,expected_wire",
+ [
+ pytest.param(
+ "nametoolong",
+ wire(
+ dns_header(qdcount=1),
+ question_wire(oversized_name_wire()),
+ ),
+ formerr_response_header(),
+ id="nametoolong",
+ ),
+ pytest.param(
+ "twoquestionnames",
+ wire(
+ dns_header(qdcount=2),
+ question_wire(name_wire("AAAAAAAAAAAAAA."), dns.rdatatype.RdataType.A),
+ # Two names concatenated in the QNAME field
+ question_wire(
+ wire(name_wire("AAAAAAAAAAAAAA."), name_wire("AAAAAAAAAAAAAB.")),
+ dns.rdatatype.RdataType.A,
+ ),
+ ),
+ formerr_response_header(),
+ id="twoquestionnames",
+ ),
+ pytest.param(
+ "twoquestiontypes",
+ wire(
+ dns_header(qdcount=2),
+ question_wire(name_wire("AAAAAAAAAAAAAA."), dns.rdatatype.RdataType.A),
+ question_wire(name_wire("AAAAAAAAAAAAAA."), dns.rdatatype.RdataType.NS),
+ ),
+ formerr_response_header(),
+ id="twoquestiontypes",
+ ),
+ pytest.param(
+ "dupquestion",
+ wire(
+ dns_header(qdcount=2),
+ question_wire(name_wire("AAAAAAAAAAAAAA."), dns.rdatatype.RdataType.A),
+ question_wire(name_wire("AAAAAAAAAAAAAA."), dns.rdatatype.RdataType.A),
+ ),
+ formerr_response_header(),
+ id="dupquestion",
+ ),
+ pytest.param(
+ "dupans",
+ wire(
+ dns_header(qdcount=1, ancount=2),
+ question_wire(root_wire(), dns.rdatatype.RdataType.SOA),
+ soa_rr(expire=4, minimum=5),
+ soa_rr(expire=4, minimum=6),
+ ),
+ wire(
+ formerr_response_header(qdcount=1),
+ question_wire(root_wire(), dns.rdatatype.RdataType.SOA),
+ ),
+ id="dupans",
+ ),
+ pytest.param(
+ "qtypeasanswer",
+ wire(
+ dns_header(ancount=1),
+ rr_wire(
+ root_wire(),
+ dns.rdatatype.RdataType.MAILB,
+ dns.rdataclass.RdataClass.IN,
+ ttl=1,
+ ),
+ ),
+ formerr_response_header(),
+ id="qtypeasanswer",
+ ),
+ pytest.param(
+ "noquestions",
+ dns_header(),
+ # This would be NOERROR if it included a COOKIE option,
+ # but is a FORMERR without one.
+ formerr_response_header(),
+ id="noquestions",
+ ),
+ pytest.param(
+ "badnsec3owner",
+ wire(
+ dns_header(message_id=8, qdcount=1, nscount=1),
+ question_wire(root_wire(), dns.rdatatype.RdataType.A),
+ # Bad NSEC3 owner: X. is not in the base32hex alphabet.
+ nsec3_rr(owner=name_wire("X.")),
+ ),
+ wire(
+ formerr_response_header(
+ message_id=8, rcode=dns.rcode.SERVFAIL, qdcount=1
+ ),
+ question_wire(root_wire(), dns.rdatatype.RdataType.A),
+ ),
+ id="badnsec3owner",
+ ),
+ pytest.param(
+ "shortrecord",
+ wire(
+ dns_header(message_id=9, arcount=1),
+ # Truncated A record (no ttl, length or data)
+ question_wire(root_wire(), dns.rdatatype.RdataType.A),
+ ),
+ formerr_response_header(message_id=9),
+ id="shortrecord",
+ ),
+ pytest.param(
+ "shortquestion",
+ wire(
+ dns_header(qdcount=1),
+ # Truncated question (no class)
+ root_wire(),
+ u16(dns.rdatatype.RdataType.A),
+ ),
+ formerr_response_header(),
+ id="shortquestion",
+ ),
+ pytest.param(
+ "questionclass",
+ wire(
+ dns_header(qdcount=2),
+ question_wire(
+ root_wire(),
+ dns.rdatatype.RdataType.A,
+ dns.rdataclass.RdataClass.IN,
+ ),
+ question_wire(
+ root_wire(),
+ dns.rdatatype.RdataType.A,
+ dns.rdataclass.RdataClass(2),
+ ),
+ ),
+ formerr_response_header(),
+ id="questionclass",
+ ),
+ pytest.param(
+ "badrecordname",
+ wire(
+ dns_header(arcount=1),
+ a_rr(owner=oversized_name_wire()),
+ ),
+ formerr_response_header(),
+ id="badrecordname",
+ ),
+ pytest.param(
+ "wrongclass",
+ wire(
+ dns_header(arcount=2),
+ a_rr(),
+ rr_wire(
+ root_wire(),
+ dns.rdatatype.RdataType(65280),
+ dns.rdataclass.RdataClass(256),
+ ttl=33554433,
+ rdata=a_rdata(),
+ ),
+ ),
+ formerr_response_header(),
+ id="wrongclass",
+ ),
+ pytest.param(
+ "keyclass",
+ wire(
+ dns_header(qdcount=1, arcount=1),
+ question_wire(root_wire(), dns.rdatatype.RdataType.A),
+ key_rr(rdclass=dns.rdataclass.RdataClass(2)),
+ ),
+ wire(
+ formerr_response_header(qdcount=1),
+ question_wire(root_wire(), dns.rdatatype.RdataType.A),
+ ),
+ id="keyclass",
+ ),
+ pytest.param(
+ "optwrongname",
+ wire(
+ dns_header(arcount=1),
+ # OPT owner should be root
+ opt_rr(owner=name_wire("A.")),
+ ),
+ formerr_response_header(),
+ id="optwrongname",
+ ),
+ pytest.param(
+ "malformedrrsig",
+ wire(
+ dns_header(arcount=1),
+ malformed_rrsig_rr(),
+ ),
+ formerr_response_header(),
+ id="malformedrrsig",
+ ),
+ pytest.param(
+ "malformeddeltype",
+ wire(
+ dns_header(opcode=dns.opcode.UPDATE, nscount=1),
+ rr_wire(
+ root_wire(),
+ dns.rdatatype.RdataType.A,
+ dns.rdataclass.RdataClass.ANY,
+ ttl=0,
+ # Non-empty rdata for DELETE type
+ rdata=b"\x00",
+ ),
+ ),
+ formerr_response_header(opcode=dns.opcode.UPDATE),
+ id="malformeddeltype",
+ ),
+ pytest.param(
+ "tsigwrongclass",
+ wire(
+ dns_header(arcount=1),
+ # Class should be ANY not IN
+ tsig_rr(rdclass=dns.rdataclass.RdataClass.IN),
+ ),
+ formerr_response_header(),
+ id="tsigwrongclass",
+ ),
+ pytest.param(
+ "tsignotlast",
+ wire(
+ dns_header(arcount=2),
+ tsig_rr(),
+ # TSIG should be the last record
+ a_rr(),
+ ),
+ formerr_response_header(),
+ id="tsignotlast",
+ ),
+ ],
+)
+def test_formerr(
+ packet_name: str,
+ query_wire: bytes,
+ expected_wire: bytes,
+ named_port: int,
+ ns1,
+) -> None:
+ validate_packet(packet_name, query_wire)
+ response = query_raw_tcp(ns1.ip, named_port, query_wire)
+ assert response.to_wire() == expected_wire