]> git.ipfire.org Git - thirdparty/dnspython.git/commitdiff
Add DDNS server example (#1246)
authordarkdragon-001 <darkdragon-001@users.noreply.github.com>
Fri, 9 Jan 2026 14:24:21 +0000 (15:24 +0100)
committerGitHub <noreply@github.com>
Fri, 9 Jan 2026 14:24:21 +0000 (06:24 -0800)
Adjust the default values of the DDNS client example to connect to the one from the server example.

examples/ddns_client.py [moved from examples/ddns.py with 84% similarity]
examples/ddns_server.py [new file with mode: 0644]

similarity index 84%
rename from examples/ddns.py
rename to examples/ddns_client.py
index d4d89a0371776f5eade3b5f5d89bb5efab5cd7d3..e0487840127a484900c0c8e8e9171f18621818c9 100755 (executable)
@@ -1,10 +1,9 @@
 #!/usr/bin/env python3
-
 #
 # Use a TSIG-signed DDNS update to update our hostname-to-address
 # mapping.
 #
-# usage: ddns.py <ip-address>
+# usage: ddns_client.py <ip-address>
 #
 # On linux systems, you can automatically update your DNS any time an
 # interface comes up by adding an ifup-local script that invokes this
@@ -44,6 +43,6 @@ update = dns.update.Update("example.", keyring=keyring)
 update.replace("host", 300, "A", sys.argv[1])
 
 #
-# Replace "10.0.0.1" with the IP address of your master server.
+# Replace "127.0.0.1" and 8053 with the hostname and port of your master server.
 #
-response = dns.query.tcp(update, "10.0.0.1", timeout=10)
+response = dns.query.tcp(update, where="127.0.0.1", timeout=10, port=8053)
diff --git a/examples/ddns_server.py b/examples/ddns_server.py
new file mode 100644 (file)
index 0000000..c69c994
--- /dev/null
@@ -0,0 +1,119 @@
+#!/usr/bin/env python3
+#
+# Create a DNS server to receive DDNS update queries
+#
+# usage: ddns_server.py
+#
+
+import asyncio
+import logging
+import struct
+import typing
+
+import dns.exception
+import dns.message
+import dns.name
+import dns.opcode
+import dns.rcode
+import dns.rdataclass
+import dns.rdatatype
+import dns.tsigkeyring
+import dns.update
+
+
+KEYRING = dns.tsigkeyring.from_text({"keyname.": "NjHwPsMKjdN++dOfE5iAiQ=="})
+
+TEST_ZONES = {
+    dns.name.from_text("example."): [
+        dns.name.from_text("keyname."),
+    ],
+}
+
+
+def response(msg, code=dns.rcode.SERVFAIL):
+    response = dns.message.make_response(msg)
+    response.set_rcode(code)
+    return response.to_wire()
+
+
+async def handle_nsupdate(data, addr):
+    cli = addr[0]
+    msg = dns.message.from_wire(data, keyring=KEYRING)
+    try:
+        if msg.opcode() != dns.opcode.UPDATE:
+            raise NotImplementedError("Opcode %s not implemented" % dns.opcode.to_text(msg.opcode()))
+        update_msg = typing.cast(dns.update.UpdateMessage, msg)
+        zone = update_msg.zone[0].name
+        if not msg.had_tsig or msg.keyname not in TEST_ZONES[zone]:
+            raise dns.exception.DeniedByPolicy(f"Key {msg.keyname} not allowed for zone {zone}")
+        for r in update_msg.update:
+            if r.deleting:
+                if r.deleting == dns.rdataclass.ANY and r.rdtype == dns.rdatatype.ANY:
+                    logging.info("%s: delete_all_rrsets %s" % (cli, r))
+                elif r.deleting == dns.rdataclass.ANY:
+                    logging.info("%s: delete_rrset %s" % (cli, r))
+                elif r.deleting == dns.rdataclass.NONE:
+                    logging.info("%s: delete_from_rrset %s" % (cli, r))
+            else:
+                logging.info("%s: add_to_rrset %s" % (cli, r))
+    except dns.exception.FormError:
+        logging.exception("Rejected %s: Error parsing message" % cli)
+        return response(msg, code=dns.rcode.FORMERR)
+    except dns.exception.DeniedByPolicy:
+        logging.exception("Rejected %s: Validation error" % cli)
+        return response(msg, code=dns.rcode.REFUSED)
+    except NotImplementedError:
+        logging.exception("Rejected %s: Not implemented error" % cli)
+        return response(msg, code=dns.rcode.NOTIMP)
+    except:
+        logging.exception("Rejected %s: Internal error" % cli)
+        return response(msg, code=dns.rcode.SERVFAIL)
+    return response(msg, code=dns.rcode.NOERROR)
+
+
+async def main():
+    hostname = "0.0.0.0"
+    port = 8053
+
+    logging.basicConfig(level=logging.INFO)
+    logging.info("Starting servers at {hostname}:{port}")
+    loop = asyncio.get_event_loop()
+
+    # Start UDP server
+    class DatagramProtocol(asyncio.DatagramProtocol):
+        def connection_made(self, transport):
+            self.transport = transport
+
+        def datagram_received(self, data, addr):
+            asyncio.ensure_future(self.handle(data, addr))
+
+        async def handle(self, data, addr):
+            result = await handle_nsupdate(data, addr)
+            self.transport.sendto(result, addr)
+
+    transport, _protocol = await loop.create_datagram_endpoint(lambda: DatagramProtocol(), local_addr=(hostname, port))
+
+    # Start TCP server
+    class StreamReaderProtocol(asyncio.StreamReaderProtocol):
+        def __init__(self):
+            super().__init__(asyncio.StreamReader(), self.handle_tcp)
+
+        async def handle_tcp(self, reader, writer):
+            addr = writer.transport.get_extra_info("peername")
+            while True:
+                try:
+                    (size,) = struct.unpack("!H", await reader.readexactly(2))
+                except asyncio.IncompleteReadError:
+                    break
+                data = await reader.readexactly(size)
+
+                result = await handle_nsupdate(data, addr)
+                bsize = struct.pack("!H", len(result))
+                writer.write(bsize)
+                writer.write(result)
+
+    server = await loop.create_server(lambda: StreamReaderProtocol(), hostname, port)
+    await server.serve_forever()
+
+
+asyncio.run(main())