From 6e603f14125df4c9eb07ac460aae6baf78902e70 Mon Sep 17 00:00:00 2001
From: Michael Tremer
Date: Sun, 20 Sep 2020 13:54:54 +0000
Subject: [PATCH] daemon: Drop our own scheduler and use sched

Our own implementation was trying to be very accurate, which comes
with a large overhead and is largely unnecessary.

This patch removes the threading approach which kept multiple timers
and replaces it with one simple deadline scheduler.

Plugins will have to ensure that they are not blocking the collection.

Signed-off-by: Michael Tremer
---
 src/collecty/daemon.py       | 247 +++++++++--------------------------
 src/collecty/plugins/base.py |   3 +
 2 files changed, 64 insertions(+), 186 deletions(-)

diff --git a/src/collecty/daemon.py b/src/collecty/daemon.py
index 644ca37..da3fdb0 100644
--- a/src/collecty/daemon.py
+++ b/src/collecty/daemon.py
@@ -19,14 +19,12 @@
 #                                                                             #
 ###############################################################################
 
-import datetime
 import logging
-import multiprocessing
 import os
 import queue
 import rrdtool
+import sched
 import signal
-import threading
 import time
 
 from . import bus
@@ -40,9 +38,7 @@ log = logging.getLogger("collecty")
 
 class Collecty(object):
 	# The default interval, when all data is written to disk.
-	SUBMIT_INTERVAL = 300
-
-	HEARTBEAT = 1
+	COMMIT_INTERVAL = 300
 
 	def __init__(self, debug=False):
 		self.debug = debug
@@ -57,18 +53,13 @@ class Collecty(object):
 
 		self.plugins = []
 
-		# Indicates whether this process should be running or not.
-		self.running = True
+		# Create the scheduler
+		self.scheduler = sched.scheduler()
+		self._schedule_commit()
 
 		# The write queue holds all collected pieces of data which
 		# will be written to disk later.
-		self.write_queue = WriteQueue(self, self.SUBMIT_INTERVAL)
-
-		# Create worker threads
-		self.worker_threads = self.create_worker_threads()
-
-		self._timer_queue = queue.PriorityQueue()
-		self._worker_queue = queue.Queue()
+		self.write_queue = WriteQueue(self)
 
 		# Create a thread that connects to dbus and processes requests we
 		# get from there.
@@ -94,12 +85,59 @@ class Collecty(object):
 
 		self.plugins.append(plugin)
 
+		# Schedule the plugin to collect
+		self._schedule_plugin(plugin)
+
 	@property
 	def templates(self):
 		for plugin in self.plugins:
 			for template in plugin.templates:
 				yield template
 
+	def _schedule_plugin(self, plugin):
+		"""
+			Schedules a collection event for the given plugin
+		"""
+		log.debug("Scheduling plugin %s for executing in %ss" % (plugin, plugin.interval))
+
+		self.scheduler.enter(
+			plugin.interval, plugin.priority, self._collect, (plugin,),
+		)
+
+	def _schedule_commit(self):
+		log.debug("Scheduling commit in %ss" % self.COMMIT_INTERVAL)
+
+		self.scheduler.enter(
+			self.COMMIT_INTERVAL, -1, self._commit,
+		)
+
+	def _collect(self, plugin, **kwargs):
+		"""
+			Called for each plugin when it is time to collect some data
+		"""
+		log.debug("Collection started for %s" % plugin)
+
+		# Add the next collection event to the scheduler
+		self._schedule_plugin(plugin)
+
+		# Run collection
+		try:
+			plugin.collect()
+
+		except Exception as e:
+			log.error("Unhandled exception in %s" % plugin, exc_info=True)
+			return
+
+	def _commit(self):
+		"""
+			Called when all data should be committed to disk
+		"""
+		# Schedule the next commit
+		self._schedule_commit()
+
+		# Write everything in the queue
+		self.write_queue.commit()
+
 	def run(self):
 		# Register signal handlers.
 		self.register_signal_handler()
@@ -112,58 +150,22 @@ class Collecty(object):
 		# Start the bus
 		self.bus.start()
 
-		# Initialise the timer queue
-		self.initialise_timer_queue()
-
-		# Start worker threads
-		for w in self.worker_threads:
-			w.start()
-
-		# Run the write queue thread
-		self.write_queue.start()
-
-		# Regularly submit all data to disk.
-		while self.running:
-			try:
-				# Try processing one event from the queue. If that succeeded
-				# we will retry immediately.
-				if self.process_timer_queue():
-					continue
-
-				# Otherwise we will sleep for a bit
-				time.sleep(self.HEARTBEAT)
-
-				# Log warnings if the worker queue is filling up
-				queue_size = self._worker_queue.qsize()
-				if queue_size >= 5:
-					log.warning(_("Worker queue is filling up with %s events") % queue_size)
-
-			except KeyboardInterrupt:
-				self.shutdown()
-				break
-
-		# Wait until all worker threads are finished
-		for w in self.worker_threads:
-			w.join()
+		# Run the scheduler
+		try:
+			self.scheduler.run()
+		except KeyboardInterrupt:
+			pass
 
 		# Stop the bus thread
 		self.bus.shutdown()
 
 		# Write all collected data to disk before ending the main thread
-		self.write_queue.shutdown()
+		self.write_queue.commit()
 
 		log.debug(_("Main thread exited"))
 
 	def shutdown(self):
-		if not self.running:
-			return
-
 		log.info(_("Received shutdown signal"))
-		self.running = False
-
-		# Propagating shutdown to all threads.
-		for w in self.worker_threads:
-			w.shutdown()
 
 	def register_signal_handler(self):
 		for s in (signal.SIGTERM, signal.SIGINT, signal.SIGUSR1):
@@ -210,124 +212,17 @@ class Collecty(object):
 
 		return plugin.last_update(*args, **kwargs)
 
-	def create_worker_threads(self, num=None):
-		"""
-			Creates a number of worker threads
-		"""
-		# If no number of threads is given, we will create as many as we have
-		# active processor cores but never less than two.
-		if num is None:
-			num = max(multiprocessing.cpu_count(), 2)
-
-		worker_threads = []
-
-		for id in range(num):
-			worker_thread = WorkerThread(self, id)
-			worker_threads.append(worker_thread)
-
-		return worker_threads
-
-	def initialise_timer_queue(self):
-		for p in self.plugins:
-			timer = PluginTimer(p)
-
-			self._timer_queue.put(timer)
-
-	def process_timer_queue(self):
-		# Take the item from the timer queue that is to be due first
-		timer = self._timer_queue.get()
-
-		try:
-			# If the timer event is to be executed, we will put the plugin
-			# into the worker queue and reset the timer
-			if timer.is_due():
-				self._worker_queue.put(timer.plugin)
-				timer.reset_deadline()
-
-			return timer
-		finally:
-			# Put the timer back into the timer queue.
-			self._timer_queue.put(timer)
-
-
-class WorkerThread(threading.Thread):
-	HEARTBEAT = 2.5
-
-	def __init__(self, collecty, id):
-		threading.Thread.__init__(self)
-		self.daemon = True
-
-		self.log = logging.getLogger("collecty.worker")
-
-		self.collecty = collecty
-		self.id = id
-
-		self.log.debug(_("Worker thread %s has been initialised") % self.id)
-
-	@property
-	def queue(self):
-		"""
-			The queue this thread is getting events from
-		"""
-		return self.collecty._worker_queue
-
-	def run(self):
-		self.log.debug(_("Worker thread %s has been started") % self.id)
-		self.running = True
-
-		while self.running:
-			try:
-				plugin = self.queue.get(block=True, timeout=self.HEARTBEAT)
-
-			# If the queue has been empty we just retry
-			except queue.Empty:
-				continue
-
-			# Execute the collect operation for this plugin
-			plugin.collect()
-
-		self.log.debug(_("Worker thread %s has been terminated") % self.id)
-
-	def shutdown(self):
-		self.running = False
-
-
-class WriteQueue(threading.Thread):
-	def __init__(self, collecty, submit_interval):
-		threading.Thread.__init__(self)
-		self.daemon = True
+class WriteQueue(object):
+	def __init__(self, collecty):
 		self.collecty = collecty
 
 		self.log = logging.getLogger("collecty.queue")
 
-		self.timer = plugins.Timer(submit_interval)
-
 		self._queue = queue.PriorityQueue()
 
 		self.log.debug(_("Initialised write queue"))
 
-	def run(self):
-		self.log.debug(_("Write queue process started"))
-		self.running = True
-
-		while self.running:
-			# Reset the timer.
-			self.timer.reset()
-
-			# Wait until the timer has successfully elapsed.
-			if self.timer.wait():
-				self.commit()
-
-		self.commit()
-		self.log.debug(_("Write queue process stopped"))
-
-	def shutdown(self):
-		self.running = False
-		self.timer.cancel()
-
-		# Wait until all data has been written.
-		self.join()
-
 	def add(self, object, time, data):
 		result = QueueObject(object.file, time, data)
 		self._queue.put(result)
@@ -418,23 +313,3 @@ class QueueObject(object):
 
 	def __lt__(self, other):
 		return self.time < other.time
-
-
-class PluginTimer(object):
-	def __init__(self, plugin):
-		self.plugin = plugin
-
-		self.deadline = datetime.datetime.utcnow()
-
-	def __repr__(self):
-		return "<%s %s>" % (self.__class__.__name__, self.deadline)
-
-	def __lt__(self, other):
-		return self.deadline < other.deadline
-
-	def reset_deadline(self):
-		self.deadline = datetime.datetime.utcnow() \
-			+ datetime.timedelta(seconds=self.plugin.interval)
-
-	def is_due(self):
-		return datetime.datetime.utcnow() >= self.deadline
diff --git a/src/collecty/plugins/base.py b/src/collecty/plugins/base.py
index 4553132..249f395 100644
--- a/src/collecty/plugins/base.py
+++ b/src/collecty/plugins/base.py
@@ -141,6 +141,9 @@ class Plugin(object, metaclass=PluginRegistration):
 	# The default interval for all plugins
 	interval = 60
 
+	# Priority
+	priority = 0
+
 	def __init__(self, collecty, **kwargs):
 		self.collecty = collecty
 
-- 
2.39.2
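
Note (not part of the patch): for readers unfamiliar with the sched module,
the standalone sketch below illustrates the rescheduling pattern that
_schedule_plugin()/_collect() rely on. Each event re-enters itself before
doing its work, and scheduler.run() drains the event queue in deadline order
on a single thread, which is why plugins must not block the collection. The
DemoPlugin class, its interval and the run counter are made up for
illustration only.

    import sched
    import time

    # sched.scheduler() defaults to time.monotonic as the clock and
    # time.sleep as the delay function (Python 3.3+).
    scheduler = sched.scheduler()


    class DemoPlugin:
        """Hypothetical stand-in for a collecty plugin."""

        interval = 1   # seconds between two collections
        priority = 0   # lower value wins when two deadlines coincide

        def collect(self):
            print("collected at %.1fs" % time.monotonic())


    def _collect(plugin, remaining):
        # Queue the next run before doing the work, mirroring the order
        # used in the patch; a plugin that blocks in collect() would delay
        # every later deadline.
        if remaining > 1:
            scheduler.enter(plugin.interval, plugin.priority, _collect,
                            (plugin, remaining - 1))
        plugin.collect()


    plugin = DemoPlugin()
    scheduler.enter(plugin.interval, plugin.priority, _collect, (plugin, 3))
    scheduler.run()  # blocks until the event queue is empty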