]> git.ipfire.org Git - oddments/collecty.git/commitdiff
daemon: Make WriteQueue thread-safe
authorMichael Tremer <michael.tremer@ipfire.org>
Mon, 28 Sep 2020 11:18:04 +0000 (11:18 +0000)
committerMichael Tremer <michael.tremer@ipfire.org>
Mon, 28 Sep 2020 11:18:04 +0000 (11:18 +0000)
Signed-off-by: Michael Tremer <michael.tremer@ipfire.org>
src/collecty/daemon.py

index 199e363963fab75954be753fa77dd84c64502700..4aadd13fbb6bae31b05f4e4499b038b24165f62e 100644 (file)
@@ -27,6 +27,7 @@ import sched
 import signal
 import tarfile
 import tempfile
+import threading
 import time
 
 from . import bus
@@ -239,13 +240,18 @@ class WriteQueue(object):
 
                self.log = logging.getLogger("collecty.queue")
 
+               # Lock to make this class thread-safe
+               self._lock = threading.Lock()
+
                self._queue = queue.PriorityQueue()
 
                self.log.debug(_("Initialised write queue"))
 
        def add(self, object, time, data):
                result = QueueObject(object.file, time, data)
-               self._queue.put(result)
+
+               with self._lock:
+                       self._queue.put(result)
 
        def commit(self):
                """
@@ -263,13 +269,15 @@ class WriteQueue(object):
                # Get all objects from the queue and group them by the RRD file
                # to commit them all at once
                results = {}
-               while not self._queue.empty():
-                       result = self._queue.get()
 
-                       try:
-                               results[result.file].append(result)
-                       except KeyError:
-                               results[result.file] = [result]
+               with self._lock:
+                       while not self._queue.empty():
+                               result = self._queue.get()
+
+                               try:
+                                       results[result.file].append(result)
+                               except KeyError:
+                                       results[result.file] = [result]
 
                # Write the collected data to disk
                for filename, results in list(results.items()):
@@ -305,17 +313,18 @@ class WriteQueue(object):
                # ready any items selectively. Everything that belongs to our
                # transaction is kept. Everything else will be put back into the
                # queue.
-               while not self._queue.empty():
-                       result = self._queue.get()
-
-                       if result.file == filename:
-                               results.append(result)
-                       else:
-                               others.append(result)
-
-               # Put back all items that did not match
-               for result in others:
-                       self._queue.put(result)
+               with self._lock:
+                       while not self._queue.empty():
+                               result = self._queue.get()
+
+                               if result.file == filename:
+                                       results.append(result)
+                               else:
+                                       others.append(result)
+
+                       # Put back all items that did not match
+                       for result in others:
+                               self._queue.put(result)
 
                # Write everything else to disk
                if results: