]> git.ipfire.org Git - ipfire-2.x.git/commitdiff
unbound-dhcp-leases-bridge: Implement a worker thread to handle all events
authorMichael Tremer <michael.tremer@ipfire.org>
Fri, 10 May 2024 16:01:50 +0000 (17:01 +0100)
committerMichael Tremer <michael.tremer@ipfire.org>
Fri, 10 May 2024 16:01:50 +0000 (17:01 +0100)
Signed-off-by: Michael Tremer <michael.tremer@ipfire.org>
config/unbound/unbound-dhcp-leases-bridge

index 1f46ea3f96ed9c5a427cd2a9c6a8a37850818c65..b4e212b385865d9f719c0ae8afd09f485cf987c1 100644 (file)
@@ -36,6 +36,7 @@ import stat
 import subprocess
 import sys
 import tempfile
+import threading
 import time
 
 LOCAL_TTL = 60
@@ -91,6 +92,9 @@ class UnboundDHCPLeasesBridge(object):
                # Create a queue for all received events
                self.queue = queue.Queue()
 
+               # Initialize the worker
+               self.worker = Worker(self.queue, callback=self._handle_message)
+
                self.unbound = UnboundConfigWriter(unbound_leases_file)
 
                # Load all required data
@@ -99,6 +103,9 @@ class UnboundDHCPLeasesBridge(object):
        def run(self):
                log.info("Unbound DHCP Leases Bridge started on %s" % self.leases_file)
 
+               # Launch the worker
+               self.worker.start()
+
                # Open the server socket
                self.socket = self._open_socket(self.socket_path)
 
@@ -119,11 +126,6 @@ class UnboundDHCPLeasesBridge(object):
                                # Decode the data
                                message = self._decode_message(data)
 
-                               # Log the received message
-                               log.debug("Received message:")
-                               for key in message:
-                                       log.debug("  %-20s = %s" % (key, message[key]))
-
                                # Add the message to the queue
                                self.queue.put(message)
 
@@ -140,6 +142,10 @@ class UnboundDHCPLeasesBridge(object):
                        finally:
                                conn.close()
 
+               # Terminate the worker
+               self.queue.put(None)
+               self.worker.join()
+
                log.info("Unbound DHCP Leases Bridge terminated")
 
        def _open_socket(self, path):
@@ -193,6 +199,45 @@ class UnboundDHCPLeasesBridge(object):
 
                return message
 
+       def _handle_message(self, message):
+               log.debug("Handling message:")
+               for key in message:
+                       log.debug("  %-20s = %s" % (key, message[key]))
+
+               # Extract the event type
+               event = message.get("EVENT")
+
+               # Check if event is set
+               if not event:
+                       raise ValueError("The message does not have EVENT set")
+
+               # COMMIT
+               elif event == "commit":
+                       address = message.get("ADDRESS")
+                       name    = message.get("NAME")
+
+                       # Create a new lease
+                       lease = Lease(address, {
+                               "client-hostname" : name,
+                       })
+
+                       # Apply the lease
+                       self.unbound.apply_lease(lease)
+
+               # RELEASE/EXPIRY
+               elif event in ("release", "expiry"):
+                       address = message.get("ADDRESS")
+
+                       # Create a new lease
+                       lease = Lease(address, {})
+
+                       # Remove the lease
+                       self.unbound.remove_lease(lease)
+
+               # Raise an error if the event is not supported
+               else:
+                       raise ValueError("Unsupported event: %s" % event)
+
        def update_dhcp_leases(self):
                leases = []
 
@@ -276,6 +321,36 @@ class UnboundDHCPLeasesBridge(object):
                        self.socket.close()
 
 
+class Worker(threading.Thread):
+       """
+               The worker is launched in a separate thread
+               which allows us to perform some tasks asynchronously.
+       """
+       def __init__(self, queue, callback):
+               super().__init__()
+
+               self.queue = queue
+               self.callback = callback
+
+       def run(self):
+               log.debug("Worker %s launched" % self.native_id)
+
+               while True:
+                       message = self.queue.get()
+
+                       # If the message is None, we have to quit
+                       if message is None:
+                               break
+
+                       # Call the callback
+                       try:
+                               self.callback(message)
+                       except Exception as e:
+                               log.error("Callback failed: %s" % e)
+
+               log.debug("Worker %s terminated" % self.native_id)
+
+
 class DHCPLeases(object):
        regex_leaseblock = re.compile(r"lease (?P<ipaddr>\d+\.\d+\.\d+\.\d+) {(?P<config>[\s\S]+?)\n}")
 
@@ -613,6 +688,9 @@ class UnboundConfigWriter(object):
                command = ["unbound-control"]
                command.extend(args)
 
+               # Log what we are doing
+               log.debug("Running %s" % " ".join(command))
+
                try:
                        subprocess.check_output(command)
 
@@ -623,6 +701,28 @@ class UnboundConfigWriter(object):
 
                        raise e
 
+       def apply_lease(self, lease):
+               """
+                       This method takes a lease and updates Unbound at runtime.
+               """
+               log.debug("Applying lease %s" % lease)
+
+               for rr in lease.rrset:
+                       log.debug("Adding new record %s" % " ".join(rr))
+
+                       self._control("local_data", *rr)
+
+       def remove_lease(self, lease):
+               """
+                       This method takes a lease and removes it from Unbound at runtime.
+               """
+               log.debug("Removing lease %s" % lease)
+
+               for name, ttl, type, content in lease.rrset:
+                       log.debug("Removing records for %s" % name)
+
+                       self._control("local_data_remove", name)
+
 
 if __name__ == "__main__":
        parser = argparse.ArgumentParser(description="Bridge for DHCP Leases and Unbound DNS")