import subprocess
import sys
import tempfile
+import threading
import time
LOCAL_TTL = 60
# Create a queue for all received events
self.queue = queue.Queue()
+ # Initialize the worker
+ self.worker = Worker(self.queue, callback=self._handle_message)
+
self.unbound = UnboundConfigWriter(unbound_leases_file)
# Load all required data
def run(self):
log.info("Unbound DHCP Leases Bridge started on %s" % self.leases_file)
+ # Launch the worker
+ self.worker.start()
+
# Open the server socket
self.socket = self._open_socket(self.socket_path)
# Decode the data
message = self._decode_message(data)
- # Log the received message
- log.debug("Received message:")
- for key in message:
- log.debug(" %-20s = %s" % (key, message[key]))
-
# Add the message to the queue
self.queue.put(message)
finally:
conn.close()
+ # Terminate the worker
+ self.queue.put(None)
+ self.worker.join()
+
log.info("Unbound DHCP Leases Bridge terminated")
def _open_socket(self, path):
return message
+ def _handle_message(self, message):
+ log.debug("Handling message:")
+ for key in message:
+ log.debug(" %-20s = %s" % (key, message[key]))
+
+ # Extract the event type
+ event = message.get("EVENT")
+
+ # Check if event is set
+ if not event:
+ raise ValueError("The message does not have EVENT set")
+
+ # COMMIT
+ elif event == "commit":
+ address = message.get("ADDRESS")
+ name = message.get("NAME")
+
+ # Create a new lease
+ lease = Lease(address, {
+ "client-hostname" : name,
+ })
+
+ # Apply the lease
+ self.unbound.apply_lease(lease)
+
+ # RELEASE/EXPIRY
+ elif event in ("release", "expiry"):
+ address = message.get("ADDRESS")
+
+ # Create a new lease
+ lease = Lease(address, {})
+
+ # Remove the lease
+ self.unbound.remove_lease(lease)
+
+ # Raise an error if the event is not supported
+ else:
+ raise ValueError("Unsupported event: %s" % event)
+
def update_dhcp_leases(self):
leases = []
self.socket.close()
+class Worker(threading.Thread):
+ """
+ The worker is launched in a separate thread
+ which allows us to perform some tasks asynchronously.
+ """
+ def __init__(self, queue, callback):
+ super().__init__()
+
+ self.queue = queue
+ self.callback = callback
+
+ def run(self):
+ log.debug("Worker %s launched" % self.native_id)
+
+ while True:
+ message = self.queue.get()
+
+ # If the message is None, we have to quit
+ if message is None:
+ break
+
+ # Call the callback
+ try:
+ self.callback(message)
+ except Exception as e:
+ log.error("Callback failed: %s" % e)
+
+ log.debug("Worker %s terminated" % self.native_id)
+
+
class DHCPLeases(object):
regex_leaseblock = re.compile(r"lease (?P<ipaddr>\d+\.\d+\.\d+\.\d+) {(?P<config>[\s\S]+?)\n}")
command = ["unbound-control"]
command.extend(args)
+ # Log what we are doing
+ log.debug("Running %s" % " ".join(command))
+
try:
subprocess.check_output(command)
raise e
+ def apply_lease(self, lease):
+ """
+ This method takes a lease and updates Unbound at runtime.
+ """
+ log.debug("Applying lease %s" % lease)
+
+ for rr in lease.rrset:
+ log.debug("Adding new record %s" % " ".join(rr))
+
+ self._control("local_data", *rr)
+
+ def remove_lease(self, lease):
+ """
+ This method takes a lease and removes it from Unbound at runtime.
+ """
+ log.debug("Removing lease %s" % lease)
+
+ for name, ttl, type, content in lease.rrset:
+ log.debug("Removing records for %s" % name)
+
+ self._control("local_data_remove", name)
+
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Bridge for DHCP Leases and Unbound DNS")