From 72364063b0582dd1b4bddf8557271eb231af846d Mon Sep 17 00:00:00 2001
From: Michael Tremer
Date: Fri, 8 May 2015 14:20:02 +0000
Subject: [PATCH] Rewrite plugin architecture

The plugin architecture has been rewritten so that there is now one
thread per plugin instead of one per object. The objects are processed
in a queue and the results are stored in a write cache module that
writes the data to the RRD databases regularly or on demand.

This brings us much closer to our zeroconf goal, because on every
collection run the system is scanned again for the available disks,
interfaces, or whatever else is being monitored.
---
 src/collecty/daemon.py            | 158 +++++++++++++++----
 src/collecty/plugins/base.py      | 248 ++++++++++++++++--------------
 src/collecty/plugins/cpu.py       |  29 ++--
 src/collecty/plugins/entropy.py   |  40 +++--
 src/collecty/plugins/interface.py |  70 +++++----
 src/collecty/plugins/latency.py   |  53 +++----
 src/collecty/plugins/loadavg.py   |  28 ++--
 src/collecty/plugins/memory.py    |  27 ++--
 8 files changed, 395 insertions(+), 258 deletions(-)

diff --git a/src/collecty/daemon.py b/src/collecty/daemon.py
index a66fa10..e678899 100644
--- a/src/collecty/daemon.py
+++ b/src/collecty/daemon.py
@@ -19,7 +19,11 @@
 #                                                                             #
 ###############################################################################
 
+import Queue as queue
+import rrdtool
 import signal
+import threading
+import time
 
 import plugins
 
@@ -33,16 +37,23 @@ class Collecty(object):
 	# The default interval, when all data is written to disk.
 	SUBMIT_INTERVAL = 300
 
+	HEARTBEAT = 5
+
 	def __init__(self, debug=False):
+		self.debug = debug
+
 		# Enable debug logging when running in debug mode
-		if debug:
+		if self.debug:
 			log.setLevel(logging.DEBUG)
 
 		self.plugins = []
 
 		# Indicates whether this process should be running or not.
 		self.running = True
-		self.timer = plugins.Timer(self.SUBMIT_INTERVAL, heartbeat=2)
+
+		# The write queue holds all collected pieces of data which
+		# will be written to disk later.
+		self.write_queue = WriteQueue(self, self.SUBMIT_INTERVAL)
 
 		# Add all plugins
 		for plugin in plugins.get():
@@ -70,40 +81,32 @@ class Collecty(object):
 		for p in self.plugins:
 			p.start()
 
+		# Run the write queue thread
+		self.write_queue.start()
+
 		# Regularly submit all data to disk.
 		while self.running:
-			if self.timer.wait():
-				self.submit_all()
-
-		# Wait until all instances are finished.
-		while self.plugins:
-			for p in self.plugins[:]:
-				if not p.isAlive():
-					log.debug(_("%s is not alive anymore. Removing.") % p)
-					self.plugins.remove(p)
-
-			# Wait a bit.
-			time.sleep(0.1)
+			try:
+				time.sleep(self.HEARTBEAT)
+			except KeyboardInterrupt:
+				self.shutdown()
+				break
 
-		log.debug(_("No thread running. Exiting main thread."))
-
-	def submit_all(self):
-		"""
-			Submit all data right now.
-		"""
-		log.debug(_("Submitting all data in memory"))
+		# Wait until all plugins are finished.
 		for p in self.plugins:
-			p._submit()
+			p.join()
 
-		# Schedule the next submit.
-		self.timer.reset()
+		# Write all collected data to disk before ending the main thread
+		self.write_queue.shutdown()
+
+		log.debug(_("Main thread exited"))
 
 	def shutdown(self):
-		log.debug(_("Received shutdown signal"))
+		if not self.running:
+			return
 
+		log.debug(_("Received shutdown signal"))
 		self.running = False
-		if self.timer:
-			self.timer.cancel()
 
 		# Propagating shutdown to all threads.
 		for p in self.plugins:
@@ -123,9 +126,106 @@ class Collecty(object):
 			self.shutdown()
 
 		elif sig == signal.SIGUSR1:
-			# Submit all data.
-			self.submit_all()
+			# Commit all data.
+			self.write_queue.commit()
 
 	@property
 	def graph_default_arguments(self):
 		return GRAPH_DEFAULT_ARGUMENTS
+
+
+class WriteQueue(threading.Thread):
+	def __init__(self, collecty, submit_interval):
+		threading.Thread.__init__(self)
+		self.daemon = True
+
+		self.collecty = collecty
+
+		self.log = logging.getLogger("collecty.queue")
+		self.log.setLevel(logging.DEBUG)
+		self.log.propagate = 1
+
+		self.timer = plugins.Timer(submit_interval)
+		self._queue = queue.PriorityQueue()
+
+		self.log.debug(_("Initialised write queue"))
+
+	def run(self):
+		self.log.debug(_("Write queue process started"))
+		self.running = True
+
+		while self.running:
+			# Reset the timer.
+			self.timer.reset()
+
+			# Wait until the timer has successfully elapsed.
+			if self.timer.wait():
+				self.commit()
+
+		self.commit()
+		self.log.debug(_("Write queue process stopped"))
+
+	def shutdown(self):
+		self.running = False
+		self.timer.cancel()
+
+		# Wait until all data has been written.
+		self.join()
+
+	def add(self, object, time, data):
+		result = QueueObject(object.file, time, data)
+		self._queue.put(result)
+
+	def commit(self):
+		"""
+			Flushes the read data to disk.
+		"""
+		# There is nothing to do if the queue is empty
+		if self._queue.empty():
+			self.log.debug(_("No data to commit"))
+			return
+
+		time_start = time.time()
+
+		self.log.debug(_("Submitting data to the databases..."))
+
+		# Get all objects from the queue and group them by the RRD file
+		# to commit them all at once
+		results = {}
+		while not self._queue.empty():
+			result = self._queue.get()
+
+			try:
+				results[result.file].append(result)
+			except KeyError:
+				results[result.file] = [result]
+
+		# Write the collected data to disk
+		for filename, results in results.items():
+			self._commit_file(filename, results)
+
+		duration = time.time() - time_start
+		self.log.debug(_("Emptied write queue in %.2fs") % duration)
+
+	def _commit_file(self, filename, results):
+		self.log.debug(_("Committing %(counter)s entries to %(filename)s:") \
+			% { "counter" : len(results), "filename" : filename })
+
+		if self.collecty.debug:
+			for result in results:
+				self.log.debug("  %s: %s" % (result.time, result.data))
+
+		rrdtool.update(filename, *["%s" % r for r in results])
+
+
+class QueueObject(object):
+	def __init__(self, file, time, data):
+		self.file = file
+		self.time = time
+		self.data = data
+
+	def __str__(self):
+		return "%s:%s" % (self.time.strftime("%s"), self.data)
+
+	def __cmp__(self, other):
+		return cmp(self.time, other.time)
diff --git a/src/collecty/plugins/base.py b/src/collecty/plugins/base.py
index 82b0982..94b0bc0 100644
--- a/src/collecty/plugins/base.py
+++ b/src/collecty/plugins/base.py
@@ -21,6 +21,7 @@
 
 from __future__ import division
 
+import datetime
 import logging
 import math
 import os
@@ -82,16 +83,8 @@ class Plugin(threading.Thread):
 	# the data from this data source.
 	templates = []
 
-	# The schema of the RRD database.
-	rrd_schema = None
-
-	# RRA properties.
-	rra_types = ["AVERAGE", "MIN", "MAX"]
-	rra_timespans = [3600, 86400, 604800, 2678400, 31622400]
-	rra_rows = 2880
-
-	# The default interval of this plugin.
-	default_interval = 60
+	# The default interval for all plugins
+	interval = 60
 
 	# Automatically register all providers.
 	class __metaclass__(type):
@@ -118,7 +111,6 @@ class Plugin(threading.Thread):
 		# Check if this plugin was configured correctly.
 		assert self.name, "Name of the plugin is not set: %s" % self.name
 		assert self.description, "Description of the plugin is not set: %s" % self.description
-		assert self.rrd_schema
 
 		# Initialize the logger.
 		self.log = logging.getLogger("collecty.plugins.%s" % self.name)
@@ -129,50 +121,136 @@ class Plugin(threading.Thread):
 		# Run some custom initialization.
 		self.init(**kwargs)
 
-		# Create the database file.
-		self.create()
-
 		# Keepalive options
 		self.running = True
 		self.timer = Timer(self.interval)
 
-		self.log.info(_("Successfully initialized (%s).") % self.id)
-
-	def __repr__(self):
-		return "<%s %s>" % (self.__class__.__name__, self.id)
+		self.log.info(_("Successfully initialized %s") % self.__class__.__name__)
 
 	@property
-	def id(self):
+	def path(self):
 		"""
-			A unique ID of the plugin instance.
+			Returns the name of the sub directory in which all RRD files
+			for this plugin should be stored in.
 		"""
 		return self.name
 
-	@property
-	def interval(self):
+	### Basic methods
+
+	def init(self, **kwargs):
 		"""
-			Returns the interval in milliseconds, when the read method
-			should be called again.
+			Do some custom initialization stuff here.
 		"""
-		# XXX read this from the settings
+		pass
+
+	def collect(self):
+		"""
+			Gathers the statistical data, this plugin collects.
+		"""
+		time_start = time.time()
+
+		# Run through all objects of this plugin and call the collect method.
+		for o in self.objects:
+			now = datetime.datetime.utcnow()
+			try:
+				result = o.collect()
+			except:
+				self.log.warning(_("Unhandled exception in %s.collect()") % o, exc_info=True)
+				continue
+
+			if not result:
+				self.log.warning(_("Received empty result: %s") % o)
+				continue
+
+			self.log.debug(_("Collected %s: %s") % (o, result))
+
+			# Add the object to the write queue so that the data is written
+			# to the databases later.
+			self.collecty.write_queue.add(o, now, result)
+
+		# Returns the time this function took to complete.
+		return (time.time() - time_start)
+
+	def run(self):
+		self.log.debug(_("%s plugin has started") % self.name)
+
+		# Initially collect everything
+		self.collect()
+
+		while self.running:
+			# Reset the timer.
+			self.timer.reset()
+
+			# Wait until the timer has successfully elapsed.
+			if self.timer.wait():
+				delay = self.collect()
+				self.timer.reset(delay)
+
+		self.log.debug(_("%s plugin has stopped") % self.name)
+
+	def shutdown(self):
+		self.log.debug(_("Received shutdown signal."))
+		self.running = False
+
+		# Kill any running timers.
+		if self.timer:
+			self.timer.cancel()
 
-		# Otherwise return the default.
-		return self.default_interval
+
+class Object(object):
+	# The schema of the RRD database.
+	rrd_schema = None
+
+	# RRA properties.
+	rra_types = ["AVERAGE", "MIN", "MAX"]
+	rra_timespans = [3600, 86400, 604800, 2678400, 31622400]
+	rra_rows = 2880
+
+	def __init__(self, plugin, *args, **kwargs):
+		self.plugin = plugin
+
+		# Indicates if this object has collected its data
+		self.collected = False
+
+		# Initialise this object
+		self.init(*args, **kwargs)
+
+		# Create the database file.
+		self.create()
+
+	def __repr__(self):
+		return "<%s>" % self.__class__.__name__
 
 	@property
-	def stepsize(self):
-		return self.interval
+	def collecty(self):
+		return self.plugin.collecty
 
 	@property
-	def heartbeat(self):
-		return self.stepsize * 2
+	def log(self):
+		return self.plugin.log
+
+	@property
+	def id(self):
+		"""
+			Returns a UNIQUE identifier for this object. As this is incorporated
+			into the path of RRD file, it must only contain ASCII characters.
+		"""
+		raise NotImplementedError
 
 	@property
 	def file(self):
 		"""
 			The absolute path to the RRD file of this plugin.
""" - return os.path.join(DATABASE_DIR, "%s.rrd" % self.id) + return os.path.join(DATABASE_DIR, self.plugin.path, "%s.rrd" % self.id) + + ### Basic methods + + def init(self, *args, **kwargs): + """ + Do some custom initialization stuff here. + """ + pass def create(self): """ @@ -195,6 +273,17 @@ class Plugin(threading.Thread): for arg in args: self.log.debug(" %s" % arg) + def info(self): + return rrdtool.info(self.file) + + @property + def stepsize(self): + return self.plugin.interval + + @property + def heartbeat(self): + return self.stepsize * 2 + def get_rrd_schema(self): schema = [ "--step", "%s" % self.stepsize, @@ -237,98 +326,23 @@ class Plugin(threading.Thread): return schema - def info(self): - return rrdtool.info(self.file) + def execute(self): + if self.collected: + raise RuntimeError("This object has already collected its data") - ### Basic methods + self.collected = True + self.now = datetime.datetime.utcnow() - def init(self, **kwargs): - """ - Do some custom initialization stuff here. - """ - pass + # Call the collect + result = self.collect() - def read(self): + def commit(self): """ - Gathers the statistical data, this plugin collects. + Will commit the collected data to the database. """ - raise NotImplementedError - - def submit(self): - """ - Flushes the read data to disk. - """ - # Do nothing in case there is no data to submit. - if not self.data: - return - - self.log.debug(_("Submitting data to database. %d entries.") % len(self.data)) - for data in self.data: - self.log.debug(" %s" % data) - - # Create the RRD files (if they don't exist yet or - # have vanished for some reason). + # Make sure that the RRD database has been created self.create() - rrdtool.update(self.file, *self.data) - self.data = [] - - def _read(self, *args, **kwargs): - """ - This method catches errors from the read() method and logs them. - """ - start_time = time.time() - - try: - data = self.read(*args, **kwargs) - if data is None: - self.log.warning(_("Received empty data.")) - else: - self.data.append("%d:%s" % (start_time, data)) - - # Catch any exceptions, so collecty does not crash. - except Exception, e: - self.log.critical(_("Unhandled exception in read()!"), exc_info=True) - - # Return the elapsed time since _read() has been called. - return (time.time() - start_time) - - def _submit(self, *args, **kwargs): - """ - This method catches errors from the submit() method and logs them. - """ - try: - return self.submit(*args, **kwargs) - - # Catch any exceptions, so collecty does not crash. - except Exception, e: - self.log.critical(_("Unhandled exception in submit()!"), exc_info=True) - - def run(self): - self.log.debug(_("Started.")) - - while self.running: - # Reset the timer. - self.timer.reset() - - # Wait until the timer has successfully elapsed. - if self.timer.wait(): - self.log.debug(_("Collecting...")) - delay = self._read() - - self.timer.reset(delay) - - self._submit() - self.log.debug(_("Stopped.")) - - def shutdown(self): - self.log.debug(_("Received shutdown signal.")) - self.running = False - - # Kill any running timers. - if self.timer: - self.timer.cancel() - class GraphTemplate(object): # A unique name to identify this graph template. 
diff --git a/src/collecty/plugins/cpu.py b/src/collecty/plugins/cpu.py
index 1d26dcb..c9825b7 100644
--- a/src/collecty/plugins/cpu.py
+++ b/src/collecty/plugins/cpu.py
@@ -102,12 +102,7 @@ class GraphTemplateCPU(base.GraphTemplate):
 	]
 
 
-class ProcessorPlugin(base.Plugin):
-	name = "cpu"
-	description = "CPU Usage Data Source"
-
-	templates = [GraphTemplateCPU,]
-
+class ProcessorObject(base.Object):
 	rrd_schema = [
 		"DS:user:DERIVE:0:U",
 		"DS:nice:DERIVE:0:U",
@@ -118,12 +113,11 @@ class ProcessorPlugin(base.Plugin):
 		"DS:sirq:DERIVE:0:U",
 	]
 
-	@classmethod
-	def autocreate(cls, collecty, **kwargs):
-		# Every system has got at least one CPU.
-		return cls(collecty, **kwargs)
+	@property
+	def id(self):
+		return "default"
 
-	def read(self):
+	def collect(self):
 		"""
 			Reads the CPU usage.
 		"""
@@ -154,3 +148,16 @@ class ProcessorPlugin(base.Plugin):
 		finally:
 			if f:
 				f.close()
+
+
+class ProcessorPlugin(base.Plugin):
+	name = "processor"
+	description = "Processor Usage Plugin"
+
+	templates = [GraphTemplateCPU,]
+
+	interval = 30
+
+	@property
+	def objects(self):
+		yield ProcessorObject(self)
diff --git a/src/collecty/plugins/entropy.py b/src/collecty/plugins/entropy.py
index 7966e8b..f800f9a 100644
--- a/src/collecty/plugins/entropy.py
+++ b/src/collecty/plugins/entropy.py
@@ -53,32 +53,30 @@ class GraphTemplateEntropy(base.GraphTemplate):
 	]
 
 
-class EntropyPlugin(base.Plugin):
-	name = "entropy"
-	description = "Entropy Data Source"
-
-	templates = [GraphTemplateEntropy,]
-
+class EntropyObject(base.Object):
 	rrd_schema = [
 		"DS:entropy:GAUGE:0:U",
 	]
 
-	@classmethod
-	def autocreate(cls, collecty, **kwargs):
-		if not os.path.exists(ENTROPY_FILE):
-			self.log.debug(_("Entropy kernel interface does not exist."))
-			return
+	@property
+	def id(self):
+		return "default"
 
-		return cls(collecty, **kwargs)
+	def collect(self):
+		with open(ENTROPY_FILE) as f:
+			return f.readline().strip()
 
-	def read(self):
-		f = None
-		try:
-			f = open(ENTROPY_FILE)
-			entropy = f.readline()
 
+class EntropyPlugin(base.Plugin):
+	name = "entropy"
+	description = "Entropy Plugin"
+
+	templates = [GraphTemplateEntropy,]
+
+	@property
+	def objects(self):
+		if not os.path.exists(ENTROPY_FILE):
+			self.log.debug(_("Entropy kernel interface does not exist"))
+			return []
 
-			return entropy.strip()
-		finally:
-			if f:
-				f.close()
+		return [EntropyObject(self)]
diff --git a/src/collecty/plugins/interface.py b/src/collecty/plugins/interface.py
index fbf715a..4ab5f01 100644
--- a/src/collecty/plugins/interface.py
+++ b/src/collecty/plugins/interface.py
@@ -197,16 +197,7 @@ class GraphTemplateInterfaceErrors(base.GraphTemplate):
 	]
 
 
-class InterfacePlugin(base.Plugin):
-	name = "interface"
-	description = "Interface Statistics Data Source"
-
-	templates = [
-		GraphTemplateInterfaceBits,
-		GraphTemplateInterfacePackets,
-		GraphTemplateInterfaceErrors,
-	]
-
+class InterfaceObject(base.Object):
 	rrd_schema = [
 		"DS:bytes_rx:DERIVE:0:U",
 		"DS:bytes_tx:DERIVE:0:U",
@@ -220,34 +211,17 @@ class InterfacePlugin(base.Plugin):
 		"DS:packets_tx:DERIVE:0:U",
 	]
 
-	@classmethod
-	def autocreate(cls, collecty, **kwargs):
-		if not os.path.exists(SYS_CLASS_NET):
-			return
-
-		instances = []
-		for interface in os.listdir(SYS_CLASS_NET):
-			# Skip some unwanted interfaces.
-			if interface == "lo" or interface.startswith("mon."):
-				continue
+	def __repr__(self):
+		return "<%s %s>" % (self.__class__.__name__, self.interface)
 
-			path = os.path.join(SYS_CLASS_NET, interface)
-			if not os.path.isdir(path):
-				continue
-
-			instance = cls(collecty, interface=interface)
-			instances.append(instance)
-
-		return instances
-
-	def init(self, **kwargs):
-		self.interface = kwargs.get("interface")
+	def init(self, interface):
+		self.interface = interface
 
 	@property
 	def id(self):
-		return "-".join((self.name, self.interface))
+		return self.interface
 
-	def read(self):
+	def collect(self):
 		interface_path = os.path.join(SYS_CLASS_NET, self.interface)
 
 		# Check if the interface exists.
@@ -286,3 +260,33 @@ class InterfacePlugin(base.Plugin):
 				f.close()
 
 		return ":".join(ret)
+
+
+class InterfacePlugin(base.Plugin):
+	name = "interface"
+	description = "Interface Statistics Plugin"
+
+	templates = [
+		GraphTemplateInterfaceBits,
+		GraphTemplateInterfacePackets,
+		GraphTemplateInterfaceErrors,
+	]
+
+	interval = 20
+
+	def get_interfaces(self):
+		for interface in os.listdir(SYS_CLASS_NET):
+			# Skip some unwanted interfaces.
+			if interface == "lo" or interface.startswith("mon."):
+				continue
+
+			path = os.path.join(SYS_CLASS_NET, interface)
+			if not os.path.isdir(path):
+				continue
+
+			yield interface
+
+	@property
+	def objects(self):
+		for interface in self.get_interfaces():
+			yield InterfaceObject(self, interface=interface)
diff --git a/src/collecty/plugins/latency.py b/src/collecty/plugins/latency.py
index b269221..98e6936 100644
--- a/src/collecty/plugins/latency.py
+++ b/src/collecty/plugins/latency.py
@@ -82,48 +82,33 @@ class GraphTemplateLatency(base.GraphTemplate):
 	]
 
 
-class LatencyPlugin(base.Plugin):
-	name = "latency"
-	description = "Latency (ICMP ping) Data Source"
-
-	templates = [GraphTemplateLatency,]
-
+class LatencyObject(base.Object):
 	rrd_schema = [
 		"DS:latency:GAUGE:0:U",
 		"DS:latency_loss:GAUGE:0:100",
 		"DS:latency_stddev:GAUGE:0:U",
 	]
 
-	@property
-	def id(self):
-		return "-".join((self.name, self.host))
-
-	@classmethod
-	def autocreate(cls, collecty, **kwargs):
-		ret = []
-		for host in PING_HOSTS:
-			ds = cls(collecty, host=host, **kwargs)
-			ret.append(ds)
+	def __repr__(self):
+		return "<%s %s>" % (self.__class__.__name__, self.hostname)
 
-		return ret
-
-	def init(self, **kwargs):
-		self.host = kwargs.get("host")
-		assert self.host
+	def init(self, hostname, deadline=None):
+		self.hostname = hostname
+		self.deadline = deadline
 
 	@property
-	def deadline(self):
-		return self.interval - 10
+	def id(self):
+		return self.hostname
 
-	def read(self):
+	def collect(self):
 		# Send up to five ICMP echo requests.
 		try:
-			ping = collecty.ping.Ping(destination=self.host, timeout=20000)
+			ping = collecty.ping.Ping(destination=self.hostname, timeout=20000)
 			ping.run(count=5, deadline=self.deadline)
 		except collecty.ping.PingError, e:
 			self.log.warning(_("Could not run latency check for %(host)s: %(msg)s") \
-				% { "host" : self.host, "msg" : e.msg })
+				% { "host" : self.hostname, "msg" : e.msg })
 			return
 
 		return ":".join((
@@ -131,3 +116,19 @@
 			"%.10f" % ping.loss,
 			"%.10f" % ping.stddev,
 		))
+
+
+class LatencyPlugin(base.Plugin):
+	name = "latency"
+	description = "Latency (ICMP ping) Plugin"
+
+	templates = [GraphTemplateLatency,]
+
+	interval = 60
+
+	@property
+	def objects(self):
+		deadline = self.interval / len(PING_HOSTS)
+
+		for hostname in PING_HOSTS:
+			yield LatencyObject(self, hostname, deadline=deadline)
diff --git a/src/collecty/plugins/loadavg.py b/src/collecty/plugins/loadavg.py
index c14145f..b5550eb 100644
--- a/src/collecty/plugins/loadavg.py
+++ b/src/collecty/plugins/loadavg.py
@@ -69,21 +69,29 @@ class GraphTemplateLoadAvg(base.GraphTemplate):
 	]
 
 
-class LoadAvgPlugin(base.Plugin):
-	name = "loadavg"
-	description = "Load Average Data Source"
-
-	templates = [GraphTemplateLoadAvg,]
-
+class LoadAvgObject(base.Object):
 	rrd_schema = [
 		"DS:load1:GAUGE:0:U",
 		"DS:load5:GAUGE:0:U",
 		"DS:load15:GAUGE:0:U",
 	]
 
-	@classmethod
-	def autocreate(cls, collecty, **kwargs):
-		return cls(collecty, **kwargs)
+	@property
+	def id(self):
+		return "default"
 
-	def read(self):
+	def collect(self):
 		return ":".join(["%.10f" % l for l in os.getloadavg()])
+
+
+class LoadAvgPlugin(base.Plugin):
+	name = "loadavg"
+	description = "Load Average Plugin"
+
+	templates = [GraphTemplateLoadAvg,]
+
+	interval = 30
+
+	@property
+	def objects(self):
+		return [LoadAvgObject(self)]
diff --git a/src/collecty/plugins/memory.py b/src/collecty/plugins/memory.py
index e66e223..b58cbca 100644
--- a/src/collecty/plugins/memory.py
+++ b/src/collecty/plugins/memory.py
@@ -87,12 +87,7 @@ class GraphTemplateMemory(base.GraphTemplate):
 	]
 
 
-class MemoryPlugin(base.Plugin):
-	name = "memory"
-	description = "Memory Usage Data Source"
-
-	templates = [GraphTemplateMemory,]
-
+class MemoryObject(base.Object):
 	rrd_schema = [
 		"DS:used:GAUGE:0:100",
 		"DS:cached:GAUGE:0:100",
@@ -101,12 +96,11 @@ class MemoryPlugin(base.Plugin):
 		"DS:swap:GAUGE:0:100",
 	]
 
-	@classmethod
-	def autocreate(cls, collecty, **kwargs):
-		# Every system has got memory.
-		return cls(collecty, **kwargs)
+	@property
+	def id(self):
+		return "default"
 
-	def read(self):
+	def collect(self):
 		f = None
 
 		try:
@@ -141,3 +135,14 @@ class MemoryPlugin(base.Plugin):
 		finally:
 			if f:
 				f.close()
+
+
+class MemoryPlugin(base.Plugin):
+	name = "memory"
+	description = "Memory Usage Plugin"
+
+	templates = [GraphTemplateMemory,]
+
+	@property
+	def objects(self):
+		yield MemoryObject(self)
-- 
2.39.2
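
Note: to make the write path concrete, here is roughly how a collected result
travels to disk once this patch is applied (a sketch only; the file name,
timestamp and sample value are made up):

import datetime

from collecty.daemon import QueueObject

# Each plugin thread queues its results via
#   self.collecty.write_queue.add(object, now, result)
# which wraps them like this, ordered by time in the PriorityQueue:
qo = QueueObject("/var/lib/collecty/entropy/default.rrd",
	datetime.datetime.utcnow(), "1764")

# WriteQueue.commit() later groups all queued objects by RRD file and
# writes each group with a single call:
#   rrdtool.update(filename, *["%s" % r for r in results])
# where every object renders itself as "<unix timestamp>:<data>":
print "%s" % qo        # e.g. "1431094802:1764"

Batching per file keeps the number of rrdtool.update() calls — and with them
the disk writes — low, even when many plugins collect at short intervals.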