# #
###############################################################################
-import datetime
import logging
-import multiprocessing
import os
import queue
import rrdtool
+import sched
import signal
-import threading
import time
from . import bus
class Collecty(object):
# The default interval, when all data is written to disk.
- SUBMIT_INTERVAL = 300
-
- HEARTBEAT = 1
+ COMMIT_INTERVAL = 300
def __init__(self, debug=False):
self.debug = debug
self.plugins = []
- # Indicates whether this process should be running or not.
- self.running = True
+ # Create the scheduler
+ self.scheduler = sched.scheduler()
+ self._schedule_commit()
# The write queue holds all collected pieces of data which
# will be written to disk later.
- self.write_queue = WriteQueue(self, self.SUBMIT_INTERVAL)
-
- # Create worker threads
- self.worker_threads = self.create_worker_threads()
-
- self._timer_queue = queue.PriorityQueue()
- self._worker_queue = queue.Queue()
+ self.write_queue = WriteQueue(self)
# Create a thread that connects to dbus and processes requests we
# get from there.
self.plugins.append(plugin)
+ # Schedule the plugin to collect
+ self._schedule_plugin(plugin)
+
@property
def templates(self):
for plugin in self.plugins:
for template in plugin.templates:
yield template
+ def _schedule_plugin(self, plugin):
+ """
+ Schedules a collection event for the given plugin
+
+ The event fires plugin.interval seconds from now at the
+ plugin's priority; _collect re-schedules it after each run,
+ so each plugin only ever has one pending event.
+ """
+ log.debug("Scheduling plugin %s for executing in %ss" % (plugin, plugin.interval))
+
+ self.scheduler.enter(
+ plugin.interval, plugin.priority, self._collect, (plugin,),
+ )
+
+ def _schedule_commit(self):
+ """
+ Schedules the next commit of the write queue,
+ COMMIT_INTERVAL seconds from now.
+ """
+ log.debug("Scheduling commit in %ss" % self.COMMIT_INTERVAL)
+
+ # Priority -1 sorts before any plugin collection event that is
+ # due at the same time (sched runs lower priority values first)
+ self.scheduler.enter(
+ self.COMMIT_INTERVAL, -1, self._commit,
+ )
+
+ def _collect(self, plugin, **kwargs):
+ """
+ Called for each plugin when it is time to collect some data
+ """
+ log.debug("Collection started for %s" % plugin)
+
+ # Add the next collection event to the scheduler before running,
+ # so that a plugin that raises stays in the collection cycle
+ self._schedule_plugin(plugin)
+
+ # Run collection
+ try:
+ plugin.collect()
+
+ except Exception as e:
+ # Catch-all: one faulty plugin must not take down the
+ # scheduler loop
+ log.error("Unhandled exception in %s" % plugin, exc_info=True)
+ return
+
+ def _commit(self):
+ """
+ Called when all data should be committed to disk
+
+ Runs every COMMIT_INTERVAL seconds via the scheduler.
+ """
+ # Schedule the next commit
+ self._schedule_commit()
+
+ # Write everything in the queue
+ self.write_queue.commit()
+
def run(self):
# Register signal handlers.
self.register_signal_handler()
# Start the bus
self.bus.start()
- # Initialise the timer queue
- self.initialise_timer_queue()
-
- # Start worker threads
- for w in self.worker_threads:
- w.start()
-
- # Run the write queue thread
- self.write_queue.start()
-
- # Regularly submit all data to disk.
- while self.running:
- try:
- # Try processing one event from the queue. If that succeeded
- # we will retry immediately.
- if self.process_timer_queue():
- continue
-
- # Otherwise we will sleep for a bit
- time.sleep(self.HEARTBEAT)
-
- # Log warnings if the worker queue is filling up
- queue_size = self._worker_queue.qsize()
- if queue_size >= 5:
- log.warning(_("Worker queue is filling up with %s events") % queue_size)
-
- except KeyboardInterrupt:
- self.shutdown()
- break
-
- # Run the scheduler. Collection and commit events re-schedule
- # themselves, so this blocks until interrupted.
- # Wait until all worker threads are finished
- for w in self.worker_threads:
- w.join()
+ # Run the scheduler. Collection and commit events re-schedule
+ # themselves, so this blocks until interrupted.
+ try:
+ self.scheduler.run()
+ except KeyboardInterrupt:
+ pass
# Stop the bus thread
self.bus.shutdown()
# Write all collected data to disk before ending the main thread
- self.write_queue.shutdown()
+ self.write_queue.commit()
log.debug(_("Main thread exited"))
def shutdown(self):
- if not self.running:
- return
-
log.info(_("Received shutdown signal"))
+ # NOTE(review): after this change, shutdown() only logs — nothing
+ # here stops the scheduler loop any more. Confirm the registered
+ # signal handler actually interrupts scheduler.run().
- self.running = False
-
- # Propagating shutdown to all threads.
- for w in self.worker_threads:
- w.shutdown()
def register_signal_handler(self):
for s in (signal.SIGTERM, signal.SIGINT, signal.SIGUSR1):
return plugin.last_update(*args, **kwargs)
- def create_worker_threads(self, num=None):
- """
- Creates a number of worker threads
- """
- # If no number of threads is given, we will create as many as we have
- # active processor cores but never less than two.
- if num is None:
- num = max(multiprocessing.cpu_count(), 2)
-
- worker_threads = []
-
- for id in range(num):
- worker_thread = WorkerThread(self, id)
- worker_threads.append(worker_thread)
-
- return worker_threads
-
- def initialise_timer_queue(self):
- for p in self.plugins:
- timer = PluginTimer(p)
-
- self._timer_queue.put(timer)
-
- def process_timer_queue(self):
- # Take the item from the timer queue that is to be due first
- timer = self._timer_queue.get()
-
- try:
- # If the timer event is to be executed, we will put the plugin
- # into the worker queue and reset the timer
- if timer.is_due():
- self._worker_queue.put(timer.plugin)
- timer.reset_deadline()
-
- return timer
- finally:
- # Put the timer back into the timer queue.
- self._timer_queue.put(timer)
-
-
-class WorkerThread(threading.Thread):
- HEARTBEAT = 2.5
-
- def __init__(self, collecty, id):
- threading.Thread.__init__(self)
- self.daemon = True
-
- self.log = logging.getLogger("collecty.worker")
-
- self.collecty = collecty
- self.id = id
-
- self.log.debug(_("Worker thread %s has been initialised") % self.id)
-
- @property
- def queue(self):
- """
- The queue this thread is getting events from
- """
- return self.collecty._worker_queue
-
- def run(self):
- self.log.debug(_("Worker thread %s has been started") % self.id)
- self.running = True
-
- while self.running:
- try:
- plugin = self.queue.get(block=True, timeout=self.HEARTBEAT)
-
- # If the queue has been empty we just retry
- except queue.Empty:
- continue
-
- # Execute the collect operation for this plugin
- plugin.collect()
-
- self.log.debug(_("Worker thread %s has been terminated") % self.id)
-
- def shutdown(self):
- self.running = False
-
-
-class WriteQueue(threading.Thread):
- def __init__(self, collecty, submit_interval):
- threading.Thread.__init__(self)
- self.daemon = True
+class WriteQueue(object):
+ """
+ Buffers collected data points in a priority queue until they
+ are committed to disk by the main scheduler's commit event.
+ """
+ def __init__(self, collecty):
self.collecty = collecty
self.log = logging.getLogger("collecty.queue")
- self.timer = plugins.Timer(submit_interval)
self._queue = queue.PriorityQueue()
self.log.debug(_("Initialised write queue"))
- def run(self):
- self.log.debug(_("Write queue process started"))
- self.running = True
-
- while self.running:
- # Reset the timer.
- self.timer.reset()
-
- # Wait until the timer has successfully elapsed.
- if self.timer.wait():
- self.commit()
-
- self.commit()
- self.log.debug(_("Write queue process stopped"))
-
- def shutdown(self):
- self.running = False
- self.timer.cancel()
-
- # Wait until all data has been written.
- self.join()
-
def add(self, object, time, data):
result = QueueObject(object.file, time, data)
self._queue.put(result)
def __lt__(self, other):
return self.time < other.time
-
-
-class PluginTimer(object):
- def __init__(self, plugin):
- self.plugin = plugin
-
- self.deadline = datetime.datetime.utcnow()
-
- def __repr__(self):
- return "<%s %s>" % (self.__class__.__name__, self.deadline)
-
- def __lt__(self, other):
- return self.deadline < other.deadline
-
- def reset_deadline(self):
- self.deadline = datetime.datetime.utcnow() \
- + datetime.timedelta(seconds=self.plugin.interval)
-
- def is_due(self):
- return datetime.datetime.utcnow() >= self.deadline