Rewrite plugin architecture
[collecty.git] / src / collecty / plugins / base.py
1 #!/usr/bin/python
2 ###############################################################################
3 # #
4 # collecty - A system statistics collection daemon for IPFire #
5 # Copyright (C) 2012 IPFire development team #
6 # #
7 # This program is free software: you can redistribute it and/or modify #
8 # it under the terms of the GNU General Public License as published by #
9 # the Free Software Foundation, either version 3 of the License, or #
10 # (at your option) any later version. #
11 # #
12 # This program is distributed in the hope that it will be useful, #
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of #
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
15 # GNU General Public License for more details. #
16 # #
17 # You should have received a copy of the GNU General Public License #
18 # along with this program. If not, see <http://www.gnu.org/licenses/>. #
19 # #
20 ###############################################################################
21
22 from __future__ import division
23
24 import datetime
25 import logging
26 import math
27 import os
28 import rrdtool
29 import threading
30 import time
31
32 from ..constants import *
33 from ..i18n import _
34
35 _plugins = {}
36
37 def get():
38 """
39 Returns a list with all automatically registered plugins.
40 """
41 return _plugins.values()
42
43 class Timer(object):
44 def __init__(self, timeout, heartbeat=1):
45 self.timeout = timeout
46 self.heartbeat = heartbeat
47
48 self.delay = 0
49
50 self.reset()
51
52 def reset(self, delay=0):
53 # Save start time.
54 self.start = time.time()
55
56 self.delay = delay
57
58 # Has this timer been killed?
59 self.killed = False
60
61 @property
62 def elapsed(self):
63 return time.time() - self.start - self.delay
64
65 def cancel(self):
66 self.killed = True
67
68 def wait(self):
69 while self.elapsed < self.timeout and not self.killed:
70 time.sleep(self.heartbeat)
71
72 return self.elapsed > self.timeout
73
74
75 class Plugin(threading.Thread):
76 # The name of this plugin.
77 name = None
78
79 # A description for this plugin.
80 description = None
81
82 # Templates which can be used to generate a graph out of
83 # the data from this data source.
84 templates = []
85
86 # The default interval for all plugins
87 interval = 60
88
89 # Automatically register all providers.
90 class __metaclass__(type):
91 def __init__(plugin, name, bases, dict):
92 type.__init__(plugin, name, bases, dict)
93
94 # The main class from which is inherited is not registered
95 # as a plugin.
96 if name == "Plugin":
97 return
98
99 if not all((plugin.name, plugin.description)):
100 raise RuntimeError(_("Plugin is not properly configured: %s") \
101 % plugin)
102
103 _plugins[plugin.name] = plugin
104
105 def __init__(self, collecty, **kwargs):
106 threading.Thread.__init__(self, name=self.description)
107 self.daemon = True
108
109 self.collecty = collecty
110
111 # Check if this plugin was configured correctly.
112 assert self.name, "Name of the plugin is not set: %s" % self.name
113 assert self.description, "Description of the plugin is not set: %s" % self.description
114
115 # Initialize the logger.
116 self.log = logging.getLogger("collecty.plugins.%s" % self.name)
117 self.log.propagate = 1
118
119 self.data = []
120
121 # Run some custom initialization.
122 self.init(**kwargs)
123
124 # Keepalive options
125 self.running = True
126 self.timer = Timer(self.interval)
127
128 self.log.info(_("Successfully initialized %s") % self.__class__.__name__)
129
130 @property
131 def path(self):
132 """
133 Returns the name of the sub directory in which all RRD files
134 for this plugin should be stored in.
135 """
136 return self.name
137
138 ### Basic methods
139
140 def init(self, **kwargs):
141 """
142 Do some custom initialization stuff here.
143 """
144 pass
145
146 def collect(self):
147 """
148 Gathers the statistical data, this plugin collects.
149 """
150 time_start = time.time()
151
152 # Run through all objects of this plugin and call the collect method.
153 for o in self.objects:
154 now = datetime.datetime.utcnow()
155 try:
156 result = o.collect()
157 except:
158 self.log.warning(_("Unhandled exception in %s.collect()") % o, exc_info=True)
159 continue
160
161 if not result:
162 self.log.warning(_("Received empty result: %s") % o)
163 continue
164
165 self.log.debug(_("Collected %s: %s") % (o, result))
166
167 # Add the object to the write queue so that the data is written
168 # to the databases later.
169 self.collecty.write_queue.add(o, now, result)
170
171 # Returns the time this function took to complete.
172 return (time.time() - time_start)
173
174 def run(self):
175 self.log.debug(_("%s plugin has started") % self.name)
176
177 # Initially collect everything
178 self.collect()
179
180 while self.running:
181 # Reset the timer.
182 self.timer.reset()
183
184 # Wait until the timer has successfully elapsed.
185 if self.timer.wait():
186 delay = self.collect()
187 self.timer.reset(delay)
188
189 self.log.debug(_("%s plugin has stopped") % self.name)
190
191 def shutdown(self):
192 self.log.debug(_("Received shutdown signal."))
193 self.running = False
194
195 # Kill any running timers.
196 if self.timer:
197 self.timer.cancel()
198
199
200 class Object(object):
201 # The schema of the RRD database.
202 rrd_schema = None
203
204 # RRA properties.
205 rra_types = ["AVERAGE", "MIN", "MAX"]
206 rra_timespans = [3600, 86400, 604800, 2678400, 31622400]
207 rra_rows = 2880
208
209 def __init__(self, plugin, *args, **kwargs):
210 self.plugin = plugin
211
212 # Indicates if this object has collected its data
213 self.collected = False
214
215 # Initialise this object
216 self.init(*args, **kwargs)
217
218 # Create the database file.
219 self.create()
220
221 def __repr__(self):
222 return "<%s>" % self.__class__.__name__
223
224 @property
225 def collecty(self):
226 return self.plugin.collecty
227
228 @property
229 def log(self):
230 return self.plugin.log
231
232 @property
233 def id(self):
234 """
235 Returns a UNIQUE identifier for this object. As this is incorporated
236 into the path of RRD file, it must only contain ASCII characters.
237 """
238 raise NotImplementedError
239
240 @property
241 def file(self):
242 """
243 The absolute path to the RRD file of this plugin.
244 """
245 return os.path.join(DATABASE_DIR, self.plugin.path, "%s.rrd" % self.id)
246
247 ### Basic methods
248
249 def init(self, *args, **kwargs):
250 """
251 Do some custom initialization stuff here.
252 """
253 pass
254
255 def create(self):
256 """
257 Creates an empty RRD file with the desired data structures.
258 """
259 # Skip if the file does already exist.
260 if os.path.exists(self.file):
261 return
262
263 dirname = os.path.dirname(self.file)
264 if not os.path.exists(dirname):
265 os.makedirs(dirname)
266
267 # Create argument list.
268 args = self.get_rrd_schema()
269
270 rrdtool.create(self.file, *args)
271
272 self.log.debug(_("Created RRD file %s.") % self.file)
273 for arg in args:
274 self.log.debug(" %s" % arg)
275
276 def info(self):
277 return rrdtool.info(self.file)
278
279 @property
280 def stepsize(self):
281 return self.plugin.interval
282
283 @property
284 def heartbeat(self):
285 return self.stepsize * 2
286
287 def get_rrd_schema(self):
288 schema = [
289 "--step", "%s" % self.stepsize,
290 ]
291 for line in self.rrd_schema:
292 if line.startswith("DS:"):
293 try:
294 (prefix, name, type, lower_limit, upper_limit) = line.split(":")
295
296 line = ":".join((
297 prefix,
298 name,
299 type,
300 "%s" % self.heartbeat,
301 lower_limit,
302 upper_limit
303 ))
304 except ValueError:
305 pass
306
307 schema.append(line)
308
309 xff = 0.1
310
311 cdp_length = 0
312 for rra_timespan in self.rra_timespans:
313 if (rra_timespan / self.stepsize) < self.rra_rows:
314 rra_timespan = self.stepsize * self.rra_rows
315
316 if cdp_length == 0:
317 cdp_length = 1
318 else:
319 cdp_length = rra_timespan // (self.rra_rows * self.stepsize)
320
321 cdp_number = math.ceil(rra_timespan / (cdp_length * self.stepsize))
322
323 for rra_type in self.rra_types:
324 schema.append("RRA:%s:%.10f:%d:%d" % \
325 (rra_type, xff, cdp_length, cdp_number))
326
327 return schema
328
329 def execute(self):
330 if self.collected:
331 raise RuntimeError("This object has already collected its data")
332
333 self.collected = True
334 self.now = datetime.datetime.utcnow()
335
336 # Call the collect
337 result = self.collect()
338
339 def commit(self):
340 """
341 Will commit the collected data to the database.
342 """
343 # Make sure that the RRD database has been created
344 self.create()
345
346
347 class GraphTemplate(object):
348 # A unique name to identify this graph template.
349 name = None
350
351 # Instructions how to create the graph.
352 rrd_graph = None
353
354 # Extra arguments passed to rrdgraph.
355 rrd_graph_args = []
356
357 def __init__(self, ds):
358 self.ds = ds
359
360 @property
361 def collecty(self):
362 return self.ds.collecty
363
364 def graph(self, file, interval=None,
365 width=GRAPH_DEFAULT_WIDTH, height=GRAPH_DEFAULT_HEIGHT):
366 args = [
367 "--width", "%d" % width,
368 "--height", "%d" % height,
369 ]
370 args += self.collecty.graph_default_arguments
371 args += self.rrd_graph_args
372
373 intervals = {
374 None : "-3h",
375 "hour" : "-1h",
376 "day" : "-25h",
377 "week" : "-360h",
378 "year" : "-365d",
379 }
380
381 args.append("--start")
382 try:
383 args.append(intervals[interval])
384 except KeyError:
385 args.append(interval)
386
387 info = { "file" : self.ds.file }
388 for item in self.rrd_graph:
389 try:
390 args.append(item % info)
391 except TypeError:
392 args.append(item)
393
394 rrdtool.graph(file, *args)