]> git.ipfire.org Git - collecty.git/blobdiff - src/collecty/daemon.py
Add dbus interface
[collecty.git] / src / collecty / daemon.py
index a66fa108352d8530fbfd00f050eae7ff2d85fc58..2fa000f0e45ef03aca0400cda3f9207f0001f6ff 100644 (file)
 #                                                                             #
 ###############################################################################
 
+import Queue as queue
+import rrdtool
 import signal
+import threading
+import time
 
+import bus
 import plugins
 
 from constants import *
@@ -33,16 +38,27 @@ class Collecty(object):
        # The default interval, when all data is written to disk.
        SUBMIT_INTERVAL = 300
 
+       HEARTBEAT = 5
+
        def __init__(self, debug=False):
+               self.debug = debug
+
                # Enable debug logging when running in debug mode
-               if debug:
+               if self.debug:
                        log.setLevel(logging.DEBUG)
 
                self.plugins = []
 
                # Indicates whether this process should be running or not.
                self.running = True
-               self.timer = plugins.Timer(self.SUBMIT_INTERVAL, heartbeat=2)
+
+               # The write queue holds all collected pieces of data which
+               # will be written to disk later.
+               self.write_queue = WriteQueue(self, self.SUBMIT_INTERVAL)
+
+               # Create a thread that connects to dbus and processes requests we
+               # get from there.
+               self.bus = bus.Bus(self)
 
                # Add all plugins
                for plugin in plugins.get():
@@ -62,48 +78,52 @@ class Collecty(object):
 
                self.plugins.append(plugin)
 
+       @property
+       def templates(self):
+               for plugin in self.plugins:
+                       for template in plugin.templates:
+                               yield template
+
        def run(self):
                # Register signal handlers.
                self.register_signal_handler()
 
+               # Start the bus
+               self.bus.start()
+
                # Start all data source threads.
                for p in self.plugins:
                        p.start()
 
+               # Run the write queue thread
+               self.write_queue.start()
+
                # Regularly submit all data to disk.
                while self.running:
-                       if self.timer.wait():
-                               self.submit_all()
+                       try:
+                               time.sleep(self.HEARTBEAT)
+                       except KeyboardInterrupt:
+                               self.shutdown()
+                               break
 
-               # Wait until all instances are finished.
-               while self.plugins:
-                       for p in self.plugins[:]:
-                               if not p.isAlive():
-                                       log.debug(_("%s is not alive anymore. Removing.") % p)
-                                       self.plugins.remove(p)
+               # Wait until all plugins are finished.
+               for p in self.plugins:
+                       p.join()
 
-                       # Wait a bit.
-                       time.sleep(0.1)
+               # Stop the bus thread
+               self.bus.shutdown()
 
-               log.debug(_("No thread running. Exiting main thread."))
+               # Write all collected data to disk before ending the main thread
+               self.write_queue.shutdown()
 
-       def submit_all(self):
-               """
-                       Submit all data right now.
-               """
-               log.debug(_("Submitting all data in memory"))
-               for p in self.plugins:
-                       p._submit()
-
-               # Schedule the next submit.
-               self.timer.reset()
+               log.debug(_("Main thread exited"))
 
        def shutdown(self):
-               log.debug(_("Received shutdown signal"))
+               if not self.running:
+                       return
 
+               log.debug(_("Received shutdown signal"))
                self.running = False
-               if self.timer:
-                       self.timer.cancel()
 
                # Propagating shutdown to all threads.
                for p in self.plugins:
@@ -123,9 +143,116 @@ class Collecty(object):
                        self.shutdown()
 
                elif sig == signal.SIGUSR1:
-                       # Submit all data.
-                       self.submit_all()
+                       # Commit all data.
+                       self.write_queue.commit()
 
-       @property
-       def graph_default_arguments(self):
-               return GRAPH_DEFAULT_ARGUMENTS
+       def get_plugin_from_template(self, template_name):
+               for plugin in self.plugins:
+                       if not template_name in [t.name for t in plugin.templates]:
+                               continue
+
+                       return plugin
+
+       def generate_graph(self, template_name, *args, **kwargs):
+               plugin = self.get_plugin_from_template(template_name)
+               if not plugin:
+                       raise RuntimeError("Could not find template %s" % template_name)
+
+               return plugin.generate_graph(template_name, *args, **kwargs)
+
+
+class WriteQueue(threading.Thread):
+       def __init__(self, collecty, submit_interval):
+               threading.Thread.__init__(self)
+               self.daemon = True
+
+               self.collecty = collecty
+
+               self.log = logging.getLogger("collecty.queue")
+               self.log.setLevel(logging.DEBUG)
+               self.log.propagate = 1
+
+               self.timer = plugins.Timer(submit_interval)
+               self._queue = queue.PriorityQueue()
+
+               self.log.debug(_("Initialised write queue"))
+
+       def run(self):
+               self.log.debug(_("Write queue process started"))
+               self.running = True
+
+               while self.running:
+                       # Reset the timer.
+                       self.timer.reset()
+
+                       # Wait until the timer has successfully elapsed.
+                       if self.timer.wait():
+                               self.commit()
+
+               self.commit()
+               self.log.debug(_("Write queue process stopped"))
+
+       def shutdown(self):
+               self.running = False
+               self.timer.cancel()
+
+               # Wait until all data has been written.
+               self.join()
+
+       def add(self, object, time, data):
+               result = QueueObject(object.file, time, data)
+               self._queue.put(result)
+
+       def commit(self):
+               """
+                       Flushes the read data to disk.
+               """
+               # There is nothing to do if the queue is empty
+               if self._queue.empty():
+                       self.log.debug(_("No data to commit"))
+                       return
+
+               time_start = time.time()
+
+               self.log.debug(_("Submitting data to the databases..."))
+
+               # Get all objects from the queue and group them by the RRD file
+               # to commit them all at once
+               results = {}
+               while not self._queue.empty():
+                       result = self._queue.get()
+
+                       try:
+                               results[result.file].append(result)
+                       except KeyError:
+                               results[result.file] = [result]
+
+               # Write the collected data to disk
+               for filename, results in results.items():
+                       self._commit_file(filename, results)
+
+               duration = time.time() - time_start
+               self.log.debug(_("Emptied write queue in %.2fs") % duration)
+
+       def _commit_file(self, filename, results):
+               self.log.debug(_("Committing %(counter)s entries to %(filename)s:") \
+                       % { "counter" : len(results), "filename" : filename })
+
+               if self.collecty.debug:
+                       for result in results:
+                               self.log.debug("  %s: %s" % (result.time, result.data))
+
+               rrdtool.update(filename, *["%s" % r for r in results])
+
+
+class QueueObject(object):
+       def __init__(self, file, time, data):
+               self.file = file
+               self.time = time
+               self.data = data
+
+       def __str__(self):
+               return "%s:%s" % (self.time.strftime("%s"), self.data)
+
+       def __cmp__(self, other):
+               return cmp(self.time, other.time)