]> git.ipfire.org Git - oddments/collecty.git/commitdiff
daemon: Refactor some parts of the write queue
authorMichael Tremer <michael.tremer@ipfire.org>
Mon, 28 Sep 2020 11:28:20 +0000 (11:28 +0000)
committerMichael Tremer <michael.tremer@ipfire.org>
Mon, 28 Sep 2020 11:28:20 +0000 (11:28 +0000)
Signed-off-by: Michael Tremer <michael.tremer@ipfire.org>
src/collecty/daemon.py
src/collecty/plugins/base.py

index 4aadd13fbb6bae31b05f4e4499b038b24165f62e..8cd7153f44f884845943f15ab96b8fc405d09292 100644 (file)
@@ -247,11 +247,16 @@ class WriteQueue(object):
 
                self.log.debug(_("Initialised write queue"))
 
-       def add(self, object, time, data):
-               result = QueueObject(object.file, time, data)
+       def submit(self, object, data):
+               """
+                       Submit a new data point for object
+               """
+               data = QueueObject(object.file, data)
 
                with self._lock:
-                       self._queue.put(result)
+                       self._queue.put(data)
+
+               return data
 
        def commit(self):
                """
@@ -332,13 +337,34 @@ class WriteQueue(object):
 
 
 class QueueObject(object):
-       def __init__(self, file, time, data):
+       def __init__(self, file, data):
                self.file = file
-               self.time = time
-               self.data = data
+               self.data = self._format_data(data)
+
+               # Save current timestamp
+               self.time = time.time()
 
        def __str__(self):
-               return "%s:%s" % (self.time.strftime("%s"), self.data)
+               return "%.0f:%s" % (self.time, self.data)
 
        def __lt__(self, other):
-               return self.time < other.time
+               if isinstance(other, self.__class__):
+                       return self.time < other.time
+
+               return NotImplemented
+
+       @staticmethod
+       def _format_data(data):
+               if not isinstance(data, tuple) and not isinstance(data, list):
+                       return data
+
+               # Replace all Nones by UNKNOWN
+               s = []
+
+               for e in data:
+                       if e is None:
+                               e = "U"
+
+                       s.append("%s" % e)
+
+               return ":".join(s)
index 080db041c4bcea61c1dc14f8848f19c9fed1df78..3cf630f3253e2295651f919b815fb5a20b77a90c 100644 (file)
@@ -19,7 +19,6 @@
 #                                                                             #
 ###############################################################################
 
-import datetime
 import logging
 import os
 import re
@@ -148,8 +147,6 @@ class Plugin(object, metaclass=PluginRegistration):
 
                # Run through all objects of this plugin and call the collect method.
                for object in self.objects:
-                       now = datetime.datetime.utcnow()
-
                        # Run collection
                        try:
                                result = object.collect()
@@ -163,14 +160,11 @@ class Plugin(object, metaclass=PluginRegistration):
                                self.log.warning(_("Received empty result: %s") % object)
                                continue
 
-                       # Format the result for RRDtool
-                       result = self._format_result(result)
-
-                       self.log.debug(_("Collected %s: %s") % (object, result))
-
                        # Add the object to the write queue so that the data is written
                        # to the databases later.
-                       self.collecty.write_queue.add(object, now, result)
+                       result = self.collecty.write_queue.submit(object, result)
+
+                       self.log.debug(_("Collected %s: %s") % (object, result))
 
                # Returns the time this function took to complete.
                delay = time.time() - time_start
@@ -181,22 +175,6 @@ class Plugin(object, metaclass=PluginRegistration):
                else:
                        self.log.debug(_("Collection finished in %.2fms") % (delay * 1000))
 
-       @staticmethod
-       def _format_result(result):
-               if not isinstance(result, tuple) and not isinstance(result, list):
-                       return result
-
-               # Replace all Nones by UNKNOWN
-               s = []
-
-               for e in result:
-                       if e is None:
-                               e = "U"
-
-                       s.append("%s" % e)
-
-               return ":".join(s)
-
        def get_object(self, id):
                for object in self.objects:
                        if not object.id == id:
@@ -453,16 +431,6 @@ class Object(object):
                x, y, vals = rrdtool.graph("/dev/null", *args)
                return dict(zip(self.rrd_schema_names, vals))
 
-       def execute(self):
-               if self.collected:
-                       raise RuntimeError("This object has already collected its data")
-
-               self.collected = True
-               self.now = datetime.datetime.utcnow()
-
-               # Call the collect
-               result = self.collect()
-
        def commit(self):
                """
                        Will commit the collected data to the database.