+ def create_worker_threads(self, num=None):
+ """
+ Creates a number of worker threads
+ """
+ # If no number of threads is given, we will create as many as we have
+ # active processor cores but never less than four.
+ if num is None:
+ num = max(multiprocessing.cpu_count(), 4)
+
+ worker_threads = []
+
+ for id in range(num):
+ worker_thread = WorkerThread(self, id)
+ worker_threads.append(worker_thread)
+
+ return worker_threads
+
+ def initialise_timer_queue(self):
+ for p in self.plugins:
+ timer = PluginTimer(p)
+
+ self._timer_queue.put(timer)
+
+ def process_timer_queue(self):
+ # Take the item from the timer queue that is to be due first
+ timer = self._timer_queue.get()
+
+ try:
+ # If the timer event is to be executed, we will put the plugin
+ # into the worker queue and reset the timer
+ if timer.is_due():
+ self._worker_queue.put(timer.plugin)
+ timer.reset_deadline()
+
+ return timer
+ finally:
+ # Put the timer back into the timer queue.
+ self._timer_queue.put(timer)
+
+
+class WorkerThread(threading.Thread):
+ HEARTBEAT = 2.5
+
+ def __init__(self, collecty, id):
+ threading.Thread.__init__(self)
+ self.daemon = True
+
+ self.log = logging.getLogger("collecty.worker")
+ self.log.propagate = 1
+
+ self.collecty = collecty
+ self.id = id
+
+ self.log.debug(_("Worker thread %s has been initialised") % self.id)
+
+ @property
+ def queue(self):
+ """
+ The queue this thread is getting events from
+ """
+ return self.collecty._worker_queue
+
+ def run(self):
+ self.log.debug(_("Worker thread %s has been started") % self.id)
+ self.running = True
+
+ while self.running:
+ try:
+ plugin = self.queue.get(block=True, timeout=self.HEARTBEAT)
+
+ # If the queue has been empty we just retry
+ except queue.Empty:
+ continue
+
+ # Execute the collect operation for this plugin
+ plugin.collect()
+
+ self.log.debug(_("Worker thread %s has been terminated") % self.id)
+
+ def shutdown(self):
+ self.running = False
+
+ # Wait until all data has been written.
+ self.join()
+