]> git.ipfire.org Git - oddments/collecty.git/commitdiff
Implement worker thread concept
authorMichael Tremer <michael.tremer@ipfire.org>
Tue, 26 May 2015 17:18:38 +0000 (17:18 +0000)
committerMichael Tremer <michael.tremer@ipfire.org>
Tue, 26 May 2015 17:18:38 +0000 (17:18 +0000)
The threading model has been move from the plugins into
a worker thread model that has a constant number of threads
and should therefore waste even less resources.

src/collecty/daemon.py
src/collecty/plugins/base.py

index 236e1c93fb8ee3f548d43542a6e40481b1888389..bb46971493368d3d18789d118555870ac14c6268 100644 (file)
@@ -20,6 +20,8 @@
 ###############################################################################
 
 import Queue as queue
+import datetime
+import multiprocessing
 import rrdtool
 import signal
 import threading
@@ -38,7 +40,7 @@ class Collecty(object):
        # The default interval, when all data is written to disk.
        SUBMIT_INTERVAL = 300
 
-       HEARTBEAT = 5
+       HEARTBEAT = 1
 
        def __init__(self, debug=False):
                self.debug = debug
@@ -56,6 +58,12 @@ class Collecty(object):
                # will be written to disk later.
                self.write_queue = WriteQueue(self, self.SUBMIT_INTERVAL)
 
+               # Create worker threads
+               self.worker_threads = self.create_worker_threads()
+
+               self._timer_queue = queue.PriorityQueue()
+               self._worker_queue = queue.Queue()
+
                # Create a thread that connects to dbus and processes requests we
                # get from there.
                self.bus = bus.Bus(self)
@@ -91,9 +99,12 @@ class Collecty(object):
                # Start the bus
                self.bus.start()
 
-               # Start all data source threads.
-               for p in self.plugins:
-                       p.start()
+               # Initialise the timer queue
+               self.initialise_timer_queue()
+
+               # Start worker threads
+               for w in self.worker_threads:
+                       w.start()
 
                # Run the write queue thread
                self.write_queue.start()
@@ -101,14 +112,26 @@ class Collecty(object):
                # Regularly submit all data to disk.
                while self.running:
                        try:
+                               # Try processing one event from the queue. If that succeeded
+                               # we will retry immediately.
+                               if self.process_timer_queue():
+                                       continue
+
+                               # Otherwise we will sleep for a bit
                                time.sleep(self.HEARTBEAT)
+
+                               # Log warnings if the worker queue is filling up
+                               queue_size = self._worker_queue.qsize()
+                               if queue_size >= 5:
+                                       log.warning(_("Worker queue is filling up with %s events") % queue_size)
+
                        except KeyboardInterrupt:
                                self.shutdown()
                                break
 
-               # Wait until all plugins are finished.
-               for p in self.plugins:
-                       p.join()
+               # Wait until all worker threads are finished
+               for w in self.worker_threads:
+                       w.join()
 
                # Stop the bus thread
                self.bus.shutdown()
@@ -126,8 +149,8 @@ class Collecty(object):
                self.running = False
 
                # Propagating shutdown to all threads.
-               for p in self.plugins:
-                       p.shutdown()
+               for w in self.worker_threads:
+                       w.shutdown()
 
        def register_signal_handler(self):
                for s in (signal.SIGTERM, signal.SIGINT, signal.SIGUSR1):
@@ -160,6 +183,91 @@ class Collecty(object):
 
                return plugin.generate_graph(template_name, *args, **kwargs)
 
