From 41cf5f727b7d043195db176c4305ee0ba23ec3bf Mon Sep 17 00:00:00 2001 From: Michael Tremer Date: Mon, 28 Sep 2020 11:28:20 +0000 Subject: [PATCH] daemon: Refactor some parts of the write queue Signed-off-by: Michael Tremer --- src/collecty/daemon.py | 42 +++++++++++++++++++++++++++++------- src/collecty/plugins/base.py | 38 +++----------------------------- 2 files changed, 37 insertions(+), 43 deletions(-) diff --git a/src/collecty/daemon.py b/src/collecty/daemon.py index 4aadd13..8cd7153 100644 --- a/src/collecty/daemon.py +++ b/src/collecty/daemon.py @@ -247,11 +247,16 @@ class WriteQueue(object): self.log.debug(_("Initialised write queue")) - def add(self, object, time, data): - result = QueueObject(object.file, time, data) + def submit(self, object, data): + """ + Submit a new data point for object + """ + data = QueueObject(object.file, data) with self._lock: - self._queue.put(result) + self._queue.put(data) + + return data def commit(self): """ @@ -332,13 +337,34 @@ class WriteQueue(object): class QueueObject(object): - def __init__(self, file, time, data): + def __init__(self, file, data): self.file = file - self.time = time - self.data = data + self.data = self._format_data(data) + + # Save current timestamp + self.time = time.time() def __str__(self): - return "%s:%s" % (self.time.strftime("%s"), self.data) + return "%.0f:%s" % (self.time, self.data) def __lt__(self, other): - return self.time < other.time + if isinstance(other, self.__class__): + return self.time < other.time + + return NotImplemented + + @staticmethod + def _format_data(data): + if not isinstance(data, tuple) and not isinstance(data, list): + return data + + # Replace all Nones by UNKNOWN + s = [] + + for e in data: + if e is None: + e = "U" + + s.append("%s" % e) + + return ":".join(s) diff --git a/src/collecty/plugins/base.py b/src/collecty/plugins/base.py index 080db04..3cf630f 100644 --- a/src/collecty/plugins/base.py +++ b/src/collecty/plugins/base.py @@ -19,7 +19,6 @@ # # ############################################################################### -import datetime import logging import os import re @@ -148,8 +147,6 @@ class Plugin(object, metaclass=PluginRegistration): # Run through all objects of this plugin and call the collect method. for object in self.objects: - now = datetime.datetime.utcnow() - # Run collection try: result = object.collect() @@ -163,14 +160,11 @@ class Plugin(object, metaclass=PluginRegistration): self.log.warning(_("Received empty result: %s") % object) continue - # Format the result for RRDtool - result = self._format_result(result) - - self.log.debug(_("Collected %s: %s") % (object, result)) - # Add the object to the write queue so that the data is written # to the databases later. - self.collecty.write_queue.add(object, now, result) + result = self.collecty.write_queue.submit(object, result) + + self.log.debug(_("Collected %s: %s") % (object, result)) # Returns the time this function took to complete. delay = time.time() - time_start @@ -181,22 +175,6 @@ class Plugin(object, metaclass=PluginRegistration): else: self.log.debug(_("Collection finished in %.2fms") % (delay * 1000)) - @staticmethod - def _format_result(result): - if not isinstance(result, tuple) and not isinstance(result, list): - return result - - # Replace all Nones by UNKNOWN - s = [] - - for e in result: - if e is None: - e = "U" - - s.append("%s" % e) - - return ":".join(s) - def get_object(self, id): for object in self.objects: if not object.id == id: @@ -453,16 +431,6 @@ class Object(object): x, y, vals = rrdtool.graph("/dev/null", *args) return dict(zip(self.rrd_schema_names, vals)) - def execute(self): - if self.collected: - raise RuntimeError("This object has already collected its data") - - self.collected = True - self.now = datetime.datetime.utcnow() - - # Call the collect - result = self.collect() - def commit(self): """ Will commit the collected data to the database. -- 2.47.2