Implement worker thread concept
[collecty.git] / src / collecty / plugins / base.py
1 #!/usr/bin/python
2 ###############################################################################
3 #                                                                             #
4 # collecty - A system statistics collection daemon for IPFire                 #
5 # Copyright (C) 2012 IPFire development team                                  #
6 #                                                                             #
7 # This program is free software: you can redistribute it and/or modify        #
8 # it under the terms of the GNU General Public License as published by        #
9 # the Free Software Foundation, either version 3 of the License, or           #
10 # (at your option) any later version.                                         #
11 #                                                                             #
12 # This program is distributed in the hope that it will be useful,             #
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of              #
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the               #
15 # GNU General Public License for more details.                                #
16 #                                                                             #
17 # You should have received a copy of the GNU General Public License           #
18 # along with this program.  If not, see <http://www.gnu.org/licenses/>.       #
19 #                                                                             #
20 ###############################################################################
21
22 from __future__ import division
23
24 import datetime
25 import logging
26 import math
27 import os
28 import rrdtool
29 import tempfile
30 import threading
31 import time
32
33 from ..constants import *
34 from ..i18n import _
35
# Registry of all known plugin classes, indexed by the name of the plugin.
# Entries are added by the metaclass of Plugin whenever a subclass of it
# is defined.
_plugins = {}


def get():
    """
    Returns a list with all automatically registered plugins.

    A new list is created on every call, so the caller gets a real list
    on every Python version and may modify it without changing the
    registry itself.
    """
    return list(_plugins.values())
43
class Timer(object):
    """
    A simple timer that can be waited for and cancelled.

    timeout is the number of seconds after which the timer expires.
    heartbeat is the longest time (in seconds) wait() sleeps before it
    checks again whether the timer has been cancelled.
    """

    def __init__(self, timeout, heartbeat=1):
        self.timeout = timeout
        self.heartbeat = heartbeat

        # Initialises start, delay and killed.
        self.reset()

    def reset(self, delay=0):
        """
        Restarts the timer and clears the cancelled flag.

        The delay (in seconds) is subtracted from the elapsed time,
        which makes the timer expire that much later.
        """
        # Save start time.
        self.start = time.time()

        self.delay = delay

        # Has this timer been killed?
        self.killed = False

    @property
    def elapsed(self):
        """
        The seconds that passed since the last reset minus the delay.
        """
        return time.time() - self.start - self.delay

    def cancel(self):
        """
        Marks the timer as killed so that wait() stops waiting.
        """
        self.killed = True

    def wait(self):
        """
        Blocks until the timeout has elapsed or the timer was cancelled.

        Returns True when the timeout has been reached and False when
        the timer has been cancelled before that.
        """
        while not self.killed:
            remaining = self.timeout - self.elapsed
            if remaining <= 0:
                break

            # Never sleep longer than the time that is left. Otherwise
            # a timeout that is shorter than the heartbeat (or not a
            # multiple of it) would be overshot by up to one heartbeat.
            time.sleep(min(self.heartbeat, remaining))

        # Use the same boundary as the loop above so that a timer which
        # has exactly reached its timeout counts as expired.
        return self.elapsed >= self.timeout
74
75
class Plugin(object):
    """
    Base class of all plugins.

    Subclasses must set name and description and are registered
    automatically when they are defined. This class reads self.objects
    but never sets it; presumably subclasses provide it (confirm
    against the plugin implementations).
    """

    # The name of this plugin.
    name = None

    # A description for this plugin.
    description = None

    # Templates which can be used to generate a graph out of
    # the data from this data source.
    templates = []

    # The default interval for all plugins
    interval = 60

    # Automatically register all providers.
    class __metaclass__(type):
        def __init__(plugin, name, bases, attrs):
            type.__init__(plugin, name, bases, attrs)

            # The main class from which is inherited is not registered
            # as a plugin.
            if name == "Plugin":
                return

            if not all((plugin.name, plugin.description)):
                raise RuntimeError(_("Plugin is not properly configured: %s") \
                    % plugin)

            _plugins[plugin.name] = plugin

    def __init__(self, collecty, **kwargs):
        """
        Stores the collecty instance, sets up the logger and runs
        init() with all additional keyword arguments.
        """
        self.collecty = collecty

        # Check if this plugin was configured correctly.
        assert self.name, "Name of the plugin is not set: %s" % self.name
        assert self.description, "Description of the plugin is not set: %s" % self.description

        # Initialize the logger.
        self.log = logging.getLogger("collecty.plugins.%s" % self.name)
        self.log.propagate = 1

        self.data = []

        # Run some custom initialization.
        self.init(**kwargs)

        self.log.debug(_("Successfully initialized %s") % self.__class__.__name__)

    @property
    def path(self):
        """
        Returns the name of the sub directory in which all RRD files
        for this plugin should be stored in.
        """
        return self.name

    ### Basic methods

    def init(self, **kwargs):
        """
        Do some custom initialization stuff here.
        """
        pass

    def collect(self):
        """
        Gathers the statistical data, this plugin collects.

        The collect() method of every object in self.objects is called.
        Tuples and lists are joined with colons into one string. Every
        non-empty result is added to the write queue of collecty
        together with the object and the UTC time at which the
        collection was started.

        Objects that raise an exception or return an empty result are
        logged and skipped.
        """
        time_start = time.time()

        # Run through all objects of this plugin and call the collect method.
        for o in self.objects:
            now = datetime.datetime.utcnow()
            try:
                result = o.collect()

                if isinstance(result, (tuple, list)):
                    result = ":".join("%s" % e for e in result)

            # Only catch real errors. KeyboardInterrupt and SystemExit
            # must get through so that the daemon can still be stopped
            # while a plugin is collecting.
            except Exception:
                self.log.warning(_("Unhandled exception in %s.collect()") % o, exc_info=True)
                continue

            # NOTE(review): this also drops falsy results like the number 0.
            # Confirm that collect() never returns a bare number.
            if not result:
                self.log.warning(_("Received empty result: %s") % o)
                continue

            self.log.debug(_("Collected %s: %s") % (o, result))

            # Add the object to the write queue so that the data is written
            # to the databases later.
            self.collecty.write_queue.add(o, now, result)

        # The time this function took to complete.
        delay = time.time() - time_start

        # Log some warning when a collect method takes too long to return some data
        if delay >= 60:
            self.log.warning(_("A worker thread was stalled for %.4fs") % delay)

    def get_object(self, id):
        """
        Returns the first object of this plugin with the given id
        or None if there is no such object.
        """
        for obj in self.objects:
            if obj.id == id:
                return obj

        return None

    def get_template(self, template_name, object_id):
        """
        Returns a new instance of the template with the given name
        which is created for this plugin and the given object id.

        Returns None if this plugin has no template with that name.
        """
        for template in self.templates:
            if template.name == template_name:
                return template(self, object_id)

        return None

    def generate_graph(self, template_name, object_id="default", **kwargs):
        """
        Generates a graph with the named template and returns whatever
        generate_graph() of the template returns.

        All additional keyword arguments are passed on to the template.
        Raises RuntimeError if the template could not be found.
        """
        template = self.get_template(template_name, object_id=object_id)
        if not template:
            raise RuntimeError("Could not find template %s" % template_name)

        time_start = time.time()

        graph = template.generate_graph(**kwargs)

        duration = time.time() - time_start
        self.log.debug(_("Generated graph %s in %.1fms") \
            % (template, duration * 1000))

        return graph
203
204
class Object(object):
    """
    Base class of the objects of a plugin. Every object is backed by
    one RRD file.

    Subclasses have to provide id and rrd_schema. This class also calls
    self.collect() but does not define it; presumably subclasses
    implement it (confirm against the plugin implementations).
    """

    # The schema of the RRD database.
    rrd_schema = None

    # RRA properties.
    rra_types     = ["AVERAGE", "MIN", "MAX"]
    rra_timespans = [3600, 86400, 604800, 2678400, 31622400]
    rra_rows      = 2880

    def __init__(self, plugin, *args, **kwargs):
        """
        Runs init() with all additional arguments and creates the
        RRD file if it does not exist, yet.
        """
        self.plugin = plugin

        # Indicates if this object has collected its data
        self.collected = False

        # Initialise this object
        self.init(*args, **kwargs)

        # Create the database file.
        self.create()

    def __repr__(self):
        return "<%s>" % self.__class__.__name__

    @property
    def collecty(self):
        """
        The collecty instance of the plugin this object belongs to.
        """
        return self.plugin.collecty

    @property
    def log(self):
        """
        The logger of the plugin this object belongs to.
        """
        return self.plugin.log

    @property
    def id(self):
        """
        Returns a UNIQUE identifier for this object. As this is incorporated
        into the path of RRD file, it must only contain ASCII characters.
        """
        raise NotImplementedError

    @property
    def file(self):
        """
        The absolute path to the RRD file of this plugin.
        """
        filename = self._normalise_filename("%s.rrd" % self.id)

        return os.path.join(DATABASE_DIR, self.plugin.path, filename)

    @staticmethod
    def _normalise_filename(filename):
        """
        Removes all non-ASCII characters from the filename and
        replaces spaces by dashes.

        Byte strings and text strings are both accepted. The result
        is always returned as the native string type.
        """
        # Convert the filename into ASCII characters only. A byte string
        # must be decoded for this as calling encode() on it either fails
        # (Python 3) or implicitly decodes it in strict mode (Python 2).
        if isinstance(filename, bytes):
            filename = filename.decode("ascii", "ignore").encode("ascii")
        else:
            filename = filename.encode("ascii", "ignore")

        # Replace any spaces by dashes
        filename = filename.replace(b" ", b"-")

        # On Python 3 we are holding bytes at this point, but paths
        # are handled as text there.
        if not isinstance(filename, str):
            filename = filename.decode("ascii")

        return filename

    ### Basic methods

    def init(self, *args, **kwargs):
        """
        Do some custom initialization stuff here.
        """
        pass

    def create(self):
        """
        Creates an empty RRD file with the desired data structures.

        Nothing is done if the file exists already. A missing
        directory is created.
        """
        # Skip if the file does already exist.
        if os.path.exists(self.file):
            return

        dirname = os.path.dirname(self.file)
        if not os.path.exists(dirname):
            os.makedirs(dirname)

        # Create argument list.
        args = self.get_rrd_schema()

        rrdtool.create(self.file, *args)

        self.log.debug(_("Created RRD file %s.") % self.file)
        for arg in args:
            self.log.debug("  %s" % arg)

    def info(self):
        """
        Returns what rrdtool.info() reports about the RRD file.
        """
        return rrdtool.info(self.file)

    @property
    def stepsize(self):
        """
        The step of the RRD file which is the interval of the plugin.
        """
        return self.plugin.interval

    @property
    def heartbeat(self):
        """
        The heartbeat of the data sources (twice the step size).
        """
        return self.stepsize * 2

    def get_rrd_schema(self):
        """
        Returns the list of arguments that is passed to rrdtool.create().

        It consists of the step size, the lines of rrd_schema (with the
        heartbeat inserted into the DS lines) and one RRA for every
        combination of rra_timespans and rra_types.
        """
        schema = [
            "--step", "%s" % self.stepsize,
        ]

        for line in self.rrd_schema:
            if line.startswith("DS:"):
                try:
                    (prefix, ds_name, ds_type, lower_limit, upper_limit) = line.split(":")
                except ValueError:
                    # The line does not have exactly five fields
                    # and is used as it is.
                    pass
                else:
                    # Insert the heartbeat after the type.
                    line = ":".join((
                        prefix,
                        ds_name,
                        ds_type,
                        "%s" % self.heartbeat,
                        lower_limit,
                        upper_limit,
                    ))

            schema.append(line)

        xff = 0.1

        cdp_length = 0
        for rra_timespan in self.rra_timespans:
            # Every RRA covers at least rra_rows steps.
            if (rra_timespan / self.stepsize) < self.rra_rows:
                rra_timespan = self.stepsize * self.rra_rows

            # The first RRA always stores every single step.
            if cdp_length == 0:
                cdp_length = 1
            else:
                cdp_length = rra_timespan // (self.rra_rows * self.stepsize)

            cdp_number = math.ceil(rra_timespan / (cdp_length * self.stepsize))

            for rra_type in self.rra_types:
                schema.append("RRA:%s:%.10f:%d:%d" % \
                    (rra_type, xff, cdp_length, cdp_number))

        return schema

    def execute(self):
        """
        Collects the data of this object once and saves the UTC time
        of the collection as self.now.

        Raises RuntimeError if this object has collected its data before.
        """
        if self.collected:
            raise RuntimeError("This object has already collected its data")

        self.collected = True
        self.now = datetime.datetime.utcnow()

        # Call the collect. What it returns is not used here.
        self.collect()

    def commit(self):
        """
        Will commit the collected data to the database.
        """
        # Make sure that the RRD database has been created
        self.create()
