self.log.debug(_("Initialised write queue"))
- def add(self, object, time, data):
- result = QueueObject(object.file, time, data)
+ def submit(self, object, data):
+ """
+ Submit a new data point for object
+ """
+ data = QueueObject(object.file, data)
with self._lock:
- self._queue.put(result)
+ self._queue.put(data)
+
+ return data
def commit(self):
"""
class QueueObject(object):
- def __init__(self, file, time, data):
+ def __init__(self, file, data):
self.file = file
- self.time = time
- self.data = data
+ self.data = self._format_data(data)
+
+ # Save current timestamp
+ self.time = time.time()
def __str__(self):
- return "%s:%s" % (self.time.strftime("%s"), self.data)
+ return "%.0f:%s" % (self.time, self.data)
def __lt__(self, other):
- return self.time < other.time
+ if isinstance(other, self.__class__):
+ return self.time < other.time
+
+ return NotImplemented
+
+ @staticmethod
+ def _format_data(data):
+ if not isinstance(data, tuple) and not isinstance(data, list):
+ return data
+
+ # Replace all Nones by UNKNOWN
+ s = []
+
+ for e in data:
+ if e is None:
+ e = "U"
+
+ s.append("%s" % e)
+
+ return ":".join(s)
# #
###############################################################################
-import datetime
import logging
import os
import re
# Run through all objects of this plugin and call the collect method.
for object in self.objects:
- now = datetime.datetime.utcnow()
-
# Run collection
try:
result = object.collect()
self.log.warning(_("Received empty result: %s") % object)
continue
- # Format the result for RRDtool
- result = self._format_result(result)
-
- self.log.debug(_("Collected %s: %s") % (object, result))
-
# Add the object to the write queue so that the data is written
# to the databases later.
- self.collecty.write_queue.add(object, now, result)
+ result = self.collecty.write_queue.submit(object, result)
+
+ self.log.debug(_("Collected %s: %s") % (object, result))
# Returns the time this function took to complete.
delay = time.time() - time_start
else:
self.log.debug(_("Collection finished in %.2fms") % (delay * 1000))
- @staticmethod
- def _format_result(result):
- if not isinstance(result, tuple) and not isinstance(result, list):
- return result
-
- # Replace all Nones by UNKNOWN
- s = []
-
- for e in result:
- if e is None:
- e = "U"
-
- s.append("%s" % e)
-
- return ":".join(s)
-
def get_object(self, id):
for object in self.objects:
if not object.id == id:
x, y, vals = rrdtool.graph("/dev/null", *args)
return dict(zip(self.rrd_schema_names, vals))
- def execute(self):
- if self.collected:
- raise RuntimeError("This object has already collected its data")
-
- self.collected = True
- self.now = datetime.datetime.utcnow()
-
- # Call the collect
- result = self.collect()
-
def commit(self):
"""
Will commit the collected data to the database.