]> git.ipfire.org Git - thirdparty/dnspython.git/commitdiff
add example reading TCP-like stream of wire formats from a file (#995)
authorPetr Špaček <pspacek@isc.org>
Fri, 13 Oct 2023 12:08:02 +0000 (14:08 +0200)
committerGitHub <noreply@github.com>
Fri, 13 Oct 2023 12:08:02 +0000 (05:08 -0700)
examples/wire_read_tcp.py [new file with mode: 0755]

diff --git a/examples/wire_read_tcp.py b/examples/wire_read_tcp.py
new file mode 100755 (executable)
index 0000000..e571a65
--- /dev/null
@@ -0,0 +1,49 @@
+#!/usr/bin/python
+import argparse
+import sys
+
+import dns.exception
+import dns.message
+
+
+def main():
+    parser = argparse.ArgumentParser(
+        description="Read sequence of DNS wire formats prefixed with 2-byte "
+        "length field - like from TCP socket - and print the messages. This "
+        "format is used e.g. by dnsperf -B option and dnsgen."
+    )
+    parser.add_argument("infile")
+    args = parser.parse_args()
+    ok_msgs = 0
+    bad_msgs = 0
+
+    with open(args.infile, "rb") as infile:
+        while True:
+            offset = infile.tell()
+            len_wire = infile.read(2)
+            if len(len_wire) == 0:  # end of file - expected
+                break
+            if len(len_wire) == 1:
+                raise ValueError("incomplete length preamble, offset", offset)
+            len_msg = int.from_bytes(len_wire, byteorder="big", signed=False)
+            print(f"; msg offset 0x{offset + 2:x}, length {len_msg} bytes")
+            msg_wire = infile.read(len_msg)
+            if len(msg_wire) != len_msg:
+                raise ValueError(
+                    f"incomplete message: expected {len_msg} != got {len(msg_wire)}, "
+                    f"length field offset 0x{offset:x}",
+                )
+            try:
+                msg = dns.message.from_wire(msg_wire)
+                ok_msgs += 1
+                print(msg)
+            except dns.exception.DNSException as ex:
+                print(f"; invalid message, skipping: {ex}")
+                bad_msgs += 1
+        print(f"; read {ok_msgs} valid and {bad_msgs} invalid messages")
+        if bad_msgs:
+            sys.exit(1)
+
+
+if __name__ == "__main__":
+    main()