362
363
class GraphTemplate(object):
    """
    Base class of all graph templates.

    A template describes how a graph is rendered from the RRD files
    of the objects of a plugin.
    """

    # A unique name to identify this graph template.
    name = None

    # Headline of the graph image
    graph_title = None

    # Vertical label of the graph
    graph_vertical_label = None

    # Limits
    lower_limit = None
    upper_limit = None

    # Instructions how to create the graph.
    rrd_graph = None

    # Extra arguments passed to rrdgraph.
    rrd_graph_args = []

    # Maps the name of an interval to the start time of the graph.
    intervals = {
        None   : "-3h",
        "hour" : "-1h",
        "day"  : "-25h",
        "week" : "-360h",
        "year" : "-365d",
    }

    # Default dimensions for this graph
    height = GRAPH_DEFAULT_HEIGHT
    width  = GRAPH_DEFAULT_WIDTH

    def __init__(self, plugin, object_id):
        """
        Looks up the main object of this graph in the plugin
        by the given object id.
        """
        self.plugin = plugin

        # Get all required RRD objects
        self.object_id = object_id

        # Get the main object
        self.object = self.get_object(self.object_id)

    def __repr__(self):
        return "<%s>" % self.__class__.__name__

    @property
    def collecty(self):
        """
        The collecty instance of the plugin this template belongs to.
        """
        return self.plugin.collecty

    @property
    def log(self):
        """
        The logger of the plugin this template belongs to.
        """
        return self.plugin.log

    def _make_command_line(self, interval, format=DEFAULT_IMAGE_FORMAT,
            width=None, height=None):
        """
        Returns the list of options for the graph command.

        interval is either a key of the intervals table or any other
        value, which is then used as the start time itself. width and
        height default to the dimensions of the template.
        """
        # Work on a copy so that the default arguments are never modified.
        args = list(GRAPH_DEFAULT_ARGUMENTS)

        args += [
            "--imgformat", format,
            "--height", "%s" % (height or self.height),
            "--width", "%s" % (width or self.width),
        ]

        args += self.rrd_graph_args

        # Graph title
        if self.graph_title:
            args += ["--title", self.graph_title]

        # Vertical label
        if self.graph_vertical_label:
            args += ["--vertical-label", self.graph_vertical_label]

        if self.lower_limit is not None or self.upper_limit is not None:
            # Force to honour the set limits
            args.append("--rigid")

            if self.lower_limit is not None:
                args += ["--lower-limit", self.lower_limit]

            if self.upper_limit is not None:
                args += ["--upper-limit", self.upper_limit]

        # Add interval
        args.append("--start")

        try:
            args.append(self.intervals[interval])
        except KeyError:
            args.append(str(interval))

        return args

    def get_object(self, *args, **kwargs):
        """
        Returns what get_object() of the plugin returns
        for the given arguments.
        """
        return self.plugin.get_object(*args, **kwargs)

    def get_object_table(self):
        """
        Returns a dictionary with all objects that are needed to
        render this graph. By default, this is only the main object
        which is called "file".
        """
        return {
            "file" : self.object,
        }

    @property
    def object_table(self):
        """
        The result of get_object_table() which is only computed once.
        """
        if not hasattr(self, "_object_table"):
            self._object_table = self.get_object_table()

        return self._object_table

    def get_object_files(self):
        """
        Returns a dictionary that maps every key of the object table
        to the path of the RRD file of the object.
        """
        return dict(
            (key, obj.file) for key, obj in self.object_table.items()
        )

    def generate_graph(self, interval=None, **kwargs):
        """
        Renders the graph and returns what write_graph() returns.

        Every item of rrd_graph is formatted with the dictionary of
        object files, so that "%(file)s" is replaced by the path of
        the RRD file of the main object. All additional keyword
        arguments are passed on to _make_command_line().
        """
        args = self._make_command_line(interval, **kwargs)

        self.log.info(_("Generating graph %s") % self)
        self.log.debug("  args: %s" % args)

        object_files = self.get_object_files()

        for item in self.rrd_graph:
            try:
                args.append(item % object_files)

            # Items that cannot be formatted are used as they are.
            # TypeError is raised for conversions that do not name a
            # key (like "%6.2lf") and ValueError for a stray percent
            # sign (like "100%").
            except (TypeError, ValueError):
                args.append(item)

            self.log.debug("  %s" % args[-1])

        return self.write_graph(*args)

    def write_graph(self, *args):
        """
        Lets rrdtool render the graph with the given arguments into
        a temporary file and returns the content of that file.
        """
        # Convert all arguments to string
        args = [str(e) for e in args]

        with tempfile.NamedTemporaryFile() as f:
            rrdtool.graph(f.name, *args)

            # Get back to the beginning of the file
            f.seek(0)

            # Return all the content
            return f.read()