git.ipfire.org Git - oddments/collecty.git/commitdiff
daemon: Drop our own scheduler and use sched
author Michael Tremer <michael.tremer@ipfire.org>
Sun, 20 Sep 2020 13:54:54 +0000 (13:54 +0000)
committer Michael Tremer <michael.tremer@ipfire.org>
Sun, 20 Sep 2020 13:54:54 +0000 (13:54 +0000)
Our own implementation was trying to be very accurate, which
comes with a large overhead and is largely unnecessary.

This patch removes the threading approach which kept multiple
timers and replaces it with one simple deadline scheduler.

Plugins will have to ensure that they are not blocking the
collection.

Signed-off-by: Michael Tremer <michael.tremer@ipfire.org>
src/collecty/daemon.py
src/collecty/plugins/base.py

index 644ca3737a12ab2283e3a438d11c1ee15bc4acef..da3fdb071c8e2ba487d73a347a65d5050ac753c6 100644 (file)
 #                                                                             #
 ###############################################################################
 
-import datetime
 import logging
-import multiprocessing
 import os
 import queue
 import rrdtool
+import sched
 import signal
-import threading
 import time
 
 from . import bus
@@ -40,9 +38,7 @@ log = logging.getLogger("collecty")
 
 class Collecty(object):
        # The default interval, when all data is written to disk.
-       SUBMIT_INTERVAL = 300
-
-       HEARTBEAT = 1
+       COMMIT_INTERVAL = 300
 
        def __init__(self, debug=False):
                self.debug = debug
@@ -57,18 +53,13 @@ class Collecty(object):
 
                self.plugins = []
 
-               # Indicates whether this process should be running or not.
-               self.running = True
+               # Create the scheduler
+               self.scheduler = sched.scheduler()
+               self._schedule_commit()
 
                # The write queue holds all collected pieces of data which
                # will be written to disk later.
-               self.write_queue = WriteQueue(self, self.SUBMIT_INTERVAL)
-
-               # Create worker threads
-               self.worker_threads = self.create_worker_threads()
-
-               self._timer_queue = queue.PriorityQueue()
-               self._worker_queue = queue.Queue()
+               self.write_queue = WriteQueue(self)
 
                # Create a thread that connects to dbus and processes requests we
                # get from there.
@@ -94,12 +85,59 @@ class Collecty(object):
 
                self.plugins.append(plugin)
 
+               # Schedule the plugin to collect
+               self._schedule_plugin(plugin)
+
        @property
        def templates(self):
                for plugin in self.plugins:
                        for template in plugin.templates:
                                yield template
 
+       def _schedule_plugin(self, plugin):
+               """
+                       Schedules a collection event for the given plugin
+               """
+               log.debug("Scheduling plugin %s for executing in %ss" % (plugin, plugin.interval))
+
+               self.scheduler.enter(
+                       plugin.interval, plugin.priority, self._collect, (plugin,),
+               )
+
+       def _schedule_commit(self):
+               log.debug("Scheduling commit in %ss" % self.COMMIT_INTERVAL)
+
+               self.scheduler.enter(
+                       self.COMMIT_INTERVAL, -1, self._commit,
+               )
+
+       def _collect(self, plugin, **kwargs):
+               """
+                       Called for each plugin when it is time to collect some data
+               """
+               log.debug("Collection started for %s" % plugin)
+
+               # Add the next collection event to the scheduler
+               self._schedule_plugin(plugin)
+
+               # Run collection
+               try:
+                       plugin.collect()
+
+               except Exception as e:
+                       log.error("Unhandled exception in %s" % plugin, exc_info=True)
+                       return
+
+       def _commit(self):
+               """
+                       Called when all data should be committed to disk
+               """
+               # Schedule the next commit
+               self._schedule_commit()
+
+               # Write everything in the queue
+               self.write_queue.commit()
+
        def run(self):
                # Register signal handlers.
                self.register_signal_handler()
@@ -112,58 +150,22 @@ class Collecty(object):
                # Start the bus
                self.bus.start()
 
-               # Initialise the timer queue
-               self.initialise_timer_queue()
-
-               # Start worker threads
-               for w in self.worker_threads:
-                       w.start()
-
-               # Run the write queue thread
-               self.write_queue.start()
-
-               # Regularly submit all data to disk.
-               while self.running:
-                       try:
-                               # Try processing one event from the queue. If that succeeded
-                               # we will retry immediately.
-                               if self.process_timer_queue():
-                                       continue
-
-                               # Otherwise we will sleep for a bit
-                               time.sleep(self.HEARTBEAT)
-
-                               # Log warnings if the worker queue is filling up
-                               queue_size = self._worker_queue.qsize()
-                               if queue_size >= 5:
-                                       log.warning(_("Worker queue is filling up with %s events") % queue_size)
-
-                       except KeyboardInterrupt:
-                               self.shutdown()
-                               break
-
-               # Wait until all worker threads are finished
-               for w in self.worker_threads:
-                       w.join()
+               # Run the scheduler
+               try:
+                       self.scheduler.run()
+               except KeyboardInterrupt:
+                       pass
 
                # Stop the bus thread
                self.bus.shutdown()
 
                # Write all collected data to disk before ending the main thread
-               self.write_queue.shutdown()
+               self.write_queue.commit()
 
                log.debug(_("Main thread exited"))
 
        def shutdown(self):
-               if not self.running:
-                       return
-
                log.info(_("Received shutdown signal"))
-               self.running = False
-
-               # Propagating shutdown to all threads.
-               for w in self.worker_threads:
-                       w.shutdown()
 
        def register_signal_handler(self):
                for s in (signal.SIGTERM, signal.SIGINT, signal.SIGUSR1):
@@ -210,124 +212,17 @@ class Collecty(object):
 
                return plugin.last_update(*args, **kwargs)
 
-       def create_worker_threads(self, num=None):
-               """
-                       Creates a number of worker threads
-               """
-               # If no number of threads is given, we will create as many as we have
-               # active processor cores but never less than two.
-               if num is None:
-                       num = max(multiprocessing.cpu_count(), 2)
-
-               worker_threads = []
-
-               for id in range(num):
-                       worker_thread = WorkerThread(self, id)
-                       worker_threads.append(worker_thread)
-
-               return worker_threads
-
-       def initialise_timer_queue(self):
-               for p in self.plugins:
-                       timer = PluginTimer(p)
-
-                       self._timer_queue.put(timer)
-
-       def process_timer_queue(self):
-               # Take the item from the timer queue that is to be due first
-               timer = self._timer_queue.get()
-
-               try:
-                       # If the timer event is to be executed, we will put the plugin
-                       # into the worker queue and reset the timer
-                       if timer.is_due():
-                               self._worker_queue.put(timer.plugin)
-                               timer.reset_deadline()
-
-                               return timer
-               finally:
-                       # Put the timer back into the timer queue.
-                       self._timer_queue.put(timer)
-
-
-class WorkerThread(threading.Thread):
-       HEARTBEAT = 2.5
-
-       def __init__(self, collecty, id):
-               threading.Thread.__init__(self)
-               self.daemon = True
-
-               self.log = logging.getLogger("collecty.worker")
-
-               self.collecty = collecty
-               self.id = id
-
-               self.log.debug(_("Worker thread %s has been initialised") % self.id)
-
-       @property
-       def queue(self):
-               """
-                       The queue this thread is getting events from
-               """
-               return self.collecty._worker_queue
-
-       def run(self):
-               self.log.debug(_("Worker thread %s has been started") % self.id)
-               self.running = True
-
-               while self.running:
-                       try:
-                               plugin = self.queue.get(block=True, timeout=self.HEARTBEAT)
-
-                       # If the queue has been empty we just retry
-                       except queue.Empty:
-                               continue
-
-                       # Execute the collect operation for this plugin
-                       plugin.collect()
-
-               self.log.debug(_("Worker thread %s has been terminated") % self.id)
-
-       def shutdown(self):
-               self.running = False
-
-
-class WriteQueue(threading.Thread):
-       def __init__(self, collecty, submit_interval):
-               threading.Thread.__init__(self)
-               self.daemon = True
 
+class WriteQueue(object):
+       def __init__(self, collecty):
                self.collecty = collecty
 
                self.log = logging.getLogger("collecty.queue")
 
-               self.timer = plugins.Timer(submit_interval)
                self._queue = queue.PriorityQueue()
 
                self.log.debug(_("Initialised write queue"))
 
-       def run(self):
-               self.log.debug(_("Write queue process started"))
-               self.running = True
-
-               while self.running:
-                       # Reset the timer.
-                       self.timer.reset()
-
-                       # Wait until the timer has successfully elapsed.
-                       if self.timer.wait():
-                               self.commit()
-
-               self.commit()
-               self.log.debug(_("Write queue process stopped"))
-
-       def shutdown(self):
-               self.running = False
-               self.timer.cancel()
-
-               # Wait until all data has been written.
-               self.join()
-
        def add(self, object, time, data):
                result = QueueObject(object.file, time, data)
                self._queue.put(result)
@@ -418,23 +313,3 @@ class QueueObject(object):
 
        def __lt__(self, other):
                return self.time < other.time
-
-
-class PluginTimer(object):
-       def __init__(self, plugin):
-               self.plugin = plugin
-
-               self.deadline = datetime.datetime.utcnow()
-
-       def __repr__(self):
-               return "<%s %s>" % (self.__class__.__name__, self.deadline)
-
-       def __lt__(self, other):
-               return self.deadline < other.deadline
-
-       def reset_deadline(self):
-               self.deadline = datetime.datetime.utcnow() \
-                       + datetime.timedelta(seconds=self.plugin.interval)
-
-       def is_due(self):
-               return datetime.datetime.utcnow() >= self.deadline
index 4553132e64a597a6151eb7fc88d45000601fb6ea..249f3950c4ba006735558da001df988925343759 100644 (file)
@@ -141,6 +141,9 @@ class Plugin(object, metaclass=PluginRegistration):
        # The default interval for all plugins
        interval = 60
 
+       # Priority
+       priority = 0
+
        def __init__(self, collecty, **kwargs):
                self.collecty = collecty