# #
###############################################################################
+import Queue as queue
+import rrdtool
import signal
+import threading
+import time
+import bus
import plugins
from constants import *
# The default interval, when all data is written to disk.
SUBMIT_INTERVAL = 300
+ HEARTBEAT = 5
+
def __init__(self, debug=False):
+ # Keep the debug flag on the instance; the write queue reads
+ # collecty.debug later when committing data (see WriteQueue._commit_file).
+ self.debug = debug
+
# Enable debug logging when running in debug mode
- if debug:
+ if self.debug:
log.setLevel(logging.DEBUG)
+ # Plugin threads are collected here and started later in run().
self.plugins = []
# Indicates whether this process should be running or not.
self.running = True
- self.timer = plugins.Timer(self.SUBMIT_INTERVAL, heartbeat=2)
+
+ # The write queue holds all collected pieces of data which
+ # will be written to disk later.
+ self.write_queue = WriteQueue(self, self.SUBMIT_INTERVAL)
+
+ # Create a thread that connects to dbus and processes requests we
+ # get from there.
+ self.bus = bus.Bus(self)
# Add all plugins
for plugin in plugins.get():
self.plugins.append(plugin)
+ @property
+ def templates(self):
+ for plugin in self.plugins:
+ for template in plugin.templates:
+ yield template
+
def run(self):
# Register signal handlers.
self.register_signal_handler()
+ # Start the bus
+ self.bus.start()
+
# Start all data source threads.
for p in self.plugins:
p.start()
+ # Run the write queue thread
+ self.write_queue.start()
+
# Regularly submit all data to disk.
while self.running:
- if self.timer.wait():
- self.submit_all()
+ # Sleep in short HEARTBEAT steps so a KeyboardInterrupt
+ # is noticed quickly and triggers a clean shutdown.
+ try:
+ time.sleep(self.HEARTBEAT)
+ except KeyboardInterrupt:
+ self.shutdown()
+ break
- # Wait until all instances are finished.
- while self.plugins:
- for p in self.plugins[:]:
- if not p.isAlive():
- log.debug(_("%s is not alive anymore. Removing.") % p)
- self.plugins.remove(p)
+ # Wait until all plugins are finished.
+ for p in self.plugins:
+ p.join()
- # Wait a bit.
- time.sleep(0.1)
+ # Stop the bus thread
+ self.bus.shutdown()
- log.debug(_("No thread running. Exiting main thread."))
+ # Write all collected data to disk before ending the main thread
+ self.write_queue.shutdown()
- def submit_all(self):
- """
- Submit all data right now.
- """
- log.debug(_("Submitting all data in memory"))
- for p in self.plugins:
- p._submit()
-
- # Schedule the next submit.
- self.timer.reset()
+ log.debug(_("Main thread exited"))
def shutdown(self):
- log.debug(_("Received shutdown signal"))
+ if not self.running:
+ return
+ log.debug(_("Received shutdown signal"))
self.running = False
- if self.timer:
- self.timer.cancel()
# Propagating shutdown to all threads.
for p in self.plugins:
self.shutdown()
elif sig == signal.SIGUSR1:
- # Submit all data.
- self.submit_all()
+ # SIGUSR1 forces an immediate flush of the write queue to disk.
+ self.write_queue.commit()
- @property
- def graph_default_arguments(self):
- return GRAPH_DEFAULT_ARGUMENTS
+ def get_plugin_from_template(self, template_name):
+ for plugin in self.plugins:
+ if not template_name in [t.name for t in plugin.templates]:
+ continue
+
+ return plugin
+
+ def generate_graph(self, template_name, *args, **kwargs):
+ plugin = self.get_plugin_from_template(template_name)
+ if not plugin:
+ raise RuntimeError("Could not find template %s" % template_name)
+
+ return plugin.generate_graph(template_name, *args, **kwargs)
+
+
+class WriteQueue(threading.Thread):
+ def __init__(self, collecty, submit_interval):
+ threading.Thread.__init__(self)
+ self.daemon = True
+
+ self.collecty = collecty
+
+ self.log = logging.getLogger("collecty.queue")
+ self.log.setLevel(logging.DEBUG)
+ self.log.propagate = 1
+
+ self.timer = plugins.Timer(submit_interval)
+ self._queue = queue.PriorityQueue()
+
+ self.log.debug(_("Initialised write queue"))
+
+ def run(self):
+ self.log.debug(_("Write queue process started"))
+ self.running = True
+
+ while self.running:
+ # Reset the timer.
+ self.timer.reset()
+
+ # Wait until the timer has successfully elapsed.
+ if self.timer.wait():
+ self.commit()
+
+ self.commit()
+ self.log.debug(_("Write queue process stopped"))
+
+ def shutdown(self):
+ self.running = False
+ self.timer.cancel()
+
+ # Wait until all data has been written.
+ self.join()
+
+ def add(self, object, time, data):
+ result = QueueObject(object.file, time, data)
+ self._queue.put(result)
+
+ def commit(self):
+ """
+ Flushes the read data to disk.
+ """
+ # There is nothing to do if the queue is empty
+ if self._queue.empty():
+ self.log.debug(_("No data to commit"))
+ return
+
+ time_start = time.time()
+
+ self.log.debug(_("Submitting data to the databases..."))
+
+ # Get all objects from the queue and group them by the RRD file
+ # to commit them all at once
+ results = {}
+ while not self._queue.empty():
+ result = self._queue.get()
+
+ try:
+ results[result.file].append(result)
+ except KeyError:
+ results[result.file] = [result]
+
+ # Write the collected data to disk
+ for filename, results in results.items():
+ self._commit_file(filename, results)
+
+ duration = time.time() - time_start
+ self.log.debug(_("Emptied write queue in %.2fs") % duration)
+
+ def _commit_file(self, filename, results):
+ self.log.debug(_("Committing %(counter)s entries to %(filename)s:") \
+ % { "counter" : len(results), "filename" : filename })
+
+ if self.collecty.debug:
+ for result in results:
+ self.log.debug(" %s: %s" % (result.time, result.data))
+
+ rrdtool.update(filename, *["%s" % r for r in results])
+
+
+class QueueObject(object):
+ def __init__(self, file, time, data):
+ self.file = file
+ self.time = time
+ self.data = data
+
+ def __str__(self):
+ return "%s:%s" % (self.time.strftime("%s"), self.data)
+
+ def __cmp__(self, other):
+ return cmp(self.time, other.time)