-#!/usr/bin/python
+#!/usr/bin/python3
###############################################################################
# #
# collecty - A system statistics collection daemon for IPFire #
# #
###############################################################################
-import Queue as queue
+import datetime
+import multiprocessing
+import os
+import queue
import rrdtool
import signal
import threading
import time
-import plugins
+from . import bus
+from . import locales
+from . import plugins
-from constants import *
-from i18n import _
+from .constants import *
+from .i18n import _
import logging
log = logging.getLogger("collecty")
# The default interval, when all data is written to disk.
SUBMIT_INTERVAL = 300
- HEARTBEAT = 5
+ HEARTBEAT = 1
def __init__(self, debug=False):
self.debug = debug
+ # Reset timezone to UTC
+ # rrdtool is reading that from the environment
+ os.environ["TZ"] = "UTC"
+
# Enable debug logging when running in debug mode
if self.debug:
log.setLevel(logging.DEBUG)
# will be written to disk later.
self.write_queue = WriteQueue(self, self.SUBMIT_INTERVAL)
+ # Create worker threads
+ self.worker_threads = self.create_worker_threads()
+
+ self._timer_queue = queue.PriorityQueue()
+ self._worker_queue = queue.Queue()
+
+ # Create a thread that connects to dbus and processes requests we
+ # get from there.
+ self.bus = bus.Bus(self)
+
# Add all plugins
for plugin in plugins.get():
self.add_plugin(plugin)
- log.info(_("Collecty successfully initialized with %s plugins") \
+ log.debug(_("Collecty successfully initialized with %s plugins") \
% len(self.plugins))
+ log.debug(_("Supported locales: %s") % ", ".join(locales.get_supported_locales()))
+
def add_plugin(self, plugin_class):
# Try initialising a new plugin. If that fails, we will log the
# error and try to go on.
self.plugins.append(plugin)
+    @property
+    def templates(self):
+        # Flatten the graph templates of every loaded plugin into a
+        # single iterator.
+        for plugin in self.plugins:
+            for template in plugin.templates:
+                yield template
+
def run(self):
# Register signal handlers.
self.register_signal_handler()
- # Start all data source threads.
- for p in self.plugins:
- p.start()
+ # Cannot do anything if no plugins have been initialised
+ if not self.plugins:
+ log.critical(_("No plugins have been initialised"))
+ return
+
+ # Start the bus
+ self.bus.start()
+
+ # Initialise the timer queue
+ self.initialise_timer_queue()
+
+ # Start worker threads
+ for w in self.worker_threads:
+ w.start()
# Run the write queue thread
self.write_queue.start()
# Regularly submit all data to disk.
while self.running:
try:
+ # Try processing one event from the queue. If that succeeded
+ # we will retry immediately.
+ if self.process_timer_queue():
+ continue
+
+ # Otherwise we will sleep for a bit
time.sleep(self.HEARTBEAT)
+
+ # Log warnings if the worker queue is filling up
+ queue_size = self._worker_queue.qsize()
+ if queue_size >= 5:
+ log.warning(_("Worker queue is filling up with %s events") % queue_size)
+
except KeyboardInterrupt:
self.shutdown()
break
- # Wait until all plugins are finished.
- for p in self.plugins:
- p.join()
+ # Wait until all worker threads are finished
+ for w in self.worker_threads:
+ w.join()
+
+ # Stop the bus thread
+ self.bus.shutdown()
# Write all collected data to disk before ending the main thread
self.write_queue.shutdown()
if not self.running:
return
- log.debug(_("Received shutdown signal"))
+ log.info(_("Received shutdown signal"))
self.running = False
# Propagating shutdown to all threads.
- for p in self.plugins:
- p.shutdown()
+ for w in self.worker_threads:
+ w.shutdown()
def register_signal_handler(self):
for s in (signal.SIGTERM, signal.SIGINT, signal.SIGUSR1):
# Commit all data.
self.write_queue.commit()
+    def get_plugin_from_template(self, template_name):
+        # Return the first plugin that provides a template with the given
+        # name; implicitly returns None when no plugin matches.
+        for plugin in self.plugins:
+            # Skip plugins that do not offer this template
+            if not template_name in [t.name for t in plugin.templates]:
+                continue
+
+            return plugin
+
+    def generate_graph(self, template_name, *args, **kwargs):
+        # Render a graph for the given template by delegating to the
+        # plugin that owns it.
+        #
+        # Raises RuntimeError when no plugin provides the template.
+        plugin = self.get_plugin_from_template(template_name)
+        if not plugin:
+            raise RuntimeError("Could not find template %s" % template_name)
+
+        return plugin.generate_graph(template_name, *args, **kwargs)
+
+    def graph_info(self, template_name, *args, **kwargs):
+        # Return metadata about the graph for the given template by
+        # delegating to the plugin that owns it.
+        #
+        # Raises RuntimeError when no plugin provides the template.
+        plugin = self.get_plugin_from_template(template_name)
+        if not plugin:
+            raise RuntimeError("Could not find template %s" % template_name)
+
+        return plugin.graph_info(template_name, *args, **kwargs)
+
+    def create_worker_threads(self, num=None):
+        """
+            Creates a number of worker threads
+
+            num: optional number of threads to create; defaults to the
+                number of processor cores, but never less than two.
+
+            Returns the list of (not yet started) WorkerThread objects.
+        """
+        # If no number of threads is given, we will create as many as we have
+        # active processor cores but never less than two.
+        if num is None:
+            num = max(multiprocessing.cpu_count(), 2)
+
+        worker_threads = []
+
+        for id in range(num):
+            worker_thread = WorkerThread(self, id)
+            worker_threads.append(worker_thread)
+
+        return worker_threads
+
+    def initialise_timer_queue(self):
+        # Seed the priority queue with one timer per plugin. New timers
+        # are due immediately, so every plugin gets collected right away.
+        for p in self.plugins:
+            timer = PluginTimer(p)
+
+            self._timer_queue.put(timer)
+
+    def process_timer_queue(self):
+        # Take the item from the timer queue that is to be due first
+        timer = self._timer_queue.get()
+
+        try:
+            # If the timer event is to be executed, we will put the plugin
+            # into the worker queue and reset the timer
+            if timer.is_due():
+                self._worker_queue.put(timer.plugin)
+                timer.reset_deadline()
+
+                # Returning the (truthy) timer tells the caller that an
+                # event was dispatched and it should poll again at once.
+                return timer
+        finally:
+            # Put the timer back into the timer queue.
+            self._timer_queue.put(timer)
+
+
+
+class WorkerThread(threading.Thread):
+    """
+        A daemon thread that takes plugins off the shared worker queue
+        and runs their collect() operation.
+    """
+    # Seconds to block on the queue before re-checking the running flag
+    HEARTBEAT = 2.5
+
+    def __init__(self, collecty, id):
+        threading.Thread.__init__(self)
+        # Daemon thread: must not keep the process alive on shutdown
+        self.daemon = True
+
+        self.log = logging.getLogger("collecty.worker")
+        self.log.propagate = 1
+
+        self.collecty = collecty
+        self.id = id
+
+        self.log.debug(_("Worker thread %s has been initialised") % self.id)
+
+    @property
+    def queue(self):
+        """
+            The queue this thread is getting events from
+        """
+        return self.collecty._worker_queue
+
+    def run(self):
+        self.log.debug(_("Worker thread %s has been started") % self.id)
+        self.running = True
+
+        while self.running:
+            try:
+                # Block for at most HEARTBEAT seconds so the loop can
+                # notice when shutdown() cleared the running flag.
+                plugin = self.queue.get(block=True, timeout=self.HEARTBEAT)
+
+            # If the queue has been empty we just retry
+            except queue.Empty:
+                continue
+
+            # Execute the collect operation for this plugin
+            plugin.collect()
+
+        self.log.debug(_("Worker thread %s has been terminated") % self.id)
+
+    def shutdown(self):
+        # Ask the thread to exit after its current wait/collect cycle;
+        # callers should join() the thread afterwards.
+        self.running = False
class WriteQueue(threading.Thread):
self.collecty = collecty
self.log = logging.getLogger("collecty.queue")
- self.log.setLevel(logging.DEBUG)
self.log.propagate = 1
self.timer = plugins.Timer(submit_interval)
results[result.file] = [result]
# Write the collected data to disk
- for filename, results in results.items():
+ for filename, results in list(results.items()):
self._commit_file(filename, results)
duration = time.time() - time_start
self.log.debug(_("Emptied write queue in %.2fs") % duration)
def _commit_file(self, filename, results):
- self.log.debug(_("Committing %(counter)s entries to %(filename)s:") \
+ self.log.debug(_("Committing %(counter)s entries to %(filename)s") \
% { "counter" : len(results), "filename" : filename })
- if self.collecty.debug:
- for result in results:
- self.log.debug(" %s: %s" % (result.time, result.data))
+ for result in results:
+ self.log.debug(" %s: %s" % (result.time, result.data))
- rrdtool.update(filename, *["%s" % r for r in results])
+ try:
+ rrdtool.update(filename, *["%s" % r for r in results])
+
+ # Catch operational errors like unreadable/unwritable RRD databases
+ # or those where the format has changed. The collected data will be lost.
+ except rrdtool.OperationalError as e:
+ self.log.critical(_("Could not update RRD database %s: %s") \
+ % (filename, e))
class QueueObject(object):
def __str__(self):
return "%s:%s" % (self.time.strftime("%s"), self.data)
- def __cmp__(self, other):
- return cmp(self.time, other.time)
+    def __lt__(self, other):
+        # Order queue objects chronologically by timestamp — the Python 3
+        # rich-comparison replacement for the removed __cmp__/cmp().
+        return self.time < other.time
+
+
+class PluginTimer(object):
+    """
+        Tracks when a plugin's collect operation is next due.
+        Instances order by deadline so they can live in a PriorityQueue.
+    """
+    def __init__(self, plugin):
+        self.plugin = plugin
+
+        # Due immediately so the first collection happens right away
+        self.deadline = datetime.datetime.utcnow()
+
+    def __repr__(self):
+        return "<%s %s>" % (self.__class__.__name__, self.deadline)
+
+    def __lt__(self, other):
+        # Earliest deadline sorts first (priority-queue ordering)
+        return self.deadline < other.deadline
+
+    def reset_deadline(self):
+        # Schedule the next run one plugin interval from now
+        self.deadline = datetime.datetime.utcnow() \
+            + datetime.timedelta(seconds=self.plugin.interval)
+
+    def is_due(self):
+        # True once the deadline has passed (naive UTC timestamps; the
+        # daemon pins TZ=UTC in its environment)
+        return datetime.datetime.utcnow() >= self.deadline