]> git.ipfire.org Git - collecty.git/blobdiff - src/collecty/plugins/base.py
Rewrite plugin architecture
[collecty.git] / src / collecty / plugins / base.py
index 82b0982b521a9de371aae4dd53155306d36c976a..94b0bc06593a9ba713b6c34e3d5a1c8e0259536d 100644 (file)
@@ -21,6 +21,7 @@
 
 from __future__ import division
 
+import datetime
 import logging
 import math
 import os
@@ -82,16 +83,8 @@ class Plugin(threading.Thread):
        # the data from this data source.
        templates = []
 
-       # The schema of the RRD database.
-       rrd_schema = None
-
-       # RRA properties.
-       rra_types = ["AVERAGE", "MIN", "MAX"]
-       rra_timespans = [3600, 86400, 604800, 2678400, 31622400]
-       rra_rows = 2880
-
-       # The default interval of this plugin.
-       default_interval = 60
+       # The default interval for all plugins
+       interval = 60
 
        # Automatically register all providers.
        class __metaclass__(type):
@@ -118,7 +111,6 @@ class Plugin(threading.Thread):
                # Check if this plugin was configured correctly.
                assert self.name, "Name of the plugin is not set: %s" % self.name
                assert self.description, "Description of the plugin is not set: %s" % self.description
-               assert self.rrd_schema
 
                # Initialize the logger.
                self.log = logging.getLogger("collecty.plugins.%s" % self.name)
@@ -129,50 +121,136 @@ class Plugin(threading.Thread):
                # Run some custom initialization.
                self.init(**kwargs)
 
-               # Create the database file.
-               self.create()
-
                # Keepalive options
                self.running = True
                self.timer = Timer(self.interval)
 
-               self.log.info(_("Successfully initialized (%s).") % self.id)
-       
-       def __repr__(self):
-               return "<%s %s>" % (self.__class__.__name__, self.id)
+               self.log.info(_("Successfully initialized %s") % self.__class__.__name__)
 
        @property
-       def id(self):
+       def path(self):
                """
-                       A unique ID of the plugin instance.
+                       Returns the name of the sub directory in which all RRD files
+                       for this plugin should be stored in.
                """
                return self.name
 
-       @property
-       def interval(self):
+       ### Basic methods
+
+       def init(self, **kwargs):
                """
-                       Returns the interval in milliseconds, when the read method
-                       should be called again.
+                       Do some custom initialization stuff here.
                """
-               # XXX read this from the settings
+               pass
+
+       def collect(self):
+               """
+                       Gathers the statistical data, this plugin collects.
+               """
+               time_start = time.time()
+
+               # Run through all objects of this plugin and call the collect method.
+               for o in self.objects:
+                       now = datetime.datetime.utcnow()
+                       try:
+                               result = o.collect()
+                       except:
+                               self.log.warning(_("Unhandled exception in %s.collect()") % o, exc_info=True)
+                               continue
+
+                       if not result:
+                               self.log.warning(_("Received empty result: %s") % o)
+                               continue
+
+                       self.log.debug(_("Collected %s: %s") % (o, result))
+
+                       # Add the object to the write queue so that the data is written
+                       # to the databases later.
+                       self.collecty.write_queue.add(o, now, result)
+
+               # Returns the time this function took to complete.
+               return (time.time() - time_start)
+
+       def run(self):
+               self.log.debug(_("%s plugin has started") % self.name)
+
+               # Initially collect everything
+               self.collect()
+
+               while self.running:
+                       # Reset the timer.
+                       self.timer.reset()
+
+                       # Wait until the timer has successfully elapsed.
+                       if self.timer.wait():
+                               delay = self.collect()
+                               self.timer.reset(delay)
+
+               self.log.debug(_("%s plugin has stopped") % self.name)
+
+       def shutdown(self):
+               self.log.debug(_("Received shutdown signal."))
+               self.running = False
+
+               # Kill any running timers.
+               if self.timer:
+                       self.timer.cancel()
 
-               # Otherwise return the default.
-               return self.default_interval
+
+class Object(object):
+       # The schema of the RRD database.
+       rrd_schema = None
+
+       # RRA properties.
+       rra_types     = ["AVERAGE", "MIN", "MAX"]
+       rra_timespans = [3600, 86400, 604800, 2678400, 31622400]
+       rra_rows      = 2880
+
+       def __init__(self, plugin, *args, **kwargs):
+               self.plugin = plugin
+
+               # Indicates if this object has collected its data
+               self.collected = False
+
+               # Initialise this object
+               self.init(*args, **kwargs)
+
+               # Create the database file.
+               self.create()
+
+       def __repr__(self):
+               return "<%s>" % self.__class__.__name__
 
        @property
-       def stepsize(self):
-               return self.interval
+       def collecty(self):
+               return self.plugin.collecty
 
        @property
-       def heartbeat(self):
-               return self.stepsize * 2
+       def log(self):
+               return self.plugin.log
+
+       @property
+       def id(self):
+               """
+                       Returns a UNIQUE identifier for this object. As this is incorporated
+                       into the path of RRD file, it must only contain ASCII characters.
+               """
+               raise NotImplementedError
 
        @property
        def file(self):
                """
                        The absolute path to the RRD file of this plugin.
                """
-               return os.path.join(DATABASE_DIR, "%s.rrd" % self.id)
+               return os.path.join(DATABASE_DIR, self.plugin.path, "%s.rrd" % self.id)
+
+       ### Basic methods
+
+       def init(self, *args, **kwargs):
+               """
+                       Do some custom initialization stuff here.
+               """
+               pass
 
        def create(self):
                """
@@ -195,6 +273,17 @@ class Plugin(threading.Thread):
                for arg in args:
                        self.log.debug("  %s" % arg)
 
+       def info(self):
+               return rrdtool.info(self.file)
+
+       @property
+       def stepsize(self):
+               return self.plugin.interval
+
+       @property
+       def heartbeat(self):
+               return self.stepsize * 2
+
        def get_rrd_schema(self):
                schema = [
                        "--step", "%s" % self.stepsize,
@@ -237,98 +326,23 @@ class Plugin(threading.Thread):
 
                return schema
 
-       def info(self):
-               return rrdtool.info(self.file)
+       def execute(self):
+               if self.collected:
+                       raise RuntimeError("This object has already collected its data")
 
-       ### Basic methods
+               self.collected = True
+               self.now = datetime.datetime.utcnow()
 
-       def init(self, **kwargs):
-               """
-                       Do some custom initialization stuff here.
-               """
-               pass
+               # Call the collect
+               result = self.collect()
 
-       def read(self):
+       def commit(self):
                """
-                       Gathers the statistical data, this plugin collects.
+                       Will commit the collected data to the database.
                """
-               raise NotImplementedError
-
-       def submit(self):
-               """
-                       Flushes the read data to disk.
-               """
-               # Do nothing in case there is no data to submit.
-               if not self.data:
-                       return
-
-               self.log.debug(_("Submitting data to database. %d entries.") % len(self.data))
-               for data in self.data:
-                       self.log.debug("  %s" % data)
-
-               # Create the RRD files (if they don't exist yet or
-               # have vanished for some reason).
+               # Make sure that the RRD database has been created
                self.create()
 
-               rrdtool.update(self.file, *self.data)
-               self.data = []
-
-       def _read(self, *args, **kwargs):
-               """
-                       This method catches errors from the read() method and logs them.
-               """
-               start_time = time.time()
-
-               try:
-                       data = self.read(*args, **kwargs)
-                       if data is None:
-                               self.log.warning(_("Received empty data."))
-                       else:
-                               self.data.append("%d:%s" % (start_time, data))
-
-               # Catch any exceptions, so collecty does not crash.
-               except Exception, e:
-                       self.log.critical(_("Unhandled exception in read()!"), exc_info=True)
-
-               # Return the elapsed time since _read() has been called.
-               return (time.time() - start_time)
-
-       def _submit(self, *args, **kwargs):
-               """
-                       This method catches errors from the submit() method and logs them.
-               """
-               try:
-                       return self.submit(*args, **kwargs)
-
-               # Catch any exceptions, so collecty does not crash.
-               except Exception, e:
-                       self.log.critical(_("Unhandled exception in submit()!"), exc_info=True)
-
-       def run(self):
-               self.log.debug(_("Started."))
-
-               while self.running:
-                       # Reset the timer.
-                       self.timer.reset()
-
-                       # Wait until the timer has successfully elapsed.
-                       if self.timer.wait():
-                               self.log.debug(_("Collecting..."))
-                               delay = self._read()
-
-                               self.timer.reset(delay)
-
-               self._submit()
-               self.log.debug(_("Stopped."))
-
-       def shutdown(self):
-               self.log.debug(_("Received shutdown signal."))
-               self.running = False
-
-               # Kill any running timers.
-               if self.timer:
-                       self.timer.cancel()
-
 
 class GraphTemplate(object):
        # A unique name to identify this graph template.