-#!/usr/bin/python
+#!/usr/bin/python3
###############################################################################
# #
# collecty - A system statistics collection daemon for IPFire #
# #
###############################################################################
-from __future__ import division
-
-import datetime
import logging
-import math
import os
+import re
import rrdtool
-import threading
import time
+import unicodedata
+from .. import util
from ..constants import *
from ..i18n import _
-_plugins = {}
+DEF_MATCH = r"C?DEF:([A-Za-z0-9_]+)="
-def get():
+class Environment(object):
"""
- Returns a list with all automatically registered plugins.
+ Sets the correct environment for rrdtool to create
+ localised graphs and graphs in the correct timezone.
"""
- return _plugins.values()
+ def __init__(self, timezone="UTC", locale="en_US.utf-8"):
+ # Build the new environment
+ self.new_environment = {
+ "LANGUAGE" : locale,
+ "LC_ALL" : locale,
+ "TZ" : timezone,
+ }
+
+ def __enter__(self):
+ # Save the current environment
+ self.old_environment = {}
-class Timer(object):
- def __init__(self, timeout, heartbeat=1):
- self.timeout = timeout
- self.heartbeat = heartbeat
+ for k in self.new_environment:
+ # Store the old value
+ self.old_environment[k] = os.environ.get(k, None)
- self.delay = 0
+ # Apply the new one
+ if self.new_environment[k]:
+ os.environ[k] = self.new_environment[k]
- self.reset()
+ def __exit__(self, type, value, traceback):
+ # Roll back to the previous environment
+ for k, v in self.old_environment.items():
+ if v is None:
+ try:
+ del os.environ[k]
+ except KeyError:
+ pass
+ else:
+ os.environ[k] = v
- def reset(self, delay=0):
- # Save start time.
- self.start = time.time()
- self.delay = delay
+class PluginRegistration(type):
+ plugins = {}
- # Has this timer been killed?
- self.killed = False
+ def __init__(plugin, name, bases, dict):
+ type.__init__(plugin, name, bases, dict)
- @property
- def elapsed(self):
- return time.time() - self.start - self.delay
+ # The main class from which is inherited is not registered
+ # as a plugin.
+ if name == "Plugin":
+ return
- def cancel(self):
- self.killed = True
+ if not all((plugin.name, plugin.description)):
+ raise RuntimeError(_("Plugin is not properly configured: %s") % plugin)
- def wait(self):
- while self.elapsed < self.timeout and not self.killed:
- time.sleep(self.heartbeat)
+ PluginRegistration.plugins[plugin.name] = plugin
- return self.elapsed > self.timeout
+def get():
+ """
+ Returns a list with all automatically registered plugins.
+ """
+ return PluginRegistration.plugins.values()
-class Plugin(threading.Thread):
+class Plugin(object, metaclass=PluginRegistration):
# The name of this plugin.
name = None
# The default interval for all plugins
interval = 60
- # Automatically register all providers.
- class __metaclass__(type):
- def __init__(plugin, name, bases, dict):
- type.__init__(plugin, name, bases, dict)
-
- # The main class from which is inherited is not registered
- # as a plugin.
- if name == "Plugin":
- return
-
- if not all((plugin.name, plugin.description)):
- raise RuntimeError(_("Plugin is not properly configured: %s") \
- % plugin)
-
- _plugins[plugin.name] = plugin
+ # Priority
+ priority = 0
def __init__(self, collecty, **kwargs):
- threading.Thread.__init__(self, name=self.description)
- self.daemon = True
-
self.collecty = collecty
# Check if this plugin was configured correctly.
# Initialize the logger.
self.log = logging.getLogger("collecty.plugins.%s" % self.name)
- self.log.propagate = 1
-
- self.data = []
# Run some custom initialization.
self.init(**kwargs)
- # Keepalive options
- self.running = True
- self.timer = Timer(self.interval)
-
- self.log.info(_("Successfully initialized %s") % self.__class__.__name__)
+ self.log.debug(_("Successfully initialized %s") % self.__class__.__name__)
@property
def path(self):
time_start = time.time()
# Run through all objects of this plugin and call the collect method.
- for o in self.objects:
- now = datetime.datetime.utcnow()
+ for object in self.objects:
+ # Run collection
try:
- result = o.collect()
- except:
- self.log.warning(_("Unhandled exception in %s.collect()") % o, exc_info=True)
+ result = object.collect()
+
+ # Catch any unhandled exceptions
+ except Exception as e:
+ self.log.warning(_("Unhandled exception in %s.collect()") % object, exc_info=True)
continue
if not result:
- self.log.warning(_("Received empty result: %s") % o)
+ self.log.warning(_("Received empty result: %s") % object)
continue
- self.log.debug(_("Collected %s: %s") % (o, result))
-
# Add the object to the write queue so that the data is written
# to the databases later.
- self.collecty.write_queue.add(o, now, result)
+ result = self.collecty.write_queue.submit(object, result)
+
+ self.log.debug(_("Collected %s: %s") % (object, result))
# Returns the time this function took to complete.
- return (time.time() - time_start)
+ delay = time.time() - time_start
+
+ # Log some warning when a collect method takes too long to return some data
+ if delay >= 60:
+ self.log.warning(_("A worker thread was stalled for %.4fs") % delay)
+ else:
+ self.log.debug(_("Collection finished in %.2fms") % (delay * 1000))
+
+ def get_object(self, id):
+ for object in self.objects:
+ if not object.id == id:
+ continue
+
+ return object
+
+ def get_template(self, template_name, object_id, locale=None, timezone=None):
+ for template in self.templates:
+ if not template.name == template_name:
+ continue
+
+ return template(self, object_id, locale=locale, timezone=timezone)
- def run(self):
- self.log.debug(_("%s plugin has started") % self.name)
+ def generate_graph(self, template_name, object_id="default",
+ timezone=None, locale=None, **kwargs):
+ template = self.get_template(template_name, object_id=object_id,
+ timezone=timezone, locale=locale)
+ if not template:
+ raise RuntimeError("Could not find template %s" % template_name)
- # Initially collect everything
- self.collect()
+ time_start = time.time()
+
+ with Environment(timezone=timezone, locale=locale):
+ graph = template.generate_graph(**kwargs)
+
+ duration = time.time() - time_start
+ self.log.debug(_("Generated graph %s in %.1fms") \
+ % (template, duration * 1000))
- while self.running:
- # Reset the timer.
- self.timer.reset()
+ return graph
- # Wait until the timer has successfully elapsed.
- if self.timer.wait():
- delay = self.collect()
- self.timer.reset(delay)
+ def graph_info(self, template_name, object_id="default",
+ timezone=None, locale=None, **kwargs):
+ template = self.get_template(template_name, object_id=object_id,
+ timezone=timezone, locale=locale)
+ if not template:
+ raise RuntimeError("Could not find template %s" % template_name)
- self.log.debug(_("%s plugin has stopped") % self.name)
+ return template.graph_info()
- def shutdown(self):
- self.log.debug(_("Received shutdown signal."))
- self.running = False
+ def last_update(self, object_id="default"):
+ object = self.get_object(object_id)
+ if not object:
+ raise RuntimeError("Could not find object %s" % object_id)
- # Kill any running timers.
- if self.timer:
- self.timer.cancel()
+ return object.last_update()
class Object(object):
rrd_schema = None
# RRA properties.
- rra_types = ["AVERAGE", "MIN", "MAX"]
- rra_timespans = [3600, 86400, 604800, 2678400, 31622400]
- rra_rows = 2880
+ rra_types = ("AVERAGE", "MIN", "MAX")
+ rra_timespans = (
+ ("1m", "10d"),
+ ("1h", "18M"),
+ ("1d", "5y"),
+ )
def __init__(self, plugin, *args, **kwargs):
self.plugin = plugin
- # Indicates if this object has collected its data
- self.collected = False
-
# Initialise this object
self.init(*args, **kwargs)
self.create()
def __repr__(self):
- return "<%s>" % self.__class__.__name__
+ return "<%s %s>" % (self.__class__.__name__, self.id)
+
+ def __lt__(self, other):
+ return self.id < other.id
@property
def collecty(self):
"""
The absolute path to the RRD file of this plugin.
"""
- return os.path.join(DATABASE_DIR, self.plugin.path, "%s.rrd" % self.id)
+ filename = self._normalise_filename("%s.rrd" % self.id)
+
+ return os.path.join(DATABASE_DIR, self.plugin.path, filename)
+
+ @staticmethod
+ def _normalise_filename(filename):
+ # Convert the filename into ASCII characters only
+ filename = unicodedata.normalize("NFKC", filename)
+
+ # Replace any spaces by dashes
+ filename = filename.replace(" ", "-")
+
+ return filename
### Basic methods
def info(self):
return rrdtool.info(self.file)
+ def last_update(self):
+ """
+ Returns a dictionary with the timestamp and
+ data set of the last database update.
+ """
+ return {
+ "dataset" : self.last_dataset,
+ "timestamp" : self.last_updated,
+ }
+
+ def _last_update(self):
+ return rrdtool.lastupdate(self.file)
+
+ @property
+ def last_updated(self):
+ """
+ Returns the timestamp when this database was last updated
+ """
+ lu = self._last_update()
+
+ if lu:
+ return lu.get("date")
+
+ @property
+ def last_dataset(self):
+ """
+ Returns the latest dataset in the database
+ """
+ lu = self._last_update()
+
+ if lu:
+ return lu.get("ds")
+
@property
def stepsize(self):
return self.plugin.interval
xff = 0.1
- cdp_length = 0
- for rra_timespan in self.rra_timespans:
- if (rra_timespan / self.stepsize) < self.rra_rows:
- rra_timespan = self.stepsize * self.rra_rows
+ for steps, rows in self.rra_timespans:
+ for type in self.rra_types:
+ schema.append("RRA:%s:%s:%s:%s" % (type, xff, steps, rows))
+
+ return schema
+
+ @property
+ def rrd_schema_names(self):
+ ret = []
+
+ for line in self.rrd_schema:
+ (prefix, name, type, lower_limit, upper_limit) = line.split(":")
+ ret.append(name)
+
+ return ret
+
+ def make_rrd_defs(self, prefix=None):
+ defs = []
- if cdp_length == 0:
- cdp_length = 1
+ for name in self.rrd_schema_names:
+ if prefix:
+ p = "%s_%s" % (prefix, name)
else:
- cdp_length = rra_timespan // (self.rra_rows * self.stepsize)
+ p = name
- cdp_number = math.ceil(rra_timespan / (cdp_length * self.stepsize))
+ defs += [
+ "DEF:%s=%s:%s:AVERAGE" % (p, self.file, name),
+ ]
- for rra_type in self.rra_types:
- schema.append("RRA:%s:%.10f:%d:%d" % \
- (rra_type, xff, cdp_length, cdp_number))
+ return defs
- return schema
+ def get_stddev(self, interval=None):
+ args = self.make_rrd_defs()
- def execute(self):
- if self.collected:
- raise RuntimeError("This object has already collected its data")
+ # Add the correct interval
+ args += ["--start", util.make_interval(interval)]
- self.collected = True
- self.now = datetime.datetime.utcnow()
+ for name in self.rrd_schema_names:
+ args += [
+ "VDEF:%s_stddev=%s,STDEV" % (name, name),
+ "PRINT:%s_stddev:%%lf" % name,
+ ]
- # Call the collect
- result = self.collect()
+ x, y, vals = rrdtool.graph("/dev/null", *args)
+ return dict(zip(self.rrd_schema_names, vals))
def commit(self):
"""
# Make sure that the RRD database has been created
self.create()
+ # Write everything to disk that is in the write queue
+ self.collecty.write_queue.commit_file(self.file)
+
+ # Convenience functions for plugin authors
+
+ def read_file(self, *args, strip=True):
+ """
+ Reads the content of the given file
+ """
+ filename = os.path.join(*args)
+
+ try:
+ with open(filename) as f:
+ value = f.read()
+ except FileNotFoundError as e:
+ return None
+
+ # Strip any excess whitespace
+ if strip:
+ value = value.strip()
+
+ return value
+
+ def read_file_integer(self, filename):
+ """
+ Reads the content from a file and returns it as an integer
+ """
+ value = self.read_file(filename)
+
+ try:
+ return int(value)
+ except (TypeError, ValueError):
+ return None
+
+ def read_proc_stat(self):
+ """
+ Reads /proc/stat and returns it as a dictionary
+ """
+ ret = {}
+
+ with open("/proc/stat") as f:
+ for line in f:
+ # Split the key from the rest of the line
+ key, line = line.split(" ", 1)
+
+ # Remove any line breaks
+ ret[key] = line.rstrip()
+
+ return ret
+
+ def read_proc_meminfo(self):
+ ret = {}
+
+ with open("/proc/meminfo") as f:
+ for line in f:
+ # Split the key from the rest of the line
+ key, line = line.split(":", 1)
+
+ # Remove any whitespace
+ line = line.strip()
+
+ # Remove any trailing kB
+ if line.endswith(" kB"):
+ line = line[:-3]
+
+ # Try to convert to integer
+ try:
+ line = int(line)
+ except (TypeError, ValueError):
+ continue
+
+ ret[key] = line
+
+ return ret
+
class GraphTemplate(object):
# A unique name to identify this graph template.
name = None
+ # Headline of the graph image
+ graph_title = None
+
+ # Vertical label of the graph
+ graph_vertical_label = None
+
+ # Limits
+ lower_limit = None
+ upper_limit = None
+
# Instructions how to create the graph.
rrd_graph = None
# Extra arguments passed to rrdgraph.
rrd_graph_args = []
- def __init__(self, ds):
- self.ds = ds
+ def __init__(self, plugin, object_id, locale=None, timezone=None):
+ self.plugin = plugin
+
+ # Save localisation parameters
+ self.locale = locale
+ self.timezone = timezone
+
+ # Get all required RRD objects
+ self.object_id = object_id
+
+ # Get the main object
+ self.objects = self.get_objects(self.object_id)
+ self.objects.sort()
+
+ def __repr__(self):
+ return "<%s>" % self.__class__.__name__
@property
def collecty(self):
- return self.ds.collecty
+ return self.plugin.collecty
+
+ @property
+ def log(self):
+ return self.plugin.log
+
+ @property
+ def object(self):
+ """
+ Shortcut to the main object
+ """
+ if len(self.objects) == 1:
+ return self.objects[0]
- def graph(self, file, interval=None,
- width=GRAPH_DEFAULT_WIDTH, height=GRAPH_DEFAULT_HEIGHT):
+ def _make_command_line(self, interval, format=DEFAULT_IMAGE_FORMAT,
+ width=None, height=None, with_title=True, thumbnail=False):
args = [
- "--width", "%d" % width,
- "--height", "%d" % height,
+ # Change the background colour
+ "--color", "BACK#FFFFFFFF",
+
+ # Disable the border around the image
+ "--border", "0",
+
+ # Let's width and height define the size of the entire image
+ "--full-size-mode",
+
+ # Gives the curves a more organic look
+ "--slope-mode",
+
+ # Show nicer labels
+ "--dynamic-labels",
+
+ # Brand all generated graphs
+ "--watermark", _("Created by collecty"),
]
- args += self.collecty.graph_default_arguments
+
+ # Set the default dimensions
+ default_width, default_height = 960, 480
+
+ # A thumbnail doesn't have a legend and other labels
+ if thumbnail:
+ args.append("--only-graph")
+
+ default_width, default_height = 80, 20
+
+ args += [
+ "--imgformat", format,
+ "--height", "%s" % (height or default_height),
+ "--width", "%s" % (width or default_width),
+ ]
+
args += self.rrd_graph_args
- intervals = {
- None : "-3h",
- "hour" : "-1h",
- "day" : "-25h",
- "week" : "-360h",
- "year" : "-365d",
- }
+ # Graph title
+ if with_title and self.graph_title:
+ args += ["--title", self.graph_title]
- args.append("--start")
- try:
- args.append(intervals[interval])
- except KeyError:
- args.append(interval)
+ # Vertical label
+ if self.graph_vertical_label:
+ args += ["--vertical-label", self.graph_vertical_label]
- info = { "file" : self.ds.file }
- for item in self.rrd_graph:
- try:
- args.append(item % info)
- except TypeError:
- args.append(item)
+ if self.lower_limit is not None or self.upper_limit is not None:
+ # Force to honour the set limits
+ args.append("--rigid")
+
+ if self.lower_limit is not None:
+ args += ["--lower-limit", self.lower_limit]
+
+ if self.upper_limit is not None:
+ args += ["--upper-limit", self.upper_limit]
+
+ # Add interval
+ args += ["--start", util.make_interval(interval)]
+
+ return args
+
+ def _add_defs(self):
+ use_prefix = len(self.objects) >= 2
+
+ args = []
+ for object in self.objects:
+ if use_prefix:
+ args += object.make_rrd_defs(object.id)
+ else:
+ args += object.make_rrd_defs()
+
+ return args
+
+ def _add_vdefs(self, args):
+ ret = []
+
+ for arg in args:
+ ret.append(arg)
+
+ # Search for all DEFs and CDEFs
+ m = re.match(DEF_MATCH, "%s" % arg)
+ if m:
+ name = m.group(1)
+
+ # Add the VDEFs for minimum, maximum, etc. values
+ ret += [
+ "VDEF:%s_cur=%s,LAST" % (name, name),
+ "VDEF:%s_avg=%s,AVERAGE" % (name, name),
+ "VDEF:%s_max=%s,MAXIMUM" % (name, name),
+ "VDEF:%s_min=%s,MINIMUM" % (name, name),
+ ]
+
+ return ret
+
+ def get_objects(self, *args, **kwargs):
+ object = self.plugin.get_object(*args, **kwargs)
- rrdtool.graph(file, *args)
+ if object:
+ return [object,]
+
+ return []
+
+ def generate_graph(self, interval=None, **kwargs):
+ assert self.objects, "Cannot render graph without any objects"
+
+ # Make sure that all collected data is in the database
+ # to get a recent graph image
+ for object in self.objects:
+ object.commit()
+
+ args = self._make_command_line(interval, **kwargs)
+
+ self.log.info(_("Generating graph %s") % self)
+
+ rrd_graph = self.rrd_graph
+
+ # Add DEFs for all objects
+ if not any((e.startswith("DEF:") for e in rrd_graph)):
+ args += self._add_defs()
+
+ args += rrd_graph
+ args = self._add_vdefs(args)
+
+ # Convert arguments to string
+ args = [str(e) for e in args]
+
+ for arg in args:
+ self.log.debug(" %s" % arg)
+
+ graph = rrdtool.graphv("-", *args)
+
+ return {
+ "image" : graph.get("image"),
+ "image_height" : graph.get("image_height"),
+ "image_width" : graph.get("image_width"),
+ }
+
+ def graph_info(self):
+ """
+ Returns a dictionary with useful information
+ about this graph.
+ """
+ return {
+ "title" : self.graph_title,
+ "object_id" : self.object_id or "",
+ "template" : self.name,
+ }