import logging
import os
-import queue
import rrdtool
import sched
import signal
self.log = logging.getLogger("collecty.queue")
+ # Store data here
+ self._data = []
+
# Lock to make this class thread-safe
self._lock = threading.Lock()
- self._queue = queue.PriorityQueue()
-
self.log.debug(_("Initialised write queue"))
def submit(self, object, data):
data = QueueObject(object.file, data)
with self._lock:
- self._queue.put(data)
+ self._data.append(data)
return data
"""
Flushes the read data to disk.
"""
- # There is nothing to do if the queue is empty
- if self._queue.empty():
- self.log.debug(_("No data to commit"))
- return
+ self.log.debug(_("Committing data to disk..."))
time_start = time.time()
- self.log.debug(_("Submitting data to the databases..."))
-
- # Get all objects from the queue and group them by the RRD file
- # to commit them all at once
- results = {}
-
+ # There is nothing to do if the queue is empty
with self._lock:
- while not self._queue.empty():
- result = self._queue.get()
+ if not self._data:
+ self.log.debug(_("No data to commit"))
+ return
+
+ # Get all objects from the queue and group them by the RRD file
+ # to commit them all at once
+ results = {}
+ # Group all datapoints by file
+ for data in self._data:
try:
- results[result.file].append(result)
+ results[data.file].append(data)
except KeyError:
- results[result.file] = [result]
+ results[data.file] = [data]
+
+ # Clear the queue
+ self._data.clear()
# Write the collected data to disk
- for filename, results in list(results.items()):
- self._commit_file(filename, results)
+ for filename in sorted(results):
+ self._commit_file(filename, results[filename])
duration = time.time() - time_start
self.log.debug(_("Emptied write queue in %.2fs") % duration)
self.log.debug(_("Committing %(counter)s entries to %(filename)s") \
% { "counter" : len(results), "filename" : filename })
- for result in results:
- self.log.debug(" %s: %s" % (result.time, result.data))
+ # Sort data before submitting it to rrdtool
+ results.sort()
+
+ for data in results:
+ self.log.debug(" %s" % data)
try:
rrdtool.update(filename, *["%s" % r for r in results])
# transaction is kept. Everything else will be put back into the
# queue.
with self._lock:
- while not self._queue.empty():
- result = self._queue.get()
-
- if result.file == filename:
- results.append(result)
+ for data in self._data:
+ if data.file == filename:
+ results.append(data)
else:
- others.append(result)
+ others.append(data)
# Put back all items that did not match
- for result in others:
- self._queue.put(result)
+ self._data = others
# Write everything else to disk
if results: