]> git.ipfire.org Git - collecty.git/blobdiff - src/collecty/daemon.py
daemon: Drop our own scheduler and use sched
[collecty.git] / src / collecty / daemon.py
index 644ca3737a12ab2283e3a438d11c1ee15bc4acef..da3fdb071c8e2ba487d73a347a65d5050ac753c6 100644 (file)
 #                                                                             #
 ###############################################################################
 
-import datetime
 import logging
-import multiprocessing
 import os
 import queue
 import rrdtool
+import sched
 import signal
-import threading
 import time
 
 from . import bus
@@ -40,9 +38,7 @@ log = logging.getLogger("collecty")
 
 class Collecty(object):
        # The default interval, when all data is written to disk.
-       SUBMIT_INTERVAL = 300
-
-       HEARTBEAT = 1
+       COMMIT_INTERVAL = 300
 
        def __init__(self, debug=False):
                self.debug = debug
@@ -57,18 +53,13 @@ class Collecty(object):
 
                self.plugins = []
 
-               # Indicates whether this process should be running or not.
-               self.running = True
+               # Create the scheduler
+               self.scheduler = sched.scheduler()
+               self._schedule_commit()
 
                # The write queue holds all collected pieces of data which
                # will be written to disk later.
-               self.write_queue = WriteQueue(self, self.SUBMIT_INTERVAL)
-
-               # Create worker threads
-               self.worker_threads = self.create_worker_threads()
-
-               self._timer_queue = queue.PriorityQueue()
-               self._worker_queue = queue.Queue()
+               self.write_queue = WriteQueue(self)
 
                # Create a thread that connects to dbus and processes requests we
                # get from there.
@@ -94,12 +85,59 @@ class Collecty(object):
 
                self.plugins.append(plugin)
 
+               # Schedule the plugin to collect
+               self._schedule_plugin(plugin)
+
        @property
        def templates(self):
                for plugin in self.plugins:
                        for template in plugin.templates:
                                yield template
 
+       def _schedule_plugin(self, plugin):
+               """
+                       Schedules a collection event for the given plugin
+               """
+               log.debug("Scheduling plugin %s for executing in %ss" % (plugin, plugin.interval))
+
+               self.scheduler.enter(
+                       plugin.interval, plugin.priority, self._collect, (plugin,),
+               )
+
+       def _schedule_commit(self):
+               log.debug("Scheduling commit in %ss" % self.COMMIT_INTERVAL)
+
+               self.scheduler.enter(
+                       self.COMMIT_INTERVAL, -1, self._commit,
+               )
+
+       def _collect(self, plugin, **kwargs):
+               """
+                       Called for each plugin when it is time to collect some data
+               """
+               log.debug("Collection started for %s" % plugin)
+
+               # Add the next collection event to the scheduler
+               self._schedule_plugin(plugin)
+
+               # Run collection
+               try:
+                       plugin.collect()
+
+               except Exception as e:
+                       log.error("Unhandled exception in %s" % plugin, exc_info=True)
+                       return
+
+       def _commit(self):
+               """
+                       Called when all data should be committed to disk
+               """
+               # Schedule the next commit
+               self._schedule_commit()
+
+               # Write everything in the queue
+               self.write_queue.commit()
+
        def run(self):
                # Register signal handlers.
                self.register_signal_handler()
@@ -112,58 +150,22 @@ class Collecty(object):
                # Start the bus
                self.bus.start()
 
-               # Initialise the timer queue
-               self.initialise_timer_queue()
-
-               # Start worker threads
-               for w in self.worker_threads:
-                       w.start()
-
-               # Run the write queue thread
-               self.write_queue.start()
-
-               # Regularly submit all data to disk.
-               while self.running:
-                       try:
-                               # Try processing one event from the queue. If that succeeded
-                               # we will retry immediately.
-                               if self.process_timer_queue():
-                                       continue
-
-                               # Otherwise we will sleep for a bit
-                               time.sleep(self.HEARTBEAT)
-
-                               # Log warnings if the worker queue is filling up
-                               queue_size = self._worker_queue.qsize()
-                               if queue_size >= 5:
-                                       log.warning(_("Worker queue is filling up with %s events") % queue_size)
-
-                       except KeyboardInterrupt:
-                               self.shutdown()
-                               break
-
-               # Wait until all worker threads are finished
-               for w in self.worker_threads:
-                       w.join()
+               # Run the scheduler
+               try:
+                       self.scheduler.run()
+               except KeyboardInterrupt:
+                       pass
 
                # Stop the bus thread
                self.bus.shutdown()
 
                # Write all collected data to disk before ending the main thread
-               self.write_queue.shutdown()
+               self.write_queue.commit()
 
                log.debug(_("Main thread exited"))
 
        def shutdown(self):
-               if not self.running:
-                       return
-
                log.info(_("Received shutdown signal"))
-               self.running = False
-
-               # Propagating shutdown to all threads.
-               for w in self.worker_threads:
-                       w.shutdown()
 
        def register_signal_handler(self):
                for s in (signal.SIGTERM, signal.SIGINT, signal.SIGUSR1):
@@ -210,124 +212,17 @@ class Collecty(object):
 
                return plugin.last_update(*args, **kwargs)
 
-       def create_worker_threads(self, num=None):
-               """
-                       Creates a number of worker threads
-               """
-               # If no number of threads is given, we will create as many as we have
-               # active processor cores but never less than two.
-               if num is None:
-                       num = max(multiprocessing.cpu_count(), 2)
-
-               worker_threads = []
-
-               for id in range(num):
-                       worker_thread = WorkerThread(self, id)
-                       worker_threads.append(worker_thread)
-
-               return worker_threads
-
-       def initialise_timer_queue(self):
-               for p in self.plugins:
-                       timer = PluginTimer(p)
-
-                       self._timer_queue.put(timer)
-
-       def process_timer_queue(self):
-               # Take the item from the timer queue that is to be due first
-               timer = self._timer_queue.get()
-
-               try:
-                       # If the timer event is to be executed, we will put the plugin
-                       # into the worker queue and reset the timer
-                       if timer.is_due():
-                               self._worker_queue.put(timer.plugin)
-                               timer.reset_deadline()
-
-                               return timer
-               finally:
-                       # Put the timer back into the timer queue.
-                       self._timer_queue.put(timer)
-
-
-class WorkerThread(threading.Thread):
-       HEARTBEAT = 2.5
-
-       def __init__(self, collecty, id):
-               threading.Thread.__init__(self)
-               self.daemon = True
-
-               self.log = logging.getLogger("collecty.worker")
-
-               self.collecty = collecty
-               self.id = id
-
-               self.log.debug(_("Worker thread %s has been initialised") % self.id)
-
-       @property
-       def queue(self):
-               """
-                       The queue this thread is getting events from
-               """
-               return self.collecty._worker_queue
-
-       def run(self):
-               self.log.debug(_("Worker thread %s has been started") % self.id)
-               self.running = True
-
-               while self.running:
-                       try:
-                               plugin = self.queue.get(block=True, timeout=self.HEARTBEAT)
-
-                       # If the queue has been empty we just retry
-                       except queue.Empty:
-                               continue
-
-                       # Execute the collect operation for this plugin
-                       plugin.collect()
-
-               self.log.debug(_("Worker thread %s has been terminated") % self.id)
-
-       def shutdown(self):
-               self.running = False
-
-
-class WriteQueue(threading.Thread):
-       def __init__(self, collecty, submit_interval):
-               threading.Thread.__init__(self)
-               self.daemon = True
 
+class WriteQueue(object):
+       def __init__(self, collecty):
                self.collecty = collecty
 
                self.log = logging.getLogger("collecty.queue")
 
-               self.timer = plugins.Timer(submit_interval)
                self._queue = queue.PriorityQueue()
 
                self.log.debug(_("Initialised write queue"))
 
-       def run(self):
-               self.log.debug(_("Write queue process started"))
-               self.running = True
-
-               while self.running:
-                       # Reset the timer.
-                       self.timer.reset()
-
-                       # Wait until the timer has successfully elapsed.
-                       if self.timer.wait():
-                               self.commit()
-
-               self.commit()
-               self.log.debug(_("Write queue process stopped"))
-
-       def shutdown(self):
-               self.running = False
-               self.timer.cancel()
-
-               # Wait until all data has been written.
-               self.join()
-
        def add(self, object, time, data):
                result = QueueObject(object.file, time, data)
                self._queue.put(result)
@@ -418,23 +313,3 @@ class QueueObject(object):
 
        def __lt__(self, other):
                return self.time < other.time
-
-
-class PluginTimer(object):
-       def __init__(self, plugin):
-               self.plugin = plugin
-
-               self.deadline = datetime.datetime.utcnow()
-
-       def __repr__(self):
-               return "<%s %s>" % (self.__class__.__name__, self.deadline)
-
-       def __lt__(self, other):
-               return self.deadline < other.deadline
-
-       def reset_deadline(self):
-               self.deadline = datetime.datetime.utcnow() \
-                       + datetime.timedelta(seconds=self.plugin.interval)
-
-       def is_due(self):
-               return datetime.datetime.utcnow() >= self.deadline