From 49c1b8fd8f1c6109663d72448a8d586cc56adf95 Mon Sep 17 00:00:00 2001 From: Michael Tremer Date: Tue, 26 May 2015 17:18:38 +0000 Subject: [PATCH] Implement worker thread concept The threading model has been move from the plugins into a worker thread model that has a constant number of threads and should therefore waste even less resources. --- src/collecty/daemon.py | 146 ++++++++++++++++++++++++++++++++--- src/collecty/plugins/base.py | 38 ++------- 2 files changed, 142 insertions(+), 42 deletions(-) diff --git a/src/collecty/daemon.py b/src/collecty/daemon.py index 236e1c9..bb46971 100644 --- a/src/collecty/daemon.py +++ b/src/collecty/daemon.py @@ -20,6 +20,8 @@ ############################################################################### import Queue as queue +import datetime +import multiprocessing import rrdtool import signal import threading @@ -38,7 +40,7 @@ class Collecty(object): # The default interval, when all data is written to disk. SUBMIT_INTERVAL = 300 - HEARTBEAT = 5 + HEARTBEAT = 1 def __init__(self, debug=False): self.debug = debug @@ -56,6 +58,12 @@ class Collecty(object): # will be written to disk later. self.write_queue = WriteQueue(self, self.SUBMIT_INTERVAL) + # Create worker threads + self.worker_threads = self.create_worker_threads() + + self._timer_queue = queue.PriorityQueue() + self._worker_queue = queue.Queue() + # Create a thread that connects to dbus and processes requests we # get from there. self.bus = bus.Bus(self) @@ -91,9 +99,12 @@ class Collecty(object): # Start the bus self.bus.start() - # Start all data source threads. - for p in self.plugins: - p.start() + # Initialise the timer queue + self.initialise_timer_queue() + + # Start worker threads + for w in self.worker_threads: + w.start() # Run the write queue thread self.write_queue.start() @@ -101,14 +112,26 @@ class Collecty(object): # Regularly submit all data to disk. while self.running: try: + # Try processing one event from the queue. If that succeeded + # we will retry immediately. + if self.process_timer_queue(): + continue + + # Otherwise we will sleep for a bit time.sleep(self.HEARTBEAT) + + # Log warnings if the worker queue is filling up + queue_size = self._worker_queue.qsize() + if queue_size >= 5: + log.warning(_("Worker queue is filling up with %s events") % queue_size) + except KeyboardInterrupt: self.shutdown() break - # Wait until all plugins are finished. - for p in self.plugins: - p.join() + # Wait until all worker threads are finished + for w in self.worker_threads: + w.join() # Stop the bus thread self.bus.shutdown() @@ -126,8 +149,8 @@ class Collecty(object): self.running = False # Propagating shutdown to all threads. - for p in self.plugins: - p.shutdown() + for w in self.worker_threads: + w.shutdown() def register_signal_handler(self): for s in (signal.SIGTERM, signal.SIGINT, signal.SIGUSR1): @@ -160,6 +183,91 @@ class Collecty(object): return plugin.generate_graph(template_name, *args, **kwargs) + def create_worker_threads(self, num=None): + """ + Creates a number of worker threads + """ + # If no number of threads is given, we will create as many as we have + # active processor cores but never less than four. + if num is None: + num = max(multiprocessing.cpu_count(), 4) + + worker_threads = [] + + for id in range(num): + worker_thread = WorkerThread(self, id) + worker_threads.append(worker_thread) + + return worker_threads + + def initialise_timer_queue(self): + for p in self.plugins: + timer = PluginTimer(p) + + self._timer_queue.put(timer) + + def process_timer_queue(self): + # Take the item from the timer queue that is to be due first + timer = self._timer_queue.get() + + try: + # If the timer event is to be executed, we will put the plugin + # into the worker queue and reset the timer + if timer.is_due(): + self._worker_queue.put(timer.plugin) + timer.reset_deadline() + + return timer + finally: + # Put the timer back into the timer queue. + self._timer_queue.put(timer) + + +class WorkerThread(threading.Thread): + HEARTBEAT = 2.5 + + def __init__(self, collecty, id): + threading.Thread.__init__(self) + self.daemon = True + + self.log = logging.getLogger("collecty.worker") + self.log.propagate = 1 + + self.collecty = collecty + self.id = id + + self.log.debug(_("Worker thread %s has been initialised") % self.id) + + @property + def queue(self): + """ + The queue this thread is getting events from + """ + return self.collecty._worker_queue + + def run(self): + self.log.debug(_("Worker thread %s has been started") % self.id) + self.running = True + + while self.running: + try: + plugin = self.queue.get(block=True, timeout=self.HEARTBEAT) + + # If the queue has been empty we just retry + except queue.Empty: + continue + + # Execute the collect operation for this plugin + plugin.collect() + + self.log.debug(_("Worker thread %s has been terminated") % self.id) + + def shutdown(self): + self.running = False + + # Wait until all data has been written. + self.join() + class WriteQueue(threading.Thread): def __init__(self, collecty, submit_interval): @@ -261,3 +369,23 @@ class QueueObject(object): def __cmp__(self, other): return cmp(self.time, other.time) + + +class PluginTimer(object): + def __init__(self, plugin): + self.plugin = plugin + + self.deadline = datetime.datetime.utcnow() + + def __repr__(self): + return "<%s %s>" % (self.__class__.__name__, self.deadline) + + def __cmp__(self, other): + return cmp(self.deadline, other.deadline) + + def reset_deadline(self): + self.deadline = datetime.datetime.utcnow() \ + + datetime.timedelta(seconds=self.plugin.interval) + + def is_due(self): + return datetime.datetime.utcnow() >= self.deadline diff --git a/src/collecty/plugins/base.py b/src/collecty/plugins/base.py index 61a4682..8d768ae 100644 --- a/src/collecty/plugins/base.py +++ b/src/collecty/plugins/base.py @@ -73,7 +73,7 @@ class Timer(object): return self.elapsed > self.timeout -class Plugin(threading.Thread): +class Plugin(object): # The name of this plugin. name = None @@ -104,9 +104,6 @@ class Plugin(threading.Thread): _plugins[plugin.name] = plugin def __init__(self, collecty, **kwargs): - threading.Thread.__init__(self, name=self.description) - self.daemon = True - self.collecty = collecty # Check if this plugin was configured correctly. @@ -122,10 +119,6 @@ class Plugin(threading.Thread): # Run some custom initialization. self.init(**kwargs) - # Keepalive options - self.running = True - self.timer = Timer(self.interval) - self.log.debug(_("Successfully initialized %s") % self.__class__.__name__) @property @@ -173,32 +166,11 @@ class Plugin(threading.Thread): self.collecty.write_queue.add(o, now, result) # Returns the time this function took to complete. - return (time.time() - time_start) - - def run(self): - self.log.debug(_("%s plugin has started") % self.name) - - # Initially collect everything - self.collect() - - while self.running: - # Reset the timer. - self.timer.reset() - - # Wait until the timer has successfully elapsed. - if self.timer.wait(): - delay = self.collect() - self.timer.reset(delay) - - self.log.debug(_("%s plugin has stopped") % self.name) - - def shutdown(self): - self.log.debug(_("Received shutdown signal.")) - self.running = False + delay = time.time() - time_start - # Kill any running timers. - if self.timer: - self.timer.cancel() + # Log some warning when a collect method takes too long to return some data + if delay >= 60: + self.log.warning(_("A worker thread was stalled for %.4fs") % delay) def get_object(self, id): for object in self.objects: -- 2.39.2