]> git.ipfire.org Git - oddments/collecty.git/commitdiff
daemon: Refactor WriteQueue without Python's queue module
authorMichael Tremer <michael.tremer@ipfire.org>
Mon, 28 Sep 2020 11:39:07 +0000 (11:39 +0000)
committerMichael Tremer <michael.tremer@ipfire.org>
Mon, 28 Sep 2020 11:39:07 +0000 (11:39 +0000)
Signed-off-by: Michael Tremer <michael.tremer@ipfire.org>
src/collecty/daemon.py

index 8cd7153f44f884845943f15ab96b8fc405d09292..fdb0a723e8ed615dfb929a3fcce16d148f94a8b5 100644 (file)
@@ -21,7 +21,6 @@
 
 import logging
 import os
-import queue
 import rrdtool
 import sched
 import signal
@@ -240,11 +239,12 @@ class WriteQueue(object):
 
                self.log = logging.getLogger("collecty.queue")
 
+               # Store data here
+               self._data = []
+
                # Lock to make this class thread-safe
                self._lock = threading.Lock()
 
-               self._queue = queue.PriorityQueue()
-
                self.log.debug(_("Initialised write queue"))
 
        def submit(self, object, data):
@@ -254,7 +254,7 @@ class WriteQueue(object):
                data = QueueObject(object.file, data)
 
                with self._lock:
-                       self._queue.put(data)
+                       self._data.append(data)
 
                return data
 
@@ -262,31 +262,33 @@ class WriteQueue(object):
                """
                        Flushes the read data to disk.
                """
-               # There is nothing to do if the queue is empty
-               if self._queue.empty():
-                       self.log.debug(_("No data to commit"))
-                       return
+               self.log.debug(_("Committing data to disk..."))
 
                time_start = time.time()
 
-               self.log.debug(_("Submitting data to the databases..."))
-
-               # Get all objects from the queue and group them by the RRD file
-               # to commit them all at once
-               results = {}
-
+               # There is nothing to do if the queue is empty
                with self._lock:
-                       while not self._queue.empty():
-                               result = self._queue.get()
+                       if not self._data:
+                               self.log.debug(_("No data to commit"))
+                               return
+
+                       # Get all objects from the queue and group them by the RRD file
+                       # to commit them all at once
+                       results = {}
 
+                       # Group all datapoints by file
+                       for data in self._data:
                                try:
-                                       results[result.file].append(result)
+                                       results[data.file].append(data)
                                except KeyError:
-                                       results[result.file] = [result]
+                                       results[data.file] = [data]
+
+                       # Clear the queue
+                       self._data.clear()
 
                # Write the collected data to disk
-               for filename, results in list(results.items()):
-                       self._commit_file(filename, results)
+               for filename in sorted(results):
+                       self._commit_file(filename, results[filename])
 
                duration = time.time() - time_start
                self.log.debug(_("Emptied write queue in %.2fs") % duration)
@@ -295,8 +297,11 @@ class WriteQueue(object):
                self.log.debug(_("Committing %(counter)s entries to %(filename)s") \
                        % { "counter" : len(results), "filename" : filename })
 
-               for result in results:
-                       self.log.debug("  %s: %s" % (result.time, result.data))
+               # Sort data before submitting it to rrdtool
+               results.sort()
+
+               for data in results:
+                       self.log.debug("  %s" % data)
 
                try:
                        rrdtool.update(filename, *["%s" % r for r in results])
@@ -319,17 +324,14 @@ class WriteQueue(object):
                # transaction is kept. Everything else will be put back into the
                # queue.
                with self._lock:
-                       while not self._queue.empty():
-                               result = self._queue.get()
-
-                               if result.file == filename:
-                                       results.append(result)
+                       for data in self._data:
+                               if data.file == filename:
+                                       results.append(data)
                                else:
-                                       others.append(result)
+                                       others.append(data)
 
                        # Put back all items that did not match
-                       for result in others:
-                               self._queue.put(result)
+                       self._data = others
 
                # Write everything else to disk
                if results: