--- /dev/null
+# Copyright (C) Internet Systems Consortium, Inc. ("ISC")
+#
+# SPDX-License-Identifier: MPL-2.0
+#
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this
+# file, you can obtain one at https://mozilla.org/MPL/2.0/.
+#
+# See the COPYRIGHT file distributed with this work for additional
+# information regarding copyright ownership.
+
+import socket
+import struct
+
+from dns import rdataclass, rdatatype, update
+
+import pytest
+
+import isctest
+
+pytestmark = pytest.mark.extra_artifacts(
+ [
+ "*/*.db",
+ ]
+)
+
+
+def encode_name(name: str) -> bytes:
+ out = b""
+ for label in name.rstrip(".").split("."):
+ out += bytes([len(label)]) + label.encode("ascii")
+ return out + b"\x00"
+
+
+@pytest.mark.parametrize(
+ "rdtype,rdclass,ttl,rdata",
+ [
+ (rdatatype.SRV, rdataclass.NONE, 0, b"\x00"),
+ (rdatatype.KX, rdataclass.NONE, 0, b""),
+ (rdatatype.PX, rdataclass.NONE, 0, b""),
+ (rdatatype.NSAP, rdataclass.NONE, 0, b""),
+ (rdatatype.NSAP_PTR, rdataclass.NONE, 0, b""),
+ (31, rdataclass.NONE, 0, b""), # dnspython doesn't define type EID
+ ],
+)
+def test_class_invalid(rdtype, rdclass, ttl, rdata, named_port):
+ # these update messages are badly formatted, so we construct
+ # them manually instead of using dnspython.
+
+ # opcode=UPDATE, 1 RRset in ZONE, 1 RRset in UPDATE
+ header = struct.pack("!HHHHHH", 0, 0x2800, 1, 0, 1, 0)
+
+ # ZONE section: QNAME=<zone>, QTYPE=SOA, QCLASS=ANY
+ zone_q = encode_name("1.0.0.127.in-addr.arpa") + struct.pack("!HH", 6, 255)
+
+ # UPDATE section RR:
+ update_rr = (
+ encode_name("1.0.0.127.in-addr.arpa")
+ + struct.pack("!HHIH", rdtype, rdclass, ttl, len(rdata))
+ + rdata
+ )
+
+ m = header + zone_q + update_rr
+ packet = struct.pack("!H", len(m)) + m
+
+ with socket.create_connection(
+ ("10.53.0.2", named_port), source_address=("127.0.0.1", 0), timeout=2.0
+ ) as s:
+ s.sendall(packet)
+ try:
+ rwire = s.recv(4096)
+ res = dns.message.from_wire(rwire)
+ isctest.check.formerr(res)
+ except Exception: # pylint: disable=broad-except
+ pass
+
+ # check the server is answering
+ msg = isctest.query.create("1.0.0.127.in-addr.arpa", "SRV")
+ res = isctest.query.udp(msg, "10.53.0.2")
+ isctest.check.noerror(res)
+ isctest.check.rr_count_eq(res.answer, 0)
+
+
+@pytest.mark.parametrize(
+ "rdtype,rdata",
+ [
+ (rdatatype.SVCB, "\\# 02 0000"),
+ (rdatatype.WKS, "\\# 02 4142"),
+ (rdatatype.WKS, "\\# 02 4344"),
+ ],
+)
+def test_class_chaosupdate(rdtype, rdata):
+ up = update.UpdateMessage("example.", rdclass=rdataclass.CHAOS)
+ up.add("foo.example.", 300, rdtype, rdata)
+ res = isctest.query.tcp(up, "10.53.0.2")
+ isctest.check.notimp(res)