# #
###############################################################################
-from threading import Thread
+from __future__ import division
+import logging
+import math
+import os
+import rrdtool
+import threading
+import time
+
+from ..constants import *
from ..i18n import _
-class Plugin(Thread):
- def __init__(self, collecty, **kwargs):
- Thread.__init__(self)
- self.collecty = collecty
+class Timer(object):
+ def __init__(self, timeout, heartbeat=1):
+ self.timeout = timeout
+ self.heartbeat = heartbeat
- self.interval = int(kwargs.get("interval", 60))
+ self.reset()
- # Keepalive options
- self.heartbeat = 2
+ def reset(self):
+ # Save start time.
+ self.start = time.time()
+
+ # Has this timer been killed?
self.killed = False
- self.wakeup = self.interval / self.heartbeat
+ @property
+ def elapsed(self):
+ return time.time() - self.start
+
+ def cancel(self):
+ self.killed = True
+
+ def wait(self):
+ while self.elapsed < self.timeout and not self.killed:
+ time.sleep(self.heartbeat)
+
+ return self.elapsed > self.timeout
- self.file = kwargs.get("file", None)
- if not self.file.startswith("/"):
- self.file = os.path.join("/var/rrd", self.file)
+
+class DataSource(threading.Thread):
+ # The name of this plugin.
+ name = None
+
+ # A description for this plugin.
+ description = None
+
+ # Templates which can be used to generate a graph out of
+ # the data from this data source.
+ templates = []
+
+ # The schema of the RRD database.
+ rrd_schema = None
+
+ # RRA properties.
+ rra_types = ["AVERAGE", "MIN", "MAX"]
+ rra_timespans = [3600, 86400, 604800, 2678400, 31622400]
+ rra_rows = 2880
+
+ # The default interval of this plugin.
+ default_interval = 60
+
+ def __init__(self, collecty, **kwargs):
+ threading.Thread.__init__(self, name=self.description)
+ self.daemon = True
+
+ self.collecty = collecty
+
+ # Check if this plugin was configured correctly.
+ assert self.name, "Name of the plugin is not set: %s" % self.name
+ assert self.description, "Description of the plugin is not set: %s" % self.description
+ assert self.rrd_schema
+
+ # Initialize the logger.
+ self.log = logging.getLogger("collecty.plugins.%s" % self.name)
+ self.log.propagate = 1
self.data = []
+ # Run some custom initialization.
+ self.init(**kwargs)
+
+ # Create the database file.
self.create()
+
+ # Keepalive options
+ self.running = True
+ self.timer = Timer(self.interval)
+
+ self.log.info(_("Successfully initialized (%s).") % self.id)
def __repr__(self):
- return "<Plugin %s>" % self._type
-
- def __str__(self):
- return "Plugin %s %s" % (self._type, self.file)
-
- def run(self):
- self.collecty.debug("%s started..." % self)
-
- c = 0
- while True:
- if self.killed:
- self.update()
- self.collecty.debug("%s stoppped..." % self)
- return
+ return "<%s %s>" % (self.__class__.__name__, self.id)
- if c == 0:
- self.data.append(self.collect())
- self.collecty.debug("%s collectd: %s..." % (self, self.data[-1]))
+ @property
+ def id(self):
+ """
+ A unique ID of the plugin instance.
+ """
+ return self.name
- self.update()
+ @property
+ def interval(self):
+ """
+ Returns the interval in milliseconds, when the read method
+ should be called again.
+ """
+ # XXX read this from the settings
- c = self.wakeup
+ # Otherwise return the default.
+ return self.default_interval
- c = c - 1
- time.sleep(self.heartbeat)
+ @property
+ def stepsize(self):
+ return self.interval
+
+ @property
+ def file(self):
+ """
+ The absolute path to the RRD file of this plugin.
+ """
+ return os.path.join(DATABASE_DIR, "%s.rrd" % self.id)
+
+ def create(self):
+ """
+ Creates an empty RRD file with the desired data structures.
+ """
+ # Skip if the file does already exist.
+ if os.path.exists(self.file):
+ return
+
+ dirname = os.path.dirname(self.file)
+ if not os.path.exists(dirname):
+ os.makedirs(dirname)
+
+ # Create argument list.
+ args = [
+ "--step", "%s" % self.default_interval,
+ ] + self.get_rrd_schema()
+
+ rrdtool.create(self.file, *args)
+
+ self.log.debug(_("Created RRD file %s.") % self.file)
+
+ def get_rrd_schema(self):
+ schema = [
+ "--step", "%s" % self.stepsize,
+ ]
+ for line in self.rrd_schema:
+ if line.startswith("DS:"):
+ try:
+ (prefix, name, type, lower_limit, upper_limit) = line.split(":")
+
+ line = ":".join((
+ prefix,
+ name,
+ type,
+ "%s" % self.stepsize,
+ lower_limit,
+ upper_limit
+ ))
+ except ValueError:
+ pass
+
+ schema.append(line)
+
+ xff = 0.1
+
+ cdp_length = 0
+ for rra_timespan in self.rra_timespans:
+ if (rra_timespan / self.stepsize) < self.rra_rows:
+ rra_timespan = self.stepsize * self.rra_rows
+
+ if cdp_length == 0:
+ cdp_length = 1
+ else:
+ cdp_length = rra_timespan // (self.rra_rows * self.stepsize)
+
+ cdp_number = math.ceil(rra_timespan / (cdp_length * self.stepsize))
+
+ for rra_type in self.rra_types:
+ schema.append("RRA:%s:%.10f:%d:%d" % \
+ (rra_type, xff, cdp_length, cdp_number))
+
+ return schema
+
+ def info(self):
+ return rrdtool.info(self.file)
+
+ ### Basic methods
+
+ def init(self, **kwargs):
+ """
+ Do some custom initialization stuff here.
+ """
+ pass
+
+ def read(self):
+ """
+ Gathers the statistical data, this plugin collects.
+ """
+ raise NotImplementedError
+
+ def submit(self):
+ """
+ Flushes the read data to disk.
+ """
+ # Do nothing in case there is no data to submit.
+ if not self.data:
+ return
+
+ self.log.debug(_("Submitting data to database. %d entries.") % len(self.data))
+ rrdtool.update(self.file, *self.data)
+ self.data = []
+
+ def _read(self, *args, **kwargs):
+ """
+ This method catches errors from the read() method and logs them.
+ """
+ try:
+ return self.read(*args, **kwargs)
+
+ # Catch any exceptions, so collecty does not crash.
+ except Exception, e:
+ self.log.critical(_("Unhandled exception in read()!"), exc_info=True)
+
+ def _submit(self, *args, **kwargs):
+ """
+ This method catches errors from the submit() method and logs them.
+ """
+ try:
+ return self.submit(*args, **kwargs)
+
+ # Catch any exceptions, so collecty does not crash.
+ except Exception, e:
+ self.log.critical(_("Unhandled exception in submit()!"), exc_info=True)
+
+ def run(self):
+ self.log.debug(_("Started."))
+
+ while self.running:
+ # Reset the timer.
+ self.timer.reset()
+
+ # Wait until the timer has successfully elapsed.
+ if self.timer.wait():
+ self.log.debug(_("Collecting..."))
+ self._read()
+
+ self._submit()
+ self.log.debug(_("Stopped."))
def shutdown(self):
- self.killed = True
+ self.log.debug(_("Received shutdown signal."))
+ self.running = False
- def time(self):
- return int(time.time()) # Should return time as int in UTC
+ # Kill any running timers.
+ if self.timer:
+ self.timer.cancel()
- def create(self):
- if not os.path.exists(self.file):
- rrdtool.create(self.file, *self._rrd)
+ @property
+ def now(self):
+ """
+ Returns the current timestamp in the UNIX timestamp format (UTC).
+ """
+ return int(time.time())
+
+
+class GraphTemplate(object):
+ # A unique name to identify this graph template.
+ name = None
+
+ # Instructions how to create the graph.
+ rrd_graph = None
+
+ # Extra arguments passed to rrdgraph.
+ rrd_graph_args = []
- def update(self):
- if self.data:
- self.collecty.debug("%s saving data..." % self)
- rrdtool.update(self.file, *self.data)
- self.data = []
+ def __init__(self, ds):
+ self.ds = ds
- def collect(self):
- raise Exception, "Not implemented"
+ @property
+ def collecty(self):
+ return self.ds.collecty
- def graph(self, file, interval=None):
- args = [ "--imgformat", "PNG",
- "-w", "580", # Width of the graph
- "-h", "240", # Height of the graph
- "--interlaced", "--slope-mode", ]
+ def graph(self, file, interval=None,
+ width=GRAPH_DEFAULT_WIDTH, height=GRAPH_DEFAULT_HEIGHT):
+ args = [
+ "--width", "%d" % width,
+ "--height", "%d" % height,
+ ]
+ args += self.collecty.graph_default_arguments
+ args += self.rrd_graph_args
- intervals = { None : "-3h",
- "hour" : "-1h",
- "day" : "-25h",
- "week" : "-360h" }
+ intervals = {
+ None : "-3h",
+ "hour" : "-1h",
+ "day" : "-25h",
+ "week" : "-360h",
+ "year" : "-365d",
+ }
args.append("--start")
- if intervals.has_key(interval):
+ try:
args.append(intervals[interval])
- else:
+ except KeyError:
args.append(interval)
- info = { "file" : self.file }
- for item in self._graph:
+ info = { "file" : self.ds.file }
+ for item in self.rrd_graph:
try:
args.append(item % info)
except TypeError:
args.append(item)
- rrdtool.graph(file, *args)
-
- def info(self):
- return rrdtool.info(self.file)
-
+ rrdtool.graph(file, *args)