git.ipfire.org Git - collecty.git/blobdiff - src/collecty/daemon.py
daemon: Don't wait until the worker thread has finished after sending shutdown signal
[collecty.git] / src / collecty / daemon.py
index 2fa000f0e45ef03aca0400cda3f9207f0001f6ff..0891f8a9f30a8405b6fb353ef3207f7a1cffc963 100644 (file)
@@ -1,4 +1,4 @@
-#!/usr/bin/python
+#!/usr/bin/python3
 ###############################################################################
 #                                                                             #
 # collecty - A system statistics collection daemon for IPFire                 #
 #                                                                             #
 ###############################################################################
 
-import Queue as queue
+import datetime
+import multiprocessing
+import queue
 import rrdtool
 import signal
 import threading
 import time
 
-import bus
-import plugins
+from . import bus
+from . import plugins
 
-from constants import *
-from i18n import _
+from .constants import *
+from .i18n import _
 
 import logging
 log = logging.getLogger("collecty")
@@ -38,7 +40,7 @@ class Collecty(object):
        # The default interval, when all data is written to disk.
        SUBMIT_INTERVAL = 300
 
-       HEARTBEAT = 5
+       HEARTBEAT = 1
 
        def __init__(self, debug=False):
                self.debug = debug
@@ -56,6 +58,12 @@ class Collecty(object):
                # will be written to disk later.
                self.write_queue = WriteQueue(self, self.SUBMIT_INTERVAL)
 
+               # Create worker threads
+               self.worker_threads = self.create_worker_threads()
+
+               self._timer_queue = queue.PriorityQueue()
+               self._worker_queue = queue.Queue()
+
                # Create a thread that connects to dbus and processes requests we
                # get from there.
                self.bus = bus.Bus(self)
@@ -64,7 +72,7 @@ class Collecty(object):
                for plugin in plugins.get():
                        self.add_plugin(plugin)
 
-               log.info(_("Collecty successfully initialized with %s plugins") \
+               log.debug(_("Collecty successfully initialized with %s plugins") \
                        % len(self.plugins))
 
        def add_plugin(self, plugin_class):
@@ -88,12 +96,20 @@ class Collecty(object):
                # Register signal handlers.
                self.register_signal_handler()
 
+               # Cannot do anything if no plugins have been initialised
+               if not self.plugins:
+                       log.critical(_("No plugins have been initialised"))
+                       return
+
                # Start the bus
                self.bus.start()
 
-               # Start all data source threads.
-               for p in self.plugins:
-                       p.start()
+               # Initialise the timer queue
+               self.initialise_timer_queue()
+
+               # Start worker threads
+               for w in self.worker_threads:
+                       w.start()
 
                # Run the write queue thread
                self.write_queue.start()
@@ -101,14 +117,26 @@ class Collecty(object):
                # Regularly submit all data to disk.
                while self.running:
                        try:
+                               # Try processing one event from the queue. If that succeeded
+                               # we will retry immediately.
+                               if self.process_timer_queue():
+                                       continue
+
+                               # Otherwise we will sleep for a bit
                                time.sleep(self.HEARTBEAT)
+
+                               # Log warnings if the worker queue is filling up
+                               queue_size = self._worker_queue.qsize()
+                               if queue_size >= 5:
+                                       log.warning(_("Worker queue is filling up with %s events") % queue_size)
+
                        except KeyboardInterrupt:
                                self.shutdown()
                                break
 
-               # Wait until all plugins are finished.
-               for p in self.plugins:
-                       p.join()
+               # Wait until all worker threads are finished
+               for w in self.worker_threads:
+                       w.join()
 
                # Stop the bus thread
                self.bus.shutdown()
@@ -122,12 +150,12 @@ class Collecty(object):
                if not self.running:
                        return
 
-               log.debug(_("Received shutdown signal"))
+               log.info(_("Received shutdown signal"))
                self.running = False
 
                # Propagating shutdown to all threads.
-               for p in self.plugins:
-                       p.shutdown()
+               for w in self.worker_threads:
+                       w.shutdown()
 
        def register_signal_handler(self):
                for s in (signal.SIGTERM, signal.SIGINT, signal.SIGUSR1):
@@ -160,6 +188,88 @@ class Collecty(object):
 
                return plugin.generate_graph(template_name, *args, **kwargs)
 
+       def create_worker_threads(self, num=None):
+               """
+                       Creates a number of worker threads
+               """
+               # If no number of threads is given, we will create as many as we have
+               # active processor cores but never less than two.
+               if num is None:
+                       num = max(multiprocessing.cpu_count(), 2)
+
+               worker_threads = []
+
+               for id in range(num):
+                       worker_thread = WorkerThread(self, id)
+                       worker_threads.append(worker_thread)
+
+               return worker_threads
+
+       def initialise_timer_queue(self):
+               for p in self.plugins:
+                       timer = PluginTimer(p)
+
+                       self._timer_queue.put(timer)
+
+       def process_timer_queue(self):
+               # Take the item from the timer queue that is to be due first
+               timer = self._timer_queue.get()
+
+               try:
+                       # If the timer event is to be executed, we will put the plugin
+                       # into the worker queue and reset the timer
+                       if timer.is_due():
+                               self._worker_queue.put(timer.plugin)
+                               timer.reset_deadline()
+
+                               return timer
+               finally:
+                       # Put the timer back into the timer queue.
+                       self._timer_queue.put(timer)
+
+
+class WorkerThread(threading.Thread):
+       HEARTBEAT = 2.5
+
+       def __init__(self, collecty, id):
+               threading.Thread.__init__(self)
+               self.daemon = True
+
+               self.log = logging.getLogger("collecty.worker")
+               self.log.propagate = 1
+
+               self.collecty = collecty
+               self.id = id
+
+               self.log.debug(_("Worker thread %s has been initialised") % self.id)
+
+       @property
+       def queue(self):
+               """
+                       The queue this thread is getting events from
+               """
+               return self.collecty._worker_queue
+
+       def run(self):
+               self.log.debug(_("Worker thread %s has been started") % self.id)
+               self.running = True
+
+               while self.running:
+                       try:
+                               plugin = self.queue.get(block=True, timeout=self.HEARTBEAT)
+
+                       # If the queue has been empty we just retry
+                       except queue.Empty:
+                               continue
+
+                       # Execute the collect operation for this plugin
+                       plugin.collect()
+
+               self.log.debug(_("Worker thread %s has been terminated") % self.id)
+
+       def shutdown(self):
+               self.running = False
+
 
 class WriteQueue(threading.Thread):
        def __init__(self, collecty, submit_interval):
@@ -169,7 +279,6 @@ class WriteQueue(threading.Thread):
                self.collecty = collecty
 
                self.log = logging.getLogger("collecty.queue")
-               self.log.setLevel(logging.DEBUG)
                self.log.propagate = 1
 
                self.timer = plugins.Timer(submit_interval)
@@ -228,21 +337,27 @@ class WriteQueue(threading.Thread):
                                results[result.file] = [result]
 
                # Write the collected data to disk
-               for filename, results in results.items():
+               for filename, results in list(results.items()):
                        self._commit_file(filename, results)
 
                duration = time.time() - time_start
                self.log.debug(_("Emptied write queue in %.2fs") % duration)
 
        def _commit_file(self, filename, results):
-               self.log.debug(_("Committing %(counter)s entries to %(filename)s:") \
+               self.log.debug(_("Committing %(counter)s entries to %(filename)s") \
                        % { "counter" : len(results), "filename" : filename })
 
-               if self.collecty.debug:
-                       for result in results:
-                               self.log.debug("  %s: %s" % (result.time, result.data))
+               for result in results:
+                       self.log.debug("  %s: %s" % (result.time, result.data))
+
+               try:
+                       rrdtool.update(filename, *["%s" % r for r in results])
 
-               rrdtool.update(filename, *["%s" % r for r in results])
+               # Catch operational errors like unreadable/unwritable RRD databases
+               # or those where the format has changed. The collected data will be lost.
+               except rrdtool.OperationalError as e:
+                       self.log.critical(_("Could not update RRD database %s: %s") \
+                               % (filename, e))
 
 
 class QueueObject(object):
@@ -254,5 +369,25 @@ class QueueObject(object):
        def __str__(self):
                return "%s:%s" % (self.time.strftime("%s"), self.data)
 
-       def __cmp__(self, other):
-               return cmp(self.time, other.time)
+       def __lt__(self, other):
+               return self.time < other.time
+
+
+class PluginTimer(object):
+       def __init__(self, plugin):
+               self.plugin = plugin
+
+               self.deadline = datetime.datetime.utcnow()
+
+       def __repr__(self):
+               return "<%s %s>" % (self.__class__.__name__, self.deadline)
+
+       def __lt__(self, other):
+               return self.deadline < other.deadline
+
+       def reset_deadline(self):
+               self.deadline = datetime.datetime.utcnow() \
+                       + datetime.timedelta(seconds=self.plugin.interval)
+
+       def is_due(self):
+               return datetime.datetime.utcnow() >= self.deadline