import signal
import tarfile
import tempfile
+import threading
import time
from . import bus
self.log = logging.getLogger("collecty.queue")
+ # Lock to make this class thread-safe
+ self._lock = threading.Lock()
+
self._queue = queue.PriorityQueue()
self.log.debug(_("Initialised write queue"))
def add(self, object, time, data):
result = QueueObject(object.file, time, data)
- self._queue.put(result)
+
+ with self._lock:
+ self._queue.put(result)
def commit(self):
"""
# Get all objects from the queue and group them by the RRD file
# to commit them all at once
results = {}
- while not self._queue.empty():
- result = self._queue.get()
- try:
- results[result.file].append(result)
- except KeyError:
- results[result.file] = [result]
+ with self._lock:
+ while not self._queue.empty():
+ result = self._queue.get()
+
+ try:
+ results[result.file].append(result)
+ except KeyError:
+ results[result.file] = [result]
# Write the collected data to disk
for filename, results in list(results.items()):
# ready any items selectively. Everything that belongs to our
# transaction is kept. Everything else will be put back into the
# queue.
- while not self._queue.empty():
- result = self._queue.get()
-
- if result.file == filename:
- results.append(result)
- else:
- others.append(result)
-
- # Put back all items that did not match
- for result in others:
- self._queue.put(result)
+ with self._lock:
+ while not self._queue.empty():
+ result = self._queue.get()
+
+ if result.file == filename:
+ results.append(result)
+ else:
+ others.append(result)
+
+ # Put back all items that did not match
+ for result in others:
+ self._queue.put(result)
# Write everything else to disk
if results: