From 084795163e2aa51a2b33f0cb0808c793592e7181 Mon Sep 17 00:00:00 2001 From: Michael Tremer Date: Fri, 10 May 2024 17:01:50 +0100 Subject: [PATCH] unbound-dhcp-leases-bridge: Implement a worker thread to handle all events Signed-off-by: Michael Tremer --- config/unbound/unbound-dhcp-leases-bridge | 110 +++++++++++++++++++++- 1 file changed, 105 insertions(+), 5 deletions(-) diff --git a/config/unbound/unbound-dhcp-leases-bridge b/config/unbound/unbound-dhcp-leases-bridge index 1f46ea3f96..b4e212b385 100644 --- a/config/unbound/unbound-dhcp-leases-bridge +++ b/config/unbound/unbound-dhcp-leases-bridge @@ -36,6 +36,7 @@ import stat import subprocess import sys import tempfile +import threading import time LOCAL_TTL = 60 @@ -91,6 +92,9 @@ class UnboundDHCPLeasesBridge(object): # Create a queue for all received events self.queue = queue.Queue() + # Initialize the worker + self.worker = Worker(self.queue, callback=self._handle_message) + self.unbound = UnboundConfigWriter(unbound_leases_file) # Load all required data @@ -99,6 +103,9 @@ class UnboundDHCPLeasesBridge(object): def run(self): log.info("Unbound DHCP Leases Bridge started on %s" % self.leases_file) + # Launch the worker + self.worker.start() + # Open the server socket self.socket = self._open_socket(self.socket_path) @@ -119,11 +126,6 @@ class UnboundDHCPLeasesBridge(object): # Decode the data message = self._decode_message(data) - # Log the received message - log.debug("Received message:") - for key in message: - log.debug(" %-20s = %s" % (key, message[key])) - # Add the message to the queue self.queue.put(message) @@ -140,6 +142,10 @@ class UnboundDHCPLeasesBridge(object): finally: conn.close() + # Terminate the worker + self.queue.put(None) + self.worker.join() + log.info("Unbound DHCP Leases Bridge terminated") def _open_socket(self, path): @@ -193,6 +199,45 @@ class UnboundDHCPLeasesBridge(object): return message + def _handle_message(self, message): + log.debug("Handling message:") + for key in message: + log.debug(" %-20s = %s" % (key, message[key])) + + # Extract the event type + event = message.get("EVENT") + + # Check if event is set + if not event: + raise ValueError("The message does not have EVENT set") + + # COMMIT + elif event == "commit": + address = message.get("ADDRESS") + name = message.get("NAME") + + # Create a new lease + lease = Lease(address, { + "client-hostname" : name, + }) + + # Apply the lease + self.unbound.apply_lease(lease) + + # RELEASE/EXPIRY + elif event in ("release", "expiry"): + address = message.get("ADDRESS") + + # Create a new lease + lease = Lease(address, {}) + + # Remove the lease + self.unbound.remove_lease(lease) + + # Raise an error if the event is not supported + else: + raise ValueError("Unsupported event: %s" % event) + def update_dhcp_leases(self): leases = [] @@ -276,6 +321,36 @@ class UnboundDHCPLeasesBridge(object): self.socket.close() +class Worker(threading.Thread): + """ + The worker is launched in a separate thread + which allows us to perform some tasks asynchronously. + """ + def __init__(self, queue, callback): + super().__init__() + + self.queue = queue + self.callback = callback + + def run(self): + log.debug("Worker %s launched" % self.native_id) + + while True: + message = self.queue.get() + + # If the message is None, we have to quit + if message is None: + break + + # Call the callback + try: + self.callback(message) + except Exception as e: + log.error("Callback failed: %s" % e) + + log.debug("Worker %s terminated" % self.native_id) + + class DHCPLeases(object): regex_leaseblock = re.compile(r"lease (?P\d+\.\d+\.\d+\.\d+) {(?P[\s\S]+?)\n}") @@ -613,6 +688,9 @@ class UnboundConfigWriter(object): command = ["unbound-control"] command.extend(args) + # Log what we are doing + log.debug("Running %s" % " ".join(command)) + try: subprocess.check_output(command) @@ -623,6 +701,28 @@ class UnboundConfigWriter(object): raise e + def apply_lease(self, lease): + """ + This method takes a lease and updates Unbound at runtime. + """ + log.debug("Applying lease %s" % lease) + + for rr in lease.rrset: + log.debug("Adding new record %s" % " ".join(rr)) + + self._control("local_data", *rr) + + def remove_lease(self, lease): + """ + This method takes a lease and removes it from Unbound at runtime. + """ + log.debug("Removing lease %s" % lease) + + for name, ttl, type, content in lease.rrset: + log.debug("Removing records for %s" % name) + + self._control("local_data_remove", name) + if __name__ == "__main__": parser = argparse.ArgumentParser(description="Bridge for DHCP Leases and Unbound DNS") -- 2.39.5