+       def create_worker_threads(self, num=None):
+               """
+                       Creates a number of worker threads
+               """
+               # If no number of threads is given, we will create as many as we have
+               # active processor cores but never less than four.
+               if num is None:
+                       num = max(multiprocessing.cpu_count(), 4)
+
+               worker_threads = []
+
+               for id in range(num):
+                       worker_thread = WorkerThread(self, id)
+                       worker_threads.append(worker_thread)
+
+               return worker_threads
+
+       def initialise_timer_queue(self):
+               for p in self.plugins:
+                       timer = PluginTimer(p)
+
+                       self._timer_queue.put(timer)
+
+       def process_timer_queue(self):
+               # Take the item from the timer queue that is to be due first
+               timer = self._timer_queue.get()
+
+               try:
+                       # If the timer event is to be executed, we will put the plugin
+                       # into the worker queue and reset the timer
+                       if timer.is_due():
+                               self._worker_queue.put(timer.plugin)
+                               timer.reset_deadline()
+
+                               return timer
+               finally:
+                       # Put the timer back into the timer queue.
+                       self._timer_queue.put(timer)
+
+
+class WorkerThread(threading.Thread):
+       HEARTBEAT = 2.5
+
+       def __init__(self, collecty, id):
+               threading.Thread.__init__(self)
+               self.daemon = True
+
+               self.log = logging.getLogger("collecty.worker")
+               self.log.propagate = 1
+
+               self.collecty = collecty
+               self.id = id
+
+               self.log.debug(_("Worker thread %s has been initialised") % self.id)
+
+       @property
+       def queue(self):
+               """
+                       The queue this thread is getting events from
+               """
+               return self.collecty._worker_queue
+
+       def run(self):
+               self.log.debug(_("Worker thread %s has been started") % self.id)
+               self.running = True
+
+               while self.running:
+                       try:
+                               plugin = self.queue.get(block=True, timeout=self.HEARTBEAT)
+
+                       # If the queue has been empty we just retry
+                       except queue.Empty:
+                               continue
+
+                       # Execute the collect operation for this plugin
+                       plugin.collect()
+
+               self.log.debug(_("Worker thread %s has been terminated") % self.id)
+
+       def shutdown(self):
+               self.running = False
+
+               # Wait until all data has been written.
+               self.join()
+
 
 class WriteQueue(threading.Thread):
        def __init__(self, collecty, submit_interval):
@@ -261,3 +369,23 @@ class QueueObject(object):
 
        def __cmp__(self, other):
                return cmp(self.time, other.time)
+
+
+class PluginTimer(object):
+       def __init__(self, plugin):
+               self.plugin = plugin
+
+               self.deadline = datetime.datetime.utcnow()
+
+       def __repr__(self):
+               return "<%s %s>" % (self.__class__.__name__, self.deadline)
+
+       def __cmp__(self, other):
+               return cmp(self.deadline, other.deadline)
+
+       def reset_deadline(self):
+               self.deadline = datetime.datetime.utcnow() \
+                       + datetime.timedelta(seconds=self.plugin.interval)
+
+       def is_due(self):
+               return datetime.datetime.utcnow() >= self.deadline
index 61a4682564e9855ac745ed410fea33562e234a84..8d768ae8b7e97b86409e32e811aed9c4fd5c4f4f 100644 (file)
@@ -73,7 +73,7 @@ class Timer(object):
                return self.elapsed > self.timeout
 
 
-class Plugin(threading.Thread):
+class Plugin(object):
        # The name of this plugin.
        name = None
 
@@ -104,9 +104,6 @@ class Plugin(threading.Thread):
                        _plugins[plugin.name] = plugin
 
        def __init__(self, collecty, **kwargs):
-               threading.Thread.__init__(self, name=self.description)
-               self.daemon = True
-
                self.collecty = collecty
 
                # Check if this plugin was configured correctly.
@@ -122,10 +119,6 @@ class Plugin(threading.Thread):
                # Run some custom initialization.
                self.init(**kwargs)
 
-               # Keepalive options
-               self.running = True
-               self.timer = Timer(self.interval)
-
                self.log.debug(_("Successfully initialized %s") % self.__class__.__name__)
 
        @property
@@ -173,32 +166,11 @@ class Plugin(threading.Thread):
                        self.collecty.write_queue.add(o, now, result)
 
                # Returns the time this function took to complete.
-               return (time.time() - time_start)
-
-       def run(self):
-               self.log.debug(_("%s plugin has started") % self.name)
-
-               # Initially collect everything
-               self.collect()
-
-               while self.running:
-                       # Reset the timer.
-                       self.timer.reset()
-
-                       # Wait until the timer has successfully elapsed.
-                       if self.timer.wait():
-                               delay = self.collect()
-                               self.timer.reset(delay)
-
-               self.log.debug(_("%s plugin has stopped") % self.name)
-
-       def shutdown(self):
-               self.log.debug(_("Received shutdown signal."))
-               self.running = False
+               delay = time.time() - time_start
 
-               # Kill any running timers.
-               if self.timer:
-                       self.timer.cancel()
+               # Log some warning when a collect method takes too long to return some data
+               if delay >= 60:
+                       self.log.warning(_("A worker thread was stalled for %.4fs") % delay)
 
        def get_object(self, id):
                for object in self.objects: