From: Michael Tremer Date: Mon, 28 Sep 2020 11:18:04 +0000 (+0000) Subject: daemon: Make WriteQueue thread-safe X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=2c0fa15ea6f84ae54b7e8457f890c44f21801b36;p=collecty.git daemon: Make WriteQueue thread-safe Signed-off-by: Michael Tremer --- diff --git a/src/collecty/daemon.py b/src/collecty/daemon.py index 199e363..4aadd13 100644 --- a/src/collecty/daemon.py +++ b/src/collecty/daemon.py @@ -27,6 +27,7 @@ import sched import signal import tarfile import tempfile +import threading import time from . import bus @@ -239,13 +240,18 @@ class WriteQueue(object): self.log = logging.getLogger("collecty.queue") + # Lock to make this class thread-safe + self._lock = threading.Lock() + self._queue = queue.PriorityQueue() self.log.debug(_("Initialised write queue")) def add(self, object, time, data): result = QueueObject(object.file, time, data) - self._queue.put(result) + + with self._lock: + self._queue.put(result) def commit(self): """ @@ -263,13 +269,15 @@ class WriteQueue(object): # Get all objects from the queue and group them by the RRD file # to commit them all at once results = {} - while not self._queue.empty(): - result = self._queue.get() - try: - results[result.file].append(result) - except KeyError: - results[result.file] = [result] + with self._lock: + while not self._queue.empty(): + result = self._queue.get() + + try: + results[result.file].append(result) + except KeyError: + results[result.file] = [result] # Write the collected data to disk for filename, results in list(results.items()): @@ -305,17 +313,18 @@ class WriteQueue(object): # ready any items selectively. Everything that belongs to our # transaction is kept. Everything else will be put back into the # queue. - while not self._queue.empty(): - result = self._queue.get() - - if result.file == filename: - results.append(result) - else: - others.append(result) - - # Put back all items that did not match - for result in others: - self._queue.put(result) + with self._lock: + while not self._queue.empty(): + result = self._queue.get() + + if result.file == filename: + results.append(result) + else: + others.append(result) + + # Put back all items that did not match + for result in others: + self._queue.put(result) # Write everything else to disk if results: