###############################################################################
import Queue as queue
+import datetime
+import multiprocessing
import rrdtool
import signal
import threading
# The default interval, when all data is written to disk.
SUBMIT_INTERVAL = 300
- HEARTBEAT = 5
+ HEARTBEAT = 1
def __init__(self, debug=False):
self.debug = debug
# will be written to disk later.
self.write_queue = WriteQueue(self, self.SUBMIT_INTERVAL)
+ # Create worker threads
+ self.worker_threads = self.create_worker_threads()
+
+ self._timer_queue = queue.PriorityQueue()
+ self._worker_queue = queue.Queue()
+
# Create a thread that connects to dbus and processes requests we
# get from there.
self.bus = bus.Bus(self)
# Start the bus
self.bus.start()
- # Start all data source threads.
- for p in self.plugins:
- p.start()
+ # Initialise the timer queue
+ self.initialise_timer_queue()
+
+ # Start worker threads
+ for w in self.worker_threads:
+ w.start()
# Run the write queue thread
self.write_queue.start()
# Regularly submit all data to disk.
while self.running:
try:
+ # Try processing one event from the queue. If that succeeded
+ # we will retry immediately.
+ if self.process_timer_queue():
+ continue
+
+ # Otherwise we will sleep for a bit
time.sleep(self.HEARTBEAT)
+
+ # Log warnings if the worker queue is filling up
+ queue_size = self._worker_queue.qsize()
+ if queue_size >= 5:
+ log.warning(_("Worker queue is filling up with %s events") % queue_size)
+
except KeyboardInterrupt:
self.shutdown()
break
- # Wait until all plugins are finished.
- for p in self.plugins:
- p.join()
+ # Wait until all worker threads are finished
+ for w in self.worker_threads:
+ w.join()
# Stop the bus thread
self.bus.shutdown()
self.running = False
# Propagating shutdown to all threads.
- for p in self.plugins:
- p.shutdown()
+ for w in self.worker_threads:
+ w.shutdown()
def register_signal_handler(self):
for s in (signal.SIGTERM, signal.SIGINT, signal.SIGUSR1):
return plugin.generate_graph(template_name, *args, **kwargs)
+ def create_worker_threads(self, num=None):
+     """
+     Creates a number of worker threads
+
+     num is the number of threads to create. If it is None, one thread
+     per processor core is created, but never less than four.
+
+     Returns a list of WorkerThread objects. This method does not
+     start them.
+     """
+     if num is None:
+         try:
+             cores = multiprocessing.cpu_count()
+
+         # cpu_count() raises NotImplementedError when the number of
+         # cores cannot be determined. Fall back to the minimum then.
+         except NotImplementedError:
+             cores = 0
+
+         num = max(cores, 4)
+
+     # The position in the list is used as the ID of the thread
+     return [WorkerThread(self, i) for i in range(num)]
+
+ def initialise_timer_queue(self):
+     """
+     Puts one PluginTimer for every plugin into the timer queue
+     """
+     for plugin in self.plugins:
+         self._timer_queue.put(PluginTimer(plugin))
+
+ def process_timer_queue(self):
+     """
+     Handles the timer that is to be due first
+
+     Returns the timer if its plugin has been put into the worker
+     queue. Returns None if the timer was not due, yet, or if the
+     timer queue is empty.
+     """
+     # Take the item from the timer queue that is to be due first.
+     # We must not block here, because nothing would ever wake us up
+     # again when the timer queue is empty.
+     try:
+         timer = self._timer_queue.get_nowait()
+     except queue.Empty:
+         return None
+
+     try:
+         # If the timer event is to be executed, we will put the plugin
+         # into the worker queue and reset the timer
+         if timer.is_due():
+             self._worker_queue.put(timer.plugin)
+             timer.reset_deadline()
+
+             return timer
+     finally:
+         # Put the timer back into the timer queue.
+         self._timer_queue.put(timer)
+
+     # Nothing was due, so the caller may go to sleep
+     return None
+
+
+class WorkerThread(threading.Thread):
+    """
+    A daemon thread that takes plugins from the worker queue of the
+    collecty instance and calls their collect() method.
+    """
+    # Maximum time (in seconds) to wait for a new event before the
+    # running flag is checked again.
+    HEARTBEAT = 2.5
+
+    def __init__(self, collecty, id):
+        threading.Thread.__init__(self)
+        self.daemon = True
+
+        self.log = logging.getLogger("collecty.worker")
+        self.log.propagate = 1
+
+        self.collecty = collecty
+        self.id = id
+
+        # Set the flag right here and not in run(), so that a shutdown()
+        # that arrives before the thread has started is not overwritten.
+        self.running = True
+
+        self.log.debug(_("Worker thread %s has been initialised") % self.id)
+
+    @property
+    def queue(self):
+        """
+        The queue this thread is getting events from
+        """
+        return self.collecty._worker_queue
+
+    def run(self):
+        self.log.debug(_("Worker thread %s has been started") % self.id)
+
+        while self.running:
+            try:
+                plugin = self.queue.get(block=True, timeout=self.HEARTBEAT)
+
+            # If the queue has been empty we just retry
+            except queue.Empty:
+                continue
+
+            # Execute the collect operation for this plugin
+            try:
+                plugin.collect()
+
+            # A failing plugin must not terminate this worker thread,
+            # because it would never be replaced.
+            except Exception:
+                self.log.exception(_("Worker thread %s caught an unhandled exception") % self.id)
+
+        self.log.debug(_("Worker thread %s has been terminated") % self.id)
+
+    def shutdown(self):
+        self.running = False
+
+        # Wait until the thread has left its main loop. A thread that
+        # has never been started cannot be joined.
+        if self.is_alive():
+            self.join()
+
class WriteQueue(threading.Thread):
def __init__(self, collecty, submit_interval):
def __cmp__(self, other):
return cmp(self.time, other.time)
+
+
+class PluginTimer(object):
+    """
+    Remembers the point in time (UTC) when a plugin is to be
+    collected next. Timers are ordered by their deadline.
+    """
+    def __init__(self, plugin):
+        # The initial deadline is the time of creation
+        self.deadline = datetime.datetime.utcnow()
+
+        self.plugin = plugin
+
+    def __repr__(self):
+        return "<%s %s>" % (type(self).__name__, self.deadline)
+
+    def __cmp__(self, other):
+        mine, theirs = self.deadline, other.deadline
+
+        return cmp(mine, theirs)
+
+    def reset_deadline(self):
+        # The next deadline is one plugin interval from now
+        now = datetime.datetime.utcnow()
+        interval = datetime.timedelta(seconds=self.plugin.interval)
+
+        self.deadline = now + interval
+
+    def is_due(self):
+        now = datetime.datetime.utcnow()
+
+        return now >= self.deadline
return self.elapsed > self.timeout
-class Plugin(threading.Thread):
+class Plugin(object):
# The name of this plugin.
name = None
_plugins[plugin.name] = plugin
def __init__(self, collecty, **kwargs):
- threading.Thread.__init__(self, name=self.description)
- self.daemon = True
-
self.collecty = collecty
# Check if this plugin was configured correctly.
# Run some custom initialization.
self.init(**kwargs)
- # Keepalive options
- self.running = True
- self.timer = Timer(self.interval)
-
self.log.debug(_("Successfully initialized %s") % self.__class__.__name__)
@property
self.collecty.write_queue.add(o, now, result)
# Returns the time this function took to complete.
- return (time.time() - time_start)
-
- def run(self):
- self.log.debug(_("%s plugin has started") % self.name)
-
- # Initially collect everything
- self.collect()
-
- while self.running:
- # Reset the timer.
- self.timer.reset()
-
- # Wait until the timer has successfully elapsed.
- if self.timer.wait():
- delay = self.collect()
- self.timer.reset(delay)
-
- self.log.debug(_("%s plugin has stopped") % self.name)
-
- def shutdown(self):
- self.log.debug(_("Received shutdown signal."))
- self.running = False
+ delay = time.time() - time_start
- # Kill any running timers.
- if self.timer:
- self.timer.cancel()
+ # Log some warning when a collect method takes too long to return some data
+ if delay >= 60:
+ self.log.warning(_("A worker thread was stalled for %.4fs") % delay)
def get_object(self, id):
for object in self.objects: