]> git.ipfire.org Git - ipfire-2.x.git/blob - config/unbound/unbound-dhcp-leases-bridge
4a6f9587f84f8e78a633df9cc2a53a5e8fc01db9
[ipfire-2.x.git] / config / unbound / unbound-dhcp-leases-bridge
1 #!/usr/bin/python3
2 ###############################################################################
3 # #
4 # IPFire.org - A linux based firewall #
5 # Copyright (C) 2016 Michael Tremer #
6 # #
7 # This program is free software: you can redistribute it and/or modify #
8 # it under the terms of the GNU General Public License as published by #
9 # the Free Software Foundation, either version 3 of the License, or #
10 # (at your option) any later version. #
11 # #
12 # This program is distributed in the hope that it will be useful, #
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of #
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
15 # GNU General Public License for more details. #
16 # #
17 # You should have received a copy of the GNU General Public License #
18 # along with this program. If not, see <http://www.gnu.org/licenses/>. #
19 # #
20 ###############################################################################
21
22 import argparse
23 import datetime
24 import daemon
25 import filecmp
26 import functools
27 import ipaddress
28 import logging
29 import logging.handlers
30 import os
31 import queue
32 import re
33 import signal
34 import socket
35 import stat
36 import subprocess
37 import sys
38 import tempfile
39 import threading
40
41 LOCAL_TTL = 60
42
43 log = logging.getLogger("dhcp")
44 log.setLevel(logging.DEBUG)
45
46 def setup_logging(daemon=True, loglevel=logging.INFO):
47 log.setLevel(loglevel)
48
49 # Log to syslog by default
50 handler = logging.handlers.SysLogHandler(address="/dev/log", facility="daemon")
51 log.addHandler(handler)
52
53 # Format everything
54 formatter = logging.Formatter("%(name)s[%(process)d]: %(message)s")
55 handler.setFormatter(formatter)
56
57 handler.setLevel(loglevel)
58
59 # If we are running in foreground, we should write everything to the console, too
60 if not daemon:
61 handler = logging.StreamHandler()
62 log.addHandler(handler)
63
64 handler.setLevel(loglevel)
65
66 return log
67
68 class UnboundDHCPLeasesBridge(object):
69 def __init__(self, dhcp_leases_file, fix_leases_file, unbound_leases_file, hosts_file, socket_path):
70 self.leases_file = dhcp_leases_file
71 self.fix_leases_file = fix_leases_file
72 self.hosts_file = hosts_file
73 self.socket_path = socket_path
74
75 self.socket = None
76
77 # Store all known leases
78 self.leases = set()
79
80 # Create a queue for all received events
81 self.queue = queue.Queue()
82
83 # Initialize the worker
84 self.worker = Worker(self.queue, callback=self._handle_message)
85
86 # Initialize the watcher
87 self.watcher = Watcher(reload=self.reload)
88
89 self.unbound = UnboundConfigWriter(unbound_leases_file)
90
91 def run(self):
92 log.info("Unbound DHCP Leases Bridge started on %s" % self.leases_file)
93
94 # Launch the worker
95 self.worker.start()
96
97 # Launch the watcher
98 self.watcher.start()
99
100 # Open the server socket
101 self.socket = self._open_socket(self.socket_path)
102
103 while True:
104 # Accept any incoming connections
105 try:
106 conn, peer = self.socket.accept()
107 except OSError as e:
108 break
109
110 try:
111 # Receive what the client is sending
112 data, ancillary_data, flags, address = conn.recvmsg(4096)
113
114 # Log that we have received some data
115 log.debug("Received message of %s byte(s)" % len(data))
116
117 # Decode the data
118 message = self._decode_message(data)
119
120 # Add the message to the queue
121 self.queue.put(message)
122
123 conn.send(b"OK\n")
124
125 # Send ERROR to the client if something went wrong
126 except Exception as e:
127 log.error("Could not handle message: %s" % e)
128
129 conn.send(b"ERROR\n")
130 continue
131
132 # Close the connection
133 finally:
134 conn.close()
135
136 # Terminate the worker
137 self.queue.put(None)
138
139 # Terminate the watcher
140 self.watcher.terminate()
141
142 # Wait for the worker and watcher to finish
143 self.worker.join()
144 self.watcher.join()
145
146 log.info("Unbound DHCP Leases Bridge terminated")
147
148 def _open_socket(self, path):
149 # Allocate a new socket
150 s = socket.socket(family=socket.AF_UNIX, type=socket.SOCK_STREAM)
151
152 # Unlink any old sockets
153 try:
154 os.unlink(path)
155 except FileNotFoundError as e:
156 pass
157
158 # Bind the socket
159 try:
160 s.bind(self.socket_path)
161 except OSError as e:
162 log.error("Could not open socket at %s: %s" % (path, e))
163
164 raise SystemExit(1) from e
165
166 # Listen
167 s.listen(128)
168
169 return s
170
171 def _decode_message(self, data):
172 message = {}
173
174 for line in data.splitlines():
175 # Skip empty lines
176 if not line:
177 continue
178
179 # Try to decode the line
180 try:
181 line = line.decode()
182 except UnicodeError as e:
183 log.error("Could not decode %r: %s" % (line, e))
184
185 raise e
186
187 # Split the line
188 key, _, value = line.partition("=")
189
190 # Skip the line if it does not have a value
191 if not _:
192 raise ValueError("No value given")
193
194 # Store the attributes
195 message[key] = value
196
197 return message
198
199 def _handle_message(self, message):
200 log.debug("Handling message:")
201 for key in message:
202 log.debug(" %-20s = %s" % (key, message[key]))
203
204 # Extract the event type
205 event = message.get("EVENT")
206
207 # Check if event is set
208 if not event:
209 raise ValueError("The message does not have EVENT set")
210
211 # COMMIT
212 elif event == "commit":
213 address = message.get("ADDRESS")
214 name = message.get("NAME")
215
216 # Find the old lease
217 old_lease = self._find_lease(address)
218
219 # Don't update fixed leases as they might clear the hostname
220 if old_lease and old_lease.fixed:
221 log.debug("Won't update fixed lease %s" % old_lease)
222 return
223
224 # Create a new lease
225 lease = Lease(address, {
226 "client-hostname" : name,
227 })
228 self._add_lease(lease)
229
230 # Can we skip the update?
231 if old_lease:
232 if lease.rrset == old_lease.rrset:
233 log.debug("Won't update %s as nothing has changed" % lease)
234 return
235
236 # Remove the old lease first
237 self.unbound.remove_lease(old_lease)
238 self._remove_lease(old_lease)
239
240 # Apply the lease
241 self.unbound.apply_lease(lease)
242
243 # RELEASE/EXPIRY
244 elif event in ("release", "expiry"):
245 address = message.get("ADDRESS")
246
247 # Find the lease
248 lease = self._find_lease(address)
249
250 if not lease:
251 log.warning("Could not find lease for %s" % address)
252 return
253
254 # Remove the lease
255 self.unbound.remove_lease(lease)
256 self._remove_lease(lease)
257
258 # Raise an error if the event is not supported
259 else:
260 raise ValueError("Unsupported event: %s" % event)
261
262 def update_dhcp_leases(self):
263 # Drop all known leases
264 self.leases.clear()
265
266 # Add all dynamic leases
267 for lease in DHCPLeases(self.leases_file):
268 self._add_lease(lease)
269
270 # Add all static leases
271 for lease in FixLeases(self.fix_leases_file):
272 self._add_lease(lease)
273
274 # Dump leases
275 if self.leases:
276 log.debug("DHCP Leases:")
277 for lease in self.leases:
278 log.debug(" %s:" % lease.fqdn)
279 log.debug(" Start: %s" % lease.time_starts)
280 log.debug(" End : %s" % lease.time_ends)
281 if lease.has_expired():
282 log.debug(" Expired")
283
284 self.unbound.update_dhcp_leases([l for l in self.leases if not l.has_expired()])
285
286 def _add_lease(self, lease):
287 # Skip leases without a FQDN
288 if not lease.fqdn:
289 log.debug("Skipping lease without a FQDN: %s" % lease)
290 return
291
292 # Skip any leases that also are a static host
293 elif lease.fqdn in self.hosts:
294 log.debug("Skipping lease for which a static host exists: %s" % lease)
295 return
296
297 # Don't add expired leases
298 elif lease.has_expired():
299 log.debug("Skipping expired lease: %s" % lease)
300 return
301
302 # Remove any previous leases
303 self._remove_lease(lease)
304
305 # Store the lease
306 self.leases.add(lease)
307
308 def _find_lease(self, ipaddr):
309 """
310 Returns the lease with the specified IP address
311 """
312 if not isinstance(ipaddr, ipaddress.IPv4Address):
313 ipaddr = ipaddress.IPv4Address(ipaddr)
314
315 for lease in self.leases:
316 if lease.ipaddr == ipaddr:
317 return lease
318
319 def _remove_lease(self, lease):
320 try:
321 self.leases.remove(lease)
322 except KeyError:
323 pass
324
325 def read_static_hosts(self):
326 log.info("Reading static hosts from %s" % self.hosts_file)
327
328 hosts = {}
329 with open(self.hosts_file) as f:
330 for line in f.readlines():
331 line = line.rstrip()
332
333 try:
334 enabled, ipaddr, hostname, domainname, generateptr = line.split(",")
335 except:
336 log.warning("Could not parse line: %s" % line)
337 continue
338
339 # Skip any disabled entries
340 if not enabled == "on":
341 continue
342
343 if hostname and domainname:
344 fqdn = "%s.%s" % (hostname, domainname)
345 elif hostname:
346 fqdn = hostname
347 elif domainname:
348 fqdn = domainname
349
350 try:
351 hosts[fqdn].append(ipaddr)
352 hosts[fqdn].sort()
353 except KeyError:
354 hosts[fqdn] = [ipaddr,]
355
356 # Dump everything in the logs
357 log.debug("Static hosts:")
358 for name in hosts:
359 log.debug(" %-20s : %s" % (name, ", ".join(hosts[name])))
360
361 return hosts
362
363 def reload(self, *args, **kwargs):
364 # Read all static hosts
365 self.hosts = self.read_static_hosts()
366
367 # Unconditionally update all leases and reload Unbound
368 self.update_dhcp_leases()
369
370 def terminate(self, *args, **kwargs):
371 # Close the socket
372 if self.socket:
373 self.socket.close()
374
375
376 class Watcher(threading.Thread):
377 """
378 Watches if Unbound is still running.
379 """
380 def __init__(self, reload, *args, **kwargs):
381 super().__init__(*args, **kwargs)
382
383 self.reload = reload
384
385 # Set to true if this thread should be terminated
386 self._terminated = threading.Event()
387
388 def run(self):
389 log.debug("Watcher launched")
390
391 pidfd = None
392
393 while True:
394 # One iteration takes 30 seconds unless we don't know the process
395 # when we try to find it once a second.
396 if self._terminated.wait(30 if pidfd else 1):
397 break
398
399 # Fetch a PIDFD for Unbound
400 if pidfd is None:
401 pidfd = self._get_pidfd()
402
403 # If we could not acquire a PIDFD, we will try again soon...
404 if not pidfd:
405 log.warning("Cannot find Unbound...")
406 continue
407
408 # Since Unbound has been restarted, we need to reload it all...
409 self.reload()
410
411 log.debug("Checking if Unbound is still alive...")
412
413 # Send the process a signal
414 try:
415 signal.pidfd_send_signal(pidfd, signal.SIG_DFL)
416
417 # If the process has died, we land here and will have to wait until Unbound
418 # has come back and reload it...
419 except ProcessLookupError as e:
420 log.error("Unbound has died")
421
422 # Reset the PIDFD
423 pidfd = None
424
425 else:
426 log.debug("Unbound is alive")
427
428 log.debug("Watcher terminated")
429
430 def terminate(self):
431 """
432 Called to signal this thread to terminate
433 """
434 self._terminated.set()
435
436 def _get_pidfd(self):
437 """
438 Returns a PIDFD for unbound if it is running, otherwise None.
439 """
440 # Try to find the PID
441 pid = pidof("unbound")
442
443 if pid:
444 log.debug("Unbound is running as PID %s" % pid)
445
446 # Open a PIDFD
447 pidfd = os.pidfd_open(pid)
448
449 log.debug("Acquired PIDFD %s for PID %s" % (pidfd, pid))
450
451 return pidfd
452
453
454 class Worker(threading.Thread):
455 """
456 The worker is launched in a separate thread
457 which allows us to perform some tasks asynchronously.
458 """
459 def __init__(self, queue, callback):
460 super().__init__()
461
462 self.queue = queue
463 self.callback = callback
464
465 def run(self):
466 log.debug("Worker %s launched" % self.native_id)
467
468 while True:
469 message = self.queue.get()
470
471 # If the message is None, we have to quit
472 if message is None:
473 break
474
475 # Call the callback
476 try:
477 self.callback(message)
478 except Exception as e:
479 log.error("Callback failed: %s" % e, exc_info=True)
480
481 log.debug("Worker %s terminated" % self.native_id)
482
483
484 class DHCPLeases(object):
485 regex_leaseblock = re.compile(r"lease (?P<ipaddr>\d+\.\d+\.\d+\.\d+) {(?P<config>[\s\S]+?)\n}")
486
487 def __init__(self, path):
488 self.path = path
489
490 self._leases = self._parse()
491
492 def __iter__(self):
493 return iter(self._leases)
494
495 def _parse(self):
496 log.info("Reading DHCP leases from %s" % self.path)
497
498 leases = []
499
500 with open(self.path) as f:
501 # Read entire leases file
502 data = f.read()
503
504 for match in self.regex_leaseblock.finditer(data):
505 block = match.groupdict()
506
507 ipaddr = block.get("ipaddr")
508 config = block.get("config")
509
510 properties = self._parse_block(config)
511
512 # Skip any abandoned leases
513 if not "hardware" in properties:
514 continue
515
516 # Skip inactive leases
517 elif not properties.get("binding", "state active"):
518 continue
519
520 lease = Lease(ipaddr, properties)
521 leases.append(lease)
522
523 return leases
524
525 def _parse_block(self, block):
526 properties = {}
527
528 for line in block.splitlines():
529 if not line:
530 continue
531
532 # Remove trailing ; from line
533 if line.endswith(";"):
534 line = line[:-1]
535
536 # Invalid line if it doesn't end with ;
537 else:
538 continue
539
540 # Remove any leading whitespace
541 line = line.lstrip()
542
543 # We skip all options and sets
544 if line.startswith("option") or line.startswith("set"):
545 continue
546
547 # Split by first space
548 key, val = line.split(" ", 1)
549 properties[key] = val
550
551 return properties
552
553
554 class FixLeases(object):
555 def __init__(self, path):
556 self.path = path
557
558 self._leases = self._parse()
559
560 def __iter__(self):
561 return iter(self._leases)
562
563 def _parse(self):
564 log.info("Reading fix leases from %s" % self.path)
565
566 now = datetime.datetime.utcnow()
567
568 leases = []
569
570 with open(self.path) as f:
571 for line in f.readlines():
572 line = line.rstrip()
573
574 try:
575 hwaddr, ipaddr, enabled, a, b, c, hostname = line.split(",")
576 except ValueError:
577 log.warning("Could not parse line: %s" % line)
578 continue
579
580 # Skip any disabled leases
581 if not enabled == "on":
582 continue
583
584 l = Lease(ipaddr, {
585 "binding" : "state active",
586 "client-hostname" : hostname,
587 "starts" : now.strftime("%w %Y/%m/%d %H:%M:%S"),
588 "ends" : "never",
589 }, fixed=True)
590 leases.append(l)
591
592 return leases
593
594
595 class Lease(object):
596 def __init__(self, ipaddr, properties, fixed=False):
597 if not isinstance(ipaddr, ipaddress.IPv4Address):
598 ipaddr = ipaddress.IPv4Address(ipaddr)
599
600 self.ipaddr = ipaddr
601 self._properties = properties
602 self.fixed = fixed
603
604 def __repr__(self):
605 return "<%s for %s (%s)>" % (self.__class__.__name__, self.ipaddr, self.hostname)
606
607 def __eq__(self, other):
608 if isinstance(other, self.__class__):
609 return self.ipaddr == other.ipaddr
610
611 return NotImplemented
612
613 def __gt__(self, other):
614 if isinstance(other, self.__class__):
615 if not self.ipaddr == other.ipaddr:
616 return NotImplemented
617
618 return self.time_starts > other.time_starts
619
620 return NotImplemented
621
622 def __hash__(self):
623 return hash(self.ipaddr)
624
625 @property
626 def hostname(self):
627 hostname = self._properties.get("client-hostname")
628
629 if hostname is None:
630 return
631
632 # Remove any ""
633 hostname = hostname.replace("\"", "")
634
635 # Only return valid hostnames
636 m = re.match(r"^[A-Z0-9\-]{1,63}$", hostname, re.I)
637 if m:
638 return hostname
639
640 @property
641 def domain(self):
642 # Load ethernet settings
643 ethernet_settings = self.read_settings("/var/ipfire/ethernet/settings")
644
645 # Load DHCP settings
646 dhcp_settings = self.read_settings("/var/ipfire/dhcp/settings")
647
648 subnets = {}
649 for zone in ("GREEN", "BLUE"):
650 if not dhcp_settings.get("ENABLE_%s" % zone) == "on":
651 continue
652
653 netaddr = ethernet_settings.get("%s_NETADDRESS" % zone)
654 submask = ethernet_settings.get("%s_NETMASK" % zone)
655
656 subnet = ipaddress.ip_network("%s/%s" % (netaddr, submask))
657 domain = dhcp_settings.get("DOMAIN_NAME_%s" % zone)
658
659 subnets[subnet] = domain
660
661 address = ipaddress.ip_address(self.ipaddr)
662
663 for subnet in subnets:
664 if address in subnet:
665 return subnets[subnet]
666
667 # Load main settings
668 settings = self.read_settings("/var/ipfire/main/settings")
669
670 # Fall back to the host domain if no match could be found
671 return settings.get("DOMAINNAME", "localdomain")
672
673 @staticmethod
674 @functools.cache
675 def read_settings(filename):
676 settings = {}
677
678 with open(filename) as f:
679 for line in f.readlines():
680 # Remove line-breaks
681 line = line.rstrip()
682
683 k, v = line.split("=", 1)
684 settings[k] = v
685
686 return settings
687
688 @property
689 def fqdn(self):
690 if self.hostname:
691 return "%s.%s" % (self.hostname, self.domain)
692
693 @staticmethod
694 def _parse_time(s):
695 return datetime.datetime.strptime(s, "%w %Y/%m/%d %H:%M:%S")
696
697 @property
698 def time_starts(self):
699 starts = self._properties.get("starts")
700
701 if starts:
702 return self._parse_time(starts)
703
704 @property
705 def time_ends(self):
706 ends = self._properties.get("ends")
707
708 if not ends or ends == "never":
709 return
710
711 return self._parse_time(ends)
712
713 def has_expired(self):
714 if not self.time_starts:
715 return
716
717 if not self.time_ends:
718 return self.time_starts > datetime.datetime.utcnow()
719
720 return not self.time_starts < datetime.datetime.utcnow() < self.time_ends
721
722 @property
723 def rrset(self):
724 # If the lease does not have a valid FQDN, we cannot create any RRs
725 if self.fqdn is None:
726 return []
727
728 return [
729 # Forward record
730 (self.fqdn, "%s" % LOCAL_TTL, "IN A", "%s" % self.ipaddr),
731
732 # Reverse record
733 (self.ipaddr.reverse_pointer, "%s" % LOCAL_TTL,
734 "IN PTR", self.fqdn),
735 ]
736
737
738 class UnboundConfigWriter(object):
739 def __init__(self, path):
740 self.path = path
741
742 def update_dhcp_leases(self, leases):
743 # Write out all leases
744 if self.write_dhcp_leases(leases):
745 log.debug("Reloading Unbound...")
746
747 # Reload the configuration without dropping the cache
748 self._control("reload_keep_cache")
749
750 def write_dhcp_leases(self, leases):
751 log.debug("Writing DHCP leases...")
752
753 with tempfile.NamedTemporaryFile(mode="w") as f:
754 for l in sorted(leases, key=lambda x: x.ipaddr):
755 for rr in l.rrset:
756 f.write("local-data: \"%s\"\n" % " ".join(rr))
757
758 # Flush the file
759 f.flush()
760
761 # Compare if the new leases file has changed from the previous version
762 try:
763 if filecmp.cmp(f.name, self.path, shallow=False):
764 log.debug("The generated leases file has not changed")
765
766 return False
767
768 # Remove the old file
769 os.unlink(self.path)
770
771 # If the previous file did not exist, just keep falling through
772 except FileNotFoundError:
773 pass
774
775 # Make file readable for everyone
776 os.fchmod(f.fileno(), stat.S_IRUSR|stat.S_IWUSR|stat.S_IRGRP|stat.S_IROTH)
777
778 # Move the file to its destination
779 os.link(f.name, self.path)
780
781 return True
782
783 def _control(self, *args):
784 command = ["unbound-control"]
785 command.extend(args)
786
787 # Log what we are doing
788 log.debug("Running %s" % " ".join(command))
789
790 try:
791 subprocess.check_output(command)
792
793 # Log any errors
794 except subprocess.CalledProcessError as e:
795 log.critical("Could not run %s, error code: %s: %s" % (
796 " ".join(command), e.returncode, e.output))
797
798 raise e
799
800 def apply_lease(self, lease):
801 """
802 This method takes a lease and updates Unbound at runtime.
803 """
804 log.debug("Applying lease %s" % lease)
805
806 for rr in lease.rrset:
807 log.debug("Adding new record %s" % " ".join(rr))
808
809 self._control("local_data", *rr)
810
811 def remove_lease(self, lease):
812 """
813 This method takes a lease and removes it from Unbound at runtime.
814 """
815 log.debug("Removing lease %s" % lease)
816
817 for name, ttl, type, content in lease.rrset:
818 log.debug("Removing records for %s" % name)
819
820 self._control("local_data_remove", name)
821
822
823 def pidof(program):
824 """
825 Returns the first PID of the given program.
826 """
827 try:
828 output = subprocess.check_output(["pidof", program])
829 except subprocess.CalledProcessError as e:
830 return
831
832 # Convert to string
833 output = output.decode()
834
835 # Return the first PID
836 for pid in output.split():
837 try:
838 pid = int(pid)
839 except ValueError:
840 continue
841
842 return pid
843
844
845 if __name__ == "__main__":
846 parser = argparse.ArgumentParser(description="Bridge for DHCP Leases and Unbound DNS")
847
848 # Daemon Stuff
849 parser.add_argument("--daemon", "-d", action="store_true",
850 help="Launch as daemon in background")
851 parser.add_argument("--verbose", "-v", action="count", help="Be more verbose")
852
853 # Paths
854 parser.add_argument("--dhcp-leases", default="/var/state/dhcp/dhcpd.leases",
855 metavar="PATH", help="Path to the DHCPd leases file")
856 parser.add_argument("--unbound-leases", default="/etc/unbound/dhcp-leases.conf",
857 metavar="PATH", help="Path to the unbound configuration file")
858 parser.add_argument("--fix-leases", default="/var/ipfire/dhcp/fixleases",
859 metavar="PATH", help="Path to the fix leases file")
860 parser.add_argument("--hosts", default="/var/ipfire/main/hosts",
861 metavar="PATH", help="Path to static hosts file")
862 parser.add_argument("--socket-path", default="/var/run/unbound-dhcp-leases-bridge.sock",
863 metavar="PATH", help="Socket Path",
864 )
865
866 # Parse command line arguments
867 args = parser.parse_args()
868
869 # Setup logging
870 loglevel = logging.WARN
871
872 if args.verbose:
873 if args.verbose == 1:
874 loglevel = logging.INFO
875 elif args.verbose >= 2:
876 loglevel = logging.DEBUG
877
878 bridge = UnboundDHCPLeasesBridge(args.dhcp_leases, args.fix_leases,
879 args.unbound_leases, args.hosts, socket_path=args.socket_path)
880
881 with daemon.DaemonContext(
882 detach_process=args.daemon,
883 stderr=None if args.daemon else sys.stderr,
884 signal_map = {
885 signal.SIGHUP : bridge.reload,
886 signal.SIGINT : bridge.terminate,
887 signal.SIGTERM : bridge.terminate,
888 },
889 ) as daemon:
890 setup_logging(daemon=args.daemon, loglevel=loglevel)
891
892 bridge.run()