From: Michael Tremer Date: Mon, 28 Sep 2020 11:39:07 +0000 (+0000) Subject: daemon: Refactor WriteQueue without Python's queue module X-Git-Url: http://git.ipfire.org/gitweb/gitweb.cgi?a=commitdiff_plain;h=1d17077f923a9c035a84274562436de220b1cd89;p=collecty.git daemon: Refactor WriteQueue without Python's queue module Signed-off-by: Michael Tremer --- diff --git a/src/collecty/daemon.py b/src/collecty/daemon.py index 8cd7153..fdb0a72 100644 --- a/src/collecty/daemon.py +++ b/src/collecty/daemon.py @@ -21,7 +21,6 @@ import logging import os -import queue import rrdtool import sched import signal @@ -240,11 +239,12 @@ class WriteQueue(object): self.log = logging.getLogger("collecty.queue") + # Store data here + self._data = [] + # Lock to make this class thread-safe self._lock = threading.Lock() - self._queue = queue.PriorityQueue() - self.log.debug(_("Initialised write queue")) def submit(self, object, data): @@ -254,7 +254,7 @@ class WriteQueue(object): data = QueueObject(object.file, data) with self._lock: - self._queue.put(data) + self._data.append(data) return data @@ -262,31 +262,33 @@ class WriteQueue(object): """ Flushes the read data to disk. """ - # There is nothing to do if the queue is empty - if self._queue.empty(): - self.log.debug(_("No data to commit")) - return + self.log.debug(_("Committing data to disk...")) time_start = time.time() - self.log.debug(_("Submitting data to the databases...")) - - # Get all objects from the queue and group them by the RRD file - # to commit them all at once - results = {} - + # There is nothing to do if the queue is empty with self._lock: - while not self._queue.empty(): - result = self._queue.get() + if not self._data: + self.log.debug(_("No data to commit")) + return + + # Get all objects from the queue and group them by the RRD file + # to commit them all at once + results = {} + # Group all datapoints by file + for data in self._data: try: - results[result.file].append(result) + results[data.file].append(data) except KeyError: - results[result.file] = [result] + results[data.file] = [data] + + # Clear the queue + self._data.clear() # Write the collected data to disk - for filename, results in list(results.items()): - self._commit_file(filename, results) + for filename in sorted(results): + self._commit_file(filename, results[filename]) duration = time.time() - time_start self.log.debug(_("Emptied write queue in %.2fs") % duration) @@ -295,8 +297,11 @@ class WriteQueue(object): self.log.debug(_("Committing %(counter)s entries to %(filename)s") \ % { "counter" : len(results), "filename" : filename }) - for result in results: - self.log.debug(" %s: %s" % (result.time, result.data)) + # Sort data before submitting it to rrdtool + results.sort() + + for data in results: + self.log.debug(" %s" % data) try: rrdtool.update(filename, *["%s" % r for r in results]) @@ -319,17 +324,14 @@ class WriteQueue(object): # transaction is kept. Everything else will be put back into the # queue. with self._lock: - while not self._queue.empty(): - result = self._queue.get() - - if result.file == filename: - results.append(result) + for data in self._data: + if data.file == filename: + results.append(data) else: - others.append(result) + others.append(data) # Put back all items that did not match - for result in others: - self._queue.put(result) + self._data = others # Write everything else to disk if results: