#                                                                             #
###############################################################################
+import calendar
+import Queue as queue
+import rrdtool
import signal
+import threading
+import time
import plugins
# The default interval in which all collected data is written to disk.
SUBMIT_INTERVAL = 300
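+ # The interval in which the main loop wakes up to check whether it is
+ # still supposed to be running.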
+ HEARTBEAT = 5
+
def __init__(self, debug=False):
+ self.debug = debug
+
# Enable debug logging when running in debug mode
- if debug:
+ if self.debug:
log.setLevel(logging.DEBUG)
self.plugins = []
# Indicates whether this process should be running or not.
self.running = True
- self.timer = plugins.Timer(self.SUBMIT_INTERVAL, heartbeat=2)
+
+ # The write queue holds all collected pieces of data which
+ # will be written to disk later.
+ self.write_queue = WriteQueue(self, self.SUBMIT_INTERVAL)
# Add all plugins
for plugin in plugins.get():
for p in self.plugins:
p.start()
+ # Run the write queue thread
+ self.write_queue.start()
+
- # Regularly submit all data to disk.
+ # Regularly check whether we are still supposed to be running.
while self.running:
- if self.timer.wait():
- self.submit_all()
-
- # Wait until all instances are finished.
- while self.plugins:
- for p in self.plugins[:]:
- if not p.isAlive():
- log.debug(_("%s is not alive anymore. Removing.") % p)
- self.plugins.remove(p)
-
- # Wait a bit.
- time.sleep(0.1)
+ try:
+ time.sleep(self.HEARTBEAT)
+ except KeyboardInterrupt:
+ self.shutdown()
+ break
- log.debug(_("No thread running. Exiting main thread."))
-
- def submit_all(self):
- """
- Submit all data right now.
- """
- log.debug(_("Submitting all data in memory"))
+ # Wait until all plugins are finished.
for p in self.plugins:
- p._submit()
+ p.join()
- # Schedule the next submit.
- self.timer.reset()
+ # Write all collected data to disk before ending the main thread
+ self.write_queue.shutdown()
+
+ log.debug(_("Main thread exited"))
def shutdown(self):
- log.debug(_("Received shutdown signal"))
+ if not self.running:
+ return
+ log.debug(_("Received shutdown signal"))
self.running = False
- if self.timer:
- self.timer.cancel()
# Propagating shutdown to all threads.
for p in self.plugins:
self.shutdown()
elif sig == signal.SIGUSR1:
- # Submit all data.
- self.submit_all()
+ # Commit all data.
+ self.write_queue.commit()
@property
def graph_default_arguments(self):
return GRAPH_DEFAULT_ARGUMENTS
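# Illustration, not part of the patch: since the signal handler above maps
# SIGUSR1 to write_queue.commit(), an operator or test script can ask a
# running daemon to flush its write queue immediately. The process name
# "collectyd" is an assumption made for this example.
import os
import signal
import subprocess

pid = int(subprocess.check_output(["pidof", "collectyd"]))
os.kill(pid, signal.SIGUSR1)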
+
+
+class WriteQueue(threading.Thread):
+ def __init__(self, collecty, submit_interval):
+ threading.Thread.__init__(self)
+ self.daemon = True
+
+ self.collecty = collecty
+
+ self.log = logging.getLogger("collecty.queue")
+ self.log.setLevel(logging.DEBUG)
+ self.log.propagate = 1
+
+ self.timer = plugins.Timer(submit_interval)
+ self._queue = queue.PriorityQueue()
+
+ self.log.debug(_("Initialised write queue"))
+
+ def run(self):
+ self.log.debug(_("Write queue process started"))
+ self.running = True
+
+ while self.running:
+ # Reset the timer.
+ self.timer.reset()
+
+ # Wait until the timer has successfully elapsed.
+ if self.timer.wait():
+ self.commit()
+
+ self.commit()
+ self.log.debug(_("Write queue process stopped"))
+
+ def shutdown(self):
+ self.running = False
+ self.timer.cancel()
+
+ # Wait until all data has been written.
+ self.join()
+
+ def add(self, obj, timestamp, data):
+ result = QueueObject(obj.file, timestamp, data)
+ self._queue.put(result)
+
+ def commit(self):
+ """
+ Flushes all queued data to disk.
+ """
+ # There is nothing to do if the queue is empty
+ if self._queue.empty():
+ self.log.debug(_("No data to commit"))
+ return
+
+ time_start = time.time()
+
+ self.log.debug(_("Submitting data to the databases..."))
+
+ # Get all objects from the queue and group them by the RRD file
+ # to commit them all at once
+ results = {}
+ while not self._queue.empty():
+ result = self._queue.get()
+
+ try:
+ results[result.file].append(result)
+ except KeyError:
+ results[result.file] = [result]
+
+ # Write the collected data to disk
+ for filename, file_results in results.items():
+ self._commit_file(filename, file_results)
+
+ duration = time.time() - time_start
+ self.log.debug(_("Emptied write queue in %.2fs") % duration)
+
+ def _commit_file(self, filename, results):
+ self.log.debug(_("Committing %(counter)s entries to %(filename)s:") \
+ % { "counter" : len(results), "filename" : filename })
+
+ if self.collecty.debug:
+ for result in results:
+ self.log.debug(" %s: %s" % (result.time, result.data))
+
+ rrdtool.update(filename, *["%s" % r for r in results])
+
+
+class QueueObject(object):
+ def __init__(self, file, time, data):
+ self.file = file
+ self.time = time
+ self.data = data
+
+ def __str__(self):
+ # Format as "<epoch>:<data>", which is what rrdtool update expects.
+ # The time is collected as naive UTC, so convert it with calendar.timegm()
+ # rather than the non-portable, local-time-based strftime("%s").
+ return "%s:%s" % (calendar.timegm(self.time.utctimetuple()), self.data)
+
+ def __cmp__(self, other):
+ return cmp(self.time, other.time)
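# Illustration, not part of the patch: each QueueObject serialises itself to
# the "<epoch>:<data>" string that rrdtool update expects, and __cmp__ lets
# the PriorityQueue hand entries back ordered by time. The file path below is
# hypothetical.
import datetime

entry = QueueObject("/var/lib/collecty/loadavg/default.rrd",
    datetime.datetime.utcnow(), "0.51:0.40:0.33")
print entry   # e.g. "1394211600:0.51:0.40:0.33"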
from __future__ import division
+import datetime
import logging
import math
import os
# the data from this data source.
templates = []
- # The schema of the RRD database.
- rrd_schema = None
-
- # RRA properties.
- rra_types = ["AVERAGE", "MIN", "MAX"]
- rra_timespans = [3600, 86400, 604800, 2678400, 31622400]
- rra_rows = 2880
-
- # The default interval of this plugin.
- default_interval = 60
+ # The default interval for all plugins
+ interval = 60
# Automatically register all providers.
class __metaclass__(type):
# Check if this plugin was configured correctly.
assert self.name, "Name of the plugin is not set: %s" % self.name
assert self.description, "Description of the plugin is not set: %s" % self.description
- assert self.rrd_schema
# Initialize the logger.
self.log = logging.getLogger("collecty.plugins.%s" % self.name)
# Run some custom initialization.
self.init(**kwargs)
- # Create the database file.
- self.create()
-
# Keepalive options
self.running = True
self.timer = Timer(self.interval)
- self.log.info(_("Successfully initialized (%s).") % self.id)
-
- def __repr__(self):
- return "<%s %s>" % (self.__class__.__name__, self.id)
+ self.log.info(_("Successfully initialized %s") % self.__class__.__name__)
@property
- def id(self):
+ def path(self):
"""
- A unique ID of the plugin instance.
+ Returns the name of the sub-directory in which all RRD files
+ for this plugin are stored.
"""
return self.name
- @property
- def interval(self):
+ ### Basic methods
+
+ def init(self, **kwargs):
"""
- Returns the interval in milliseconds, when the read method
- should be called again.
+ Do some custom initialization stuff here.
"""
- # XXX read this from the settings
+ pass
+
+ def collect(self):
+ """
+ Gathers the statistical data that this plugin collects.
+ """
+ time_start = time.time()
+
+ # Run through all objects of this plugin and call the collect method.
+ for o in self.objects:
+ now = datetime.datetime.utcnow()
+ try:
+ result = o.collect()
+ except Exception:
+ self.log.warning(_("Unhandled exception in %s.collect()") % o, exc_info=True)
+ continue
+
+ if not result:
+ self.log.warning(_("Received empty result: %s") % o)
+ continue
+
+ self.log.debug(_("Collected %s: %s") % (o, result))
+
+ # Add the object to the write queue so that the data is written
+ # to the databases later.
+ self.collecty.write_queue.add(o, now, result)
+
+ # Returns the time this function took to complete.
+ return (time.time() - time_start)
+
+ def run(self):
+ self.log.debug(_("%s plugin has started") % self.name)
+
+ # Initially collect everything
+ self.collect()
+
+ while self.running:
+ # Reset the timer.
+ self.timer.reset()
+
+ # Wait until the timer has successfully elapsed.
+ if self.timer.wait():
+ delay = self.collect()
+ self.timer.reset(delay)
+
+ self.log.debug(_("%s plugin has stopped") % self.name)
+
+ def shutdown(self):
+ self.log.debug(_("Received shutdown signal."))
+ self.running = False
+
+ # Kill any running timers.
+ if self.timer:
+ self.timer.cancel()
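# The loop in run() relies on a Timer with reset(), wait() and cancel(); its
# implementation is not part of this hunk. A minimal sketch of the assumed
# interface, built on threading.Event (Python 2.7+, where Event.wait()
# returns the flag state), could look like this:
import threading

class TimerSketch(object):
    def __init__(self, timeout):
        self.timeout = timeout
        self._event = threading.Event()
        self.reset()

    def reset(self, delay=0):
        # Start the next interval, optionally adjusted by the time the
        # last collection run took.
        self._event.clear()
        self._timeout = max(0, self.timeout - delay)

    def wait(self):
        # Returns True when the interval elapsed, False when cancelled.
        return not self._event.wait(self._timeout)

    def cancel(self):
        self._event.set()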
- # Otherwise return the default.
- return self.default_interval
+
+class Object(object):
+ # The schema of the RRD database.
+ rrd_schema = None
+
+ # RRA properties.
+ rra_types = ["AVERAGE", "MIN", "MAX"]
+ rra_timespans = [3600, 86400, 604800, 2678400, 31622400]
+ rra_rows = 2880
+
+ def __init__(self, plugin, *args, **kwargs):
+ self.plugin = plugin
+
+ # Indicates if this object has collected its data
+ self.collected = False
+
+ # Initialise this object
+ self.init(*args, **kwargs)
+
+ # Create the database file.
+ self.create()
+
+ def __repr__(self):
+ return "<%s>" % self.__class__.__name__
@property
- def stepsize(self):
- return self.interval
+ def collecty(self):
+ return self.plugin.collecty
@property
- def heartbeat(self):
- return self.stepsize * 2
+ def log(self):
+ return self.plugin.log
+
+ @property
+ def id(self):
+ """
+ Returns a UNIQUE identifier for this object. As this is incorporated
+ into the path of the RRD file, it must only contain ASCII characters.
+ """
+ raise NotImplementedError
@property
def file(self):
"""
- The absolute path to the RRD file of this plugin.
+ The absolute path to the RRD file of this object.
"""
- return os.path.join(DATABASE_DIR, "%s.rrd" % self.id)
+ return os.path.join(DATABASE_DIR, self.plugin.path, "%s.rrd" % self.id)
+
+ ### Basic methods
+
+ def init(self, *args, **kwargs):
+ """
+ Do some custom initialization stuff here.
+ """
+ pass
def create(self):
"""
for arg in args:
self.log.debug(" %s" % arg)
+ def info(self):
+ return rrdtool.info(self.file)
+
+ @property
+ def stepsize(self):
+ return self.plugin.interval
+
+ @property
+ def heartbeat(self):
+ return self.stepsize * 2
+
def get_rrd_schema(self):
schema = [
"--step", "%s" % self.stepsize,
return schema
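# Sketch of the elided part of get_rrd_schema(): the DS lines in rrd_schema
# carry no heartbeat field, so the heartbeat property above is presumably
# spliced into each definition before the schema is handed to rrdtool. This
# helper is hypothetical, shown only to illustrate the transformation:
def _expand_ds(line, heartbeat):
    # "DS:load1:GAUGE:0:U" -> "DS:load1:GAUGE:60:0:U"
    prefix, name, type, lower, upper = line.split(":")
    return ":".join((prefix, name, type, "%s" % heartbeat, lower, upper))

print _expand_ds("DS:load1:GAUGE:0:U", 60)   # DS:load1:GAUGE:60:0:U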
- def info(self):
- return rrdtool.info(self.file)
+ def execute(self):
+ if self.collected:
+ raise RuntimeError("This object has already collected its data")
- ### Basic methods
+ self.collected = True
+ self.now = datetime.datetime.utcnow()
- def init(self, **kwargs):
- """
- Do some custom initialization stuff here.
- """
- pass
+ # Call the collect method and return the result
+ return self.collect()
- def read(self):
+ def commit(self):
"""
- Gathers the statistical data, this plugin collects.
+ Will commit the collected data to the database.
"""
- raise NotImplementedError
-
- def submit(self):
- """
- Flushes the read data to disk.
- """
- # Do nothing in case there is no data to submit.
- if not self.data:
- return
-
- self.log.debug(_("Submitting data to database. %d entries.") % len(self.data))
- for data in self.data:
- self.log.debug(" %s" % data)
-
- # Create the RRD files (if they don't exist yet or
- # have vanished for some reason).
+ # Make sure that the RRD database has been created
self.create()
- rrdtool.update(self.file, *self.data)
- self.data = []
-
- def _read(self, *args, **kwargs):
- """
- This method catches errors from the read() method and logs them.
- """
- start_time = time.time()
-
- try:
- data = self.read(*args, **kwargs)
- if data is None:
- self.log.warning(_("Received empty data."))
- else:
- self.data.append("%d:%s" % (start_time, data))
-
- # Catch any exceptions, so collecty does not crash.
- except Exception, e:
- self.log.critical(_("Unhandled exception in read()!"), exc_info=True)
-
- # Return the elapsed time since _read() has been called.
- return (time.time() - start_time)
-
- def _submit(self, *args, **kwargs):
- """
- This method catches errors from the submit() method and logs them.
- """
- try:
- return self.submit(*args, **kwargs)
-
- # Catch any exceptions, so collecty does not crash.
- except Exception, e:
- self.log.critical(_("Unhandled exception in submit()!"), exc_info=True)
-
- def run(self):
- self.log.debug(_("Started."))
-
- while self.running:
- # Reset the timer.
- self.timer.reset()
-
- # Wait until the timer has successfully elapsed.
- if self.timer.wait():
- self.log.debug(_("Collecting..."))
- delay = self._read()
-
- self.timer.reset(delay)
-
- self._submit()
- self.log.debug(_("Stopped."))
-
- def shutdown(self):
- self.log.debug(_("Received shutdown signal."))
- self.running = False
-
- # Kill any running timers.
- if self.timer:
- self.timer.cancel()
-
class GraphTemplate(object):
# A unique name to identify this graph template.
]
-class ProcessorPlugin(base.Plugin):
- name = "cpu"
- description = "CPU Usage Data Source"
-
- templates = [GraphTemplateCPU,]
-
+class ProcessorObject(base.Object):
rrd_schema = [
"DS:user:DERIVE:0:U",
"DS:nice:DERIVE:0:U",
"DS:sirq:DERIVE:0:U",
]
- @classmethod
- def autocreate(cls, collecty, **kwargs):
- # Every system has got at least one CPU.
- return cls(collecty, **kwargs)
+ @property
+ def id(self):
+ return "default"
- def read(self):
+ def collect(self):
"""
Reads the CPU usage.
"""
finally:
if f:
f.close()
+
+
+class ProcessorPlugin(base.Plugin):
+ name = "processor"
+ description = "Processor Usage Plugin"
+
+ templates = [GraphTemplateCPU,]
+
+ interval = 30
+
+ @property
+ def objects(self):
+ yield ProcessorObject(self)
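# The body of ProcessorObject.collect() is elided in this hunk. A hedged
# sketch of what it needs to produce, reading the aggregated "cpu" line from
# /proc/stat and returning the counters in schema order (user, nice, system,
# idle, iowait, irq, softirq); the exact field selection is an assumption:
def _read_cpu_counters():
    with open("/proc/stat") as f:
        for line in f:
            if line.startswith("cpu "):
                return ":".join(line.split()[1:8])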
]
-class EntropyPlugin(base.Plugin):
- name = "entropy"
- description = "Entropy Data Source"
-
- templates = [GraphTemplateEntropy,]
-
+class EntropyObject(base.Object):
rrd_schema = [
"DS:entropy:GAUGE:0:U",
]
- @classmethod
- def autocreate(cls, collecty, **kwargs):
- if not os.path.exists(ENTROPY_FILE):
- self.log.debug(_("Entropy kernel interface does not exist."))
- return
+ @property
+ def id(self):
+ return "default"
- return cls(collecty, **kwargs)
+ def collect(self):
+ with open(ENTROPY_FILE) as f:
+ return f.readline().strip()
- def read(self):
- f = None
- try:
- f = open(ENTROPY_FILE)
- entropy = f.readline()
+class EntropyPlugin(base.Plugin):
+ name = "entropy"
+ description = "Entropy Plugin"
+
+ templates = [GraphTemplateEntropy,]
+
+ @property
+ def objects(self):
+ if not os.path.exists(ENTROPY_FILE):
+ self.log.debug(_("Entropy kernel interface does not exist"))
+ return []
- return entropy.strip()
- finally:
- if f:
- f.close()
+ return [EntropyObject(self)]
]
-class InterfacePlugin(base.Plugin):
- name = "interface"
- description = "Interface Statistics Data Source"
-
- templates = [
- GraphTemplateInterfaceBits,
- GraphTemplateInterfacePackets,
- GraphTemplateInterfaceErrors,
- ]
-
+class InterfaceObject(base.Object):
rrd_schema = [
"DS:bytes_rx:DERIVE:0:U",
"DS:bytes_tx:DERIVE:0:U",
"DS:packets_tx:DERIVE:0:U",
]
- @classmethod
- def autocreate(cls, collecty, **kwargs):
- if not os.path.exists(SYS_CLASS_NET):
- return
-
- instances = []
- for interface in os.listdir(SYS_CLASS_NET):
- # Skip some unwanted interfaces.
- if interface == "lo" or interface.startswith("mon."):
- continue
+ def __repr__(self):
+ return "<%s %s>" % (self.__class__.__name__, self.interface)
- path = os.path.join(SYS_CLASS_NET, interface)
- if not os.path.isdir(path):
- continue
-
- instance = cls(collecty, interface=interface)
- instances.append(instance)
-
- return instances
-
- def init(self, **kwargs):
- self.interface = kwargs.get("interface")
+ def init(self, interface):
+ self.interface = interface
@property
def id(self):
- return "-".join((self.name, self.interface))
+ return self.interface
- def read(self):
+ def collect(self):
interface_path = os.path.join(SYS_CLASS_NET, self.interface)
# Check if the interface exists.
f.close()
return ":".join(ret)
+
+
+class InterfacePlugin(base.Plugin):
+ name = "interface"
+ description = "Interface Statistics Plugin"
+
+ templates = [
+ GraphTemplateInterfaceBits,
+ GraphTemplateInterfacePackets,
+ GraphTemplateInterfaceErrors,
+ ]
+
+ interval = 20
+
+ def get_interfaces(self):
+ for interface in os.listdir(SYS_CLASS_NET):
+ # Skip some unwanted interfaces.
+ if interface == "lo" or interface.startswith("mon."):
+ continue
+
+ path = os.path.join(SYS_CLASS_NET, interface)
+ if not os.path.isdir(path):
+ continue
+
+ yield interface
+
+ @property
+ def objects(self):
+ for interface in self.get_interfaces():
+ yield InterfaceObject(self, interface=interface)
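# Most of InterfaceObject.collect() is elided above; its counters come from
# the files under /sys/class/net/<interface>/statistics. A hedged sketch of
# reading a single counter (SYS_CLASS_NET mirrors the module's constant):
import os

SYS_CLASS_NET = "/sys/class/net"

def _read_counter(interface, name):
    path = os.path.join(SYS_CLASS_NET, interface, "statistics", name)
    with open(path) as f:
        return f.readline().strip()

# e.g. _read_counter("eth0", "rx_bytes") -> "123456789"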
]
-class LatencyPlugin(base.Plugin):
- name = "latency"
- description = "Latency (ICMP ping) Data Source"
-
- templates = [GraphTemplateLatency,]
-
+class LatencyObject(base.Object):
rrd_schema = [
"DS:latency:GAUGE:0:U",
"DS:latency_loss:GAUGE:0:100",
"DS:latency_stddev:GAUGE:0:U",
]
- @property
- def id(self):
- return "-".join((self.name, self.host))
-
- @classmethod
- def autocreate(cls, collecty, **kwargs):
- ret = []
- for host in PING_HOSTS:
- ds = cls(collecty, host=host, **kwargs)
- ret.append(ds)
+ def __repr__(self):
+ return "<%s %s>" % (self.__class__.__name__, self.hostname)
- return ret
-
- def init(self, **kwargs):
- self.host = kwargs.get("host")
- assert self.host
+ def init(self, hostname, deadline=None):
+ self.hostname = hostname
+ self.deadline = deadline
@property
- def deadline(self):
- return self.interval - 10
+ def id(self):
+ return self.hostname
- def read(self):
+ def collect(self):
# Send up to five ICMP echo requests.
try:
- ping = collecty.ping.Ping(destination=self.host, timeout=20000)
+ ping = collecty.ping.Ping(destination=self.hostname, timeout=20000)
ping.run(count=5, deadline=self.deadline)
except collecty.ping.PingError, e:
self.log.warning(_("Could not run latency check for %(host)s: %(msg)s") \
- % { "host" : self.host, "msg" : e.msg })
+ % { "host" : self.hostname, "msg" : e.msg })
return
return ":".join((
"%.10f" % ping.loss,
"%.10f" % ping.stddev,
))
+
+
+class LatencyPlugin(base.Plugin):
+ name = "latency"
+ description = "Latency (ICMP ping) Plugin"
+
+ templates = [GraphTemplateLatency,]
+
+ interval = 60
+
+ @property
+ def objects(self):
+ deadline = self.interval / len(PING_HOSTS)
+
+ for hostname in PING_HOSTS:
+ yield LatencyObject(self, hostname, deadline=deadline)
]
-class LoadAvgPlugin(base.Plugin):
- name = "loadavg"
- description = "Load Average Data Source"
-
- templates = [GraphTemplateLoadAvg,]
-
+class LoadAvgObject(base.Object):
rrd_schema = [
"DS:load1:GAUGE:0:U",
"DS:load5:GAUGE:0:U",
"DS:load15:GAUGE:0:U",
]
- @classmethod
- def autocreate(cls, collecty, **kwargs):
- return cls(collecty, **kwargs)
+ @property
+ def id(self):
+ return "default"
- def read(self):
+ def collect(self):
return ":".join(["%.10f" % l for l in os.getloadavg()])
+
+
+class LoadAvgPlugin(base.Plugin):
+ name = "loadavg"
+ description = "Load Average Plugin"
+
+ templates = [GraphTemplateLoadAvg,]
+
+ interval = 30
+
+ @property
+ def objects(self):
+ return [LoadAvgObject(self)]
]
-class MemoryPlugin(base.Plugin):
- name = "memory"
- description = "Memory Usage Data Source"
-
- templates = [GraphTemplateMemory,]
-
+class MemoryObject(base.Object):
rrd_schema = [
"DS:used:GAUGE:0:100",
"DS:cached:GAUGE:0:100",
"DS:swap:GAUGE:0:100",
]
- @classmethod
- def autocreate(cls, collecty, **kwargs):
- # Every system has got memory.
- return cls(collecty, **kwargs)
+ @property
+ def id(self):
+ return "default"
- def read(self):
+ def collect(self):
f = None
try:
finally:
if f:
f.close()
+
+
+class MemoryPlugin(base.Plugin):
+ name = "memory"
+ description = "Memory Usage Plugin"
+
+ templates = [GraphTemplateMemory,]
+
+ @property
+ def objects(self):
+ yield MemoryObject(self)
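# MemoryObject.collect() is elided above. Its schema stores percentages
# (GAUGE, 0-100), so the kilobyte counters from /proc/meminfo have to be
# scaled. A hedged sketch of that computation for the "used" data source:
def _read_meminfo():
    values = {}
    with open("/proc/meminfo") as f:
        for line in f:
            key, value = line.split(":", 1)
            values[key] = float(value.strip().split()[0])
    return values

meminfo = _read_meminfo()
used = (meminfo["MemTotal"] - meminfo["MemFree"]) * 100 / meminfo["MemTotal"]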