daemon: Don't wait until the worker thread has finished after sending shutdown signal
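The behavioural change: shutdown() no longer blocks until the worker threads have exited. It merely clears the running flags, and the main loop in run() join()s the workers on its way out. A minimal sketch of that pattern, heavily condensed from the code in the diff below (the sleep stands in for the worker's queue handling):

import threading
import time

class WorkerThread(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.running = True

    def run(self):
        while self.running:
            time.sleep(0.1)  # stand-in for queue.get() + plugin.collect()

    def shutdown(self):
        # Only flag the thread to stop -- no join() here
        self.running = False

workers = [WorkerThread() for _ in range(2)]
for w in workers:
    w.start()

for w in workers:
    w.shutdown()  # returns immediately
for w in workers:
    w.join()      # the waiting happens here, in the caller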
diff --git a/src/collecty/daemon.py b/src/collecty/daemon.py
index 8f310213b15d90946a7b15ac4b0cfdf5ec781b6f..0891f8a9f30a8405b6fb353ef3207f7a1cffc963 100644
--- a/src/collecty/daemon.py
+++ b/src/collecty/daemon.py
@@ -1,4 +1,4 @@
-#!/usr/bin/python
+#!/usr/bin/python3
 ###############################################################################
 #                                                                             #
 # collecty - A system statistics collection daemon for IPFire                 #
 #                                                                             #
 ###############################################################################
 
+import datetime
+import multiprocessing
+import queue
+import rrdtool
 import signal
+import threading
+import time
 
-import plugins
+from . import bus
+from . import plugins
 
-from constants import *
-from i18n import _
+from .constants import *
+from .i18n import _
 
 import logging
 log = logging.getLogger("collecty")
@@ -33,85 +40,122 @@ class Collecty(object):
        # The default interval, when all data is written to disk.
        SUBMIT_INTERVAL = 300
 
+       HEARTBEAT = 1
+
        def __init__(self, debug=False):
+               self.debug = debug
+
                # Enable debug logging when running in debug mode
-               if debug:
+               if self.debug:
                        log.setLevel(logging.DEBUG)
 
-               self.data_sources = []
+               self.plugins = []
 
                # Indicates whether this process should be running or not.
                self.running = True
-               self.timer = plugins.Timer(self.SUBMIT_INTERVAL, heartbeat=2)
 
-               # Add all automatic data sources.
-               self.add_autocreate_data_sources()
+               # The write queue holds all collected pieces of data which
+               # will be written to disk later.
+               self.write_queue = WriteQueue(self, self.SUBMIT_INTERVAL)
 
-               log.info(_("Collecty successfully initialized."))
+               # Create worker threads
+               self.worker_threads = self.create_worker_threads()
 
-       def add_autocreate_data_sources(self):
-               for data_source in plugins.data_sources:
-                       if not hasattr(data_source, "autocreate"):
-                               continue
+               self._timer_queue = queue.PriorityQueue()
+               self._worker_queue = queue.Queue()
 
-                       ret = data_source.autocreate(self)
-                       if not ret:
-                               continue
+               # Create a thread that connects to dbus and processes requests we
+               # get from there.
+               self.bus = bus.Bus(self)
+
+               # Add all plugins
+               for plugin in plugins.get():
+                       self.add_plugin(plugin)
+
+               log.debug(_("Collecty successfully initialized with %s plugins") \
+                       % len(self.plugins))
 
-                       if not type(ret) == type([]):
-                               ret = [ret,]
+       def add_plugin(self, plugin_class):
+               # Try initialising a new plugin. If that fails, we will log the
+               # error and try to go on.
+               try:
+                       plugin = plugin_class(self)
+               except Exception:
+                       log.critical(_("Plugin %s could not be initialised") % plugin_class, exc_info=True)
+                       return
 
-                       log.debug(_("Data source '%(name)s' registered %(number)s instance(s).") % \
-                               { "name" : data_source.name, "number" : len(ret) })
+               self.plugins.append(plugin)
 
-                       self.data_sources += ret
+       @property
+       def templates(self):
+               for plugin in self.plugins:
+                       for template in plugin.templates:
+                               yield template
 
        def run(self):
                # Register signal handlers.
                self.register_signal_handler()
 
-               # Start all data source threads.
-               for ds in self.data_sources:
-                       ds.start()
+               # Cannot do anything if no plugins have been initialised
+               if not self.plugins:
+                       log.critical(_("No plugins have been initialised"))
+                       return
+
+               # Start the bus
+               self.bus.start()
+
+               # Initialise the timer queue
+               self.initialise_timer_queue()
+
+               # Start worker threads
+               for w in self.worker_threads:
+                       w.start()
+
+               # Run the write queue thread
+               self.write_queue.start()
 
                # Regularly submit all data to disk.
                while self.running:
-                       if self.timer.wait():
-                               self.submit_all()
+                       try:
+                               # Try processing one event from the queue. If that succeeded
+                               # we will retry immediately.
+                               if self.process_timer_queue():
+                                       continue
 
-               # Wait until all instances are finished.
-               while self.data_sources:
-                       for ds in self.data_sources[:]:
-                               if not ds.isAlive():
-                                       log.debug(_("%s is not alive anymore. Removing.") % ds)
-                                       self.data_sources.remove(ds)
+                               # Otherwise we will sleep for a bit
+                               time.sleep(self.HEARTBEAT)
 
-                       # Wait a bit.
-                       time.sleep(0.1)
+                               # Log warnings if the worker queue is filling up
+                               queue_size = self._worker_queue.qsize()
+                               if queue_size >= 5:
+                                       log.warning(_("Worker queue is filling up with %s events") % queue_size)
 
-               log.debug(_("No thread running. Exiting main thread."))
+                       except KeyboardInterrupt:
+                               self.shutdown()
+                               break
 
-       def submit_all(self):
-               """
-                       Submit all data right now.
-               """
-               log.debug(_("Submitting all data in memory"))
-               for ds in self.data_sources:
-                       ds._submit()
+               # Wait until all worker threads are finished
+               for w in self.worker_threads:
+                       w.join()
+
+               # Stop the bus thread
+               self.bus.shutdown()
 
-               # Schedule the next submit.
-               self.timer.reset()
+               # Write all collected data to disk before ending the main thread
+               self.write_queue.shutdown()
+
+               log.debug(_("Main thread exited"))
 
        def shutdown(self):
-               log.debug(_("Received shutdown signal"))
+               if not self.running:
+                       return
 
+               log.info(_("Received shutdown signal"))
                self.running = False
-               if self.timer:
-                       self.timer.cancel()
 
                # Propagating shutdown to all threads.
-               for ds in self.data_sources:
-                       ds.shutdown()
+               for w in self.worker_threads:
+                       w.shutdown()
 
        def register_signal_handler(self):
                for s in (signal.SIGTERM, signal.SIGINT, signal.SIGUSR1):
@@ -127,9 +171,223 @@ class Collecty(object):
                        self.shutdown()
 
                elif sig == signal.SIGUSR1:
-                       # Submit all data.
-                       self.submit_all()
+                       # Commit all data.
+                       self.write_queue.commit()
+
+       def get_plugin_from_template(self, template_name):
+               for plugin in self.plugins:
+                       if template_name not in [t.name for t in plugin.templates]:
+                               continue
+
+                       return plugin
+
+       def generate_graph(self, template_name, *args, **kwargs):
+               plugin = self.get_plugin_from_template(template_name)
+               if not plugin:
+                       raise RuntimeError("Could not find template %s" % template_name)
+
+               return plugin.generate_graph(template_name, *args, **kwargs)
+
+       def create_worker_threads(self, num=None):
+               """
+                       Creates a number of worker threads
+               """
+               # If no number of threads is given, we will create as many as we have
+               # active processor cores but never less than two.
+               if num is None:
+                       num = max(multiprocessing.cpu_count(), 2)
+
+               worker_threads = []
+
+               for id in range(num):
+                       worker_thread = WorkerThread(self, id)
+                       worker_threads.append(worker_thread)
+
+               return worker_threads
+
+       def initialise_timer_queue(self):
+               for p in self.plugins:
+                       timer = PluginTimer(p)
+
+                       self._timer_queue.put(timer)
+
+       def process_timer_queue(self):
+               # Take the item from the timer queue that is due first
+               timer = self._timer_queue.get()
+
+               try:
+                       # If the timer event is to be executed, we will put the plugin
+                       # into the worker queue and reset the timer
+                       if timer.is_due():
+                               self._worker_queue.put(timer.plugin)
+                               timer.reset_deadline()
+
+                               return timer
+               finally:
+                       # Put the timer back into the timer queue.
+                       self._timer_queue.put(timer)
+
+
+class WorkerThread(threading.Thread):
+       HEARTBEAT = 2.5
+
+       def __init__(self, collecty, id):
+               threading.Thread.__init__(self)
+               self.daemon = True
+
+               self.log = logging.getLogger("collecty.worker")
+               self.log.propagate = 1
+
+               self.collecty = collecty
+               self.id = id
+
+               self.log.debug(_("Worker thread %s has been initialised") % self.id)
 
        @property
-       def graph_default_arguments(self):
-               return GRAPH_DEFAULT_ARGUMENTS
+       def queue(self):
+               """
+                       The queue this thread is getting events from
+               """
+               return self.collecty._worker_queue
+
+       def run(self):
+               self.log.debug(_("Worker thread %s has been started") % self.id)
+               self.running = True
+
+               while self.running:
+                       try:
+                               plugin = self.queue.get(block=True, timeout=self.HEARTBEAT)
+
+                       # If the queue was empty, we just retry
+                       except queue.Empty:
+                               continue
+
+                       # Execute the collect operation for this plugin
+                       plugin.collect()
+
+               self.log.debug(_("Worker thread %s has been terminated") % self.id)
+
+       def shutdown(self):
+               self.running = False
+
+
+class WriteQueue(threading.Thread):
+       def __init__(self, collecty, submit_interval):
+               threading.Thread.__init__(self)
+               self.daemon = True
+
+               self.collecty = collecty
+
+               self.log = logging.getLogger("collecty.queue")
+               self.log.propagate = 1
+
+               self.timer = plugins.Timer(submit_interval)
+               self._queue = queue.PriorityQueue()
+
+               self.log.debug(_("Initialised write queue"))
+
+       def run(self):
+               self.log.debug(_("Write queue process started"))
+               self.running = True
+
+               while self.running:
+                       # Reset the timer.
+                       self.timer.reset()
+
+                       # Wait until the timer has successfully elapsed.
+                       if self.timer.wait():
+                               self.commit()
+
+               self.commit()
+               self.log.debug(_("Write queue process stopped"))
+
+       def shutdown(self):
+               self.running = False
+               self.timer.cancel()
+
+               # Wait until all data has been written.
+               self.join()
+
+       def add(self, object, time, data):
+               result = QueueObject(object.file, time, data)
+               self._queue.put(result)
+
+       def commit(self):
+               """
+                       Flushes all queued data to disk.
+               """
+               # There is nothing to do if the queue is empty
+               if self._queue.empty():
+                       self.log.debug(_("No data to commit"))
+                       return
+
+               time_start = time.time()
+
+               self.log.debug(_("Submitting data to the databases..."))
+
+               # Get all objects from the queue and group them by the RRD file
+               # to commit them all at once
+               results = {}
+               while not self._queue.empty():
+                       result = self._queue.get()
+
+                       try:
+                               results[result.file].append(result)
+                       except KeyError:
+                               results[result.file] = [result]
+
+               # Write the collected data to disk
+               for filename, results in list(results.items()):
+                       self._commit_file(filename, results)
+
+               duration = time.time() - time_start
+               self.log.debug(_("Emptied write queue in %.2fs") % duration)
+
+       def _commit_file(self, filename, results):
+               self.log.debug(_("Committing %(counter)s entries to %(filename)s") \
+                       % { "counter" : len(results), "filename" : filename })
+
+               for result in results:
+                       self.log.debug("  %s: %s" % (result.time, result.data))
+
+               try:
+                       rrdtool.update(filename, *["%s" % r for r in results])
+
+               # Catch operational errors like unreadable/unwritable RRD databases
+               # or those where the format has changed. The collected data will be lost.
+               except rrdtool.OperationalError as e:
+                       self.log.critical(_("Could not update RRD database %s: %s") \
+                               % (filename, e))
+
+
+class QueueObject(object):
+       def __init__(self, file, time, data):
+               self.file = file
+               self.time = time
+               self.data = data
+
+       def __str__(self):
+               return "%s:%s" % (self.time.strftime("%s"), self.data)
+
+       def __lt__(self, other):
+               return self.time < other.time
+
+
+class PluginTimer(object):
+       def __init__(self, plugin):
+               self.plugin = plugin
+
+               self.deadline = datetime.datetime.utcnow()
+
+       def __repr__(self):
+               return "<%s %s>" % (self.__class__.__name__, self.deadline)
+
+       def __lt__(self, other):
+               return self.deadline < other.deadline
+
+       def reset_deadline(self):
+               self.deadline = datetime.datetime.utcnow() \
+                       + datetime.timedelta(seconds=self.plugin.interval)
+
+       def is_due(self):
+               return datetime.datetime.utcnow() >= self.deadline
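
For illustration, the scheduling core above can be exercised on its own: because PluginTimer implements __lt__ on its deadline, a queue.PriorityQueue always yields the timer that is due next, and process_timer_queue() re-queues every timer it takes out. A self-contained sketch using the PluginTimer from this file (DummyPlugin is a hypothetical stand-in for a real collecty plugin):

import queue

from collecty.daemon import PluginTimer  # as defined above

class DummyPlugin:  # hypothetical stand-in for a collecty plugin
    def __init__(self, name, interval):
        self.name = name
        self.interval = interval  # seconds between collect() runs

timers = queue.PriorityQueue()
for plugin in (DummyPlugin("cpu", 30), DummyPlugin("mem", 60)):
    timers.put(PluginTimer(plugin))

# One scheduling step, mirroring Collecty.process_timer_queue():
timer = timers.get()  # the timer with the earliest deadline
try:
    if timer.is_due():
        print("would queue %s for collection" % timer.plugin.name)
        timer.reset_deadline()
finally:
    timers.put(timer)  # the timer always goes back into the queue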