#!/usr/bin/python
###############################################################################
#                                                                             #
# collecty - A system statistics collection daemon for IPFire                 #
# Copyright (C) 2012 IPFire development team                                  #
#                                                                             #
# This program is free software: you can redistribute it and/or modify        #
# it under the terms of the GNU General Public License as published by        #
# the Free Software Foundation, either version 3 of the License, or           #
# (at your option) any later version.                                         #
#                                                                             #
# This program is distributed in the hope that it will be useful,             #
# but WITHOUT ANY WARRANTY; without even the implied warranty of              #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the               #
# GNU General Public License for more details.                                #
#                                                                             #
# You should have received a copy of the GNU General Public License           #
# along with this program.  If not, see <http://www.gnu.org/licenses/>.       #
#                                                                             #
###############################################################################

import Queue as queue
import datetime
import multiprocessing
import rrdtool
import signal
import threading
import time

import bus
import plugins

from constants import *
from i18n import _

import logging
log = logging.getLogger("collecty")

class Collecty(object):
	# The default interval (in seconds) in which all collected data
	# is written to disk.
	SUBMIT_INTERVAL = 300

	# How long (in seconds) the main loop sleeps when there is no
	# timer event to process.
	HEARTBEAT = 1

	def __init__(self, debug=False):
		self.debug = debug

		# Enable debug logging when running in debug mode.
		if self.debug:
			log.setLevel(logging.DEBUG)

		self.plugins = []

		# Indicates whether this process should be running or not.
		self.running = True

		# The write queue holds all collected pieces of data which
		# will be written to disk later.
		self.write_queue = WriteQueue(self, self.SUBMIT_INTERVAL)

		# Create worker threads.
		self.worker_threads = self.create_worker_threads()

		# The timer queue orders the plugins by their next deadline;
		# the worker queue holds the plugins that are due for collection.
		self._timer_queue = queue.PriorityQueue()
		self._worker_queue = queue.Queue()

		# Create a thread that connects to dbus and processes requests
		# we get from there.
		self.bus = bus.Bus(self)

		# Add all plugins.
		for plugin in plugins.get():
			self.add_plugin(plugin)

		log.debug(_("Collecty successfully initialized with %s plugins") \
			% len(self.plugins))

	def add_plugin(self, plugin_class):
		# Try initialising a new plugin. If that fails, we will log the
		# error and try to go on.
		try:
			plugin = plugin_class(self)
		except Exception:
			log.critical(_("Plugin %s could not be initialised") % plugin_class, exc_info=True)
			return

		self.plugins.append(plugin)

	@property
	def templates(self):
		for plugin in self.plugins:
			for template in plugin.templates:
				yield template

	def run(self):
		# Register signal handlers.
		self.register_signal_handler()

		# Start the bus.
		self.bus.start()

		# Initialise the timer queue.
		self.initialise_timer_queue()

		# Start worker threads.
		for w in self.worker_threads:
			w.start()

		# Run the write queue thread.
		self.write_queue.start()

		# The main loop: dispatch plugins that are due for collection
		# to the worker threads.
		while self.running:
			try:
				# Try processing one event from the timer queue. If that
				# succeeded, we will retry immediately.
				if self.process_timer_queue():
					continue

				# Otherwise we will sleep for a bit.
				time.sleep(self.HEARTBEAT)

				# Log a warning if the worker queue is filling up.
				queue_size = self._worker_queue.qsize()
				if queue_size >= 5:
					log.warning(_("Worker queue is filling up with %s events") % queue_size)

			except KeyboardInterrupt:
				self.shutdown()
				break

		# Wait until all worker threads are finished.
		for w in self.worker_threads:
			w.join()

		# Stop the bus thread.
		self.bus.shutdown()

		# Write all collected data to disk before ending the main thread.
		self.write_queue.shutdown()

		log.debug(_("Main thread exited"))

	def shutdown(self):
		if not self.running:
			return

		log.info(_("Received shutdown signal"))
		self.running = False

		# Propagate the shutdown to all worker threads.
		for w in self.worker_threads:
			w.shutdown()

	def register_signal_handler(self):
		for s in (signal.SIGTERM, signal.SIGINT, signal.SIGUSR1):
			log.debug(_("Registering signal %d") % s)
			signal.signal(s, self.signal_handler)

	def signal_handler(self, sig, *args, **kwargs):
		log.info(_("Caught signal %d") % sig)

		if sig in (signal.SIGTERM, signal.SIGINT):
			# Shut down this application.
			self.shutdown()

		elif sig == signal.SIGUSR1:
			# Commit all collected data to disk.
			self.write_queue.commit()

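	# For example, an administrator can force an immediate commit of all
	# collected data from the shell by sending SIGUSR1 to the running
	# daemon, e.g.:
	#
	#   kill -USR1 <pid of the collecty process>
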
	def get_plugin_from_template(self, template_name):
		for plugin in self.plugins:
			if template_name not in [t.name for t in plugin.templates]:
				continue

			return plugin

	def generate_graph(self, template_name, *args, **kwargs):
		plugin = self.get_plugin_from_template(template_name)
		if not plugin:
			raise RuntimeError("Could not find template %s" % template_name)

		return plugin.generate_graph(template_name, *args, **kwargs)

	def create_worker_threads(self, num=None):
		"""
			Creates a number of worker threads
		"""
		# If no number of threads is given, we will create as many as we
		# have active processor cores, but at least four.
		if num is None:
			num = max(multiprocessing.cpu_count(), 4)

		worker_threads = []

		for worker_id in range(num):
			worker_thread = WorkerThread(self, worker_id)
			worker_threads.append(worker_thread)

		return worker_threads

	def initialise_timer_queue(self):
		for p in self.plugins:
			timer = PluginTimer(p)

			self._timer_queue.put(timer)

	def process_timer_queue(self):
		# Take the item from the timer queue that is due next.
		timer = self._timer_queue.get()

		try:
			# If the timer event is due, we will put the plugin into
			# the worker queue and reset the timer.
			if timer.is_due():
				self._worker_queue.put(timer.plugin)
				timer.reset_deadline()

				return timer
		finally:
			# Put the timer back into the timer queue.
			self._timer_queue.put(timer)


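# A note on the two queues used by the main loop: _timer_queue is a
# PriorityQueue that always yields the PluginTimer with the earliest
# deadline (the ordering is defined by PluginTimer.__cmp__ below),
# while _worker_queue is a plain FIFO of plugins that are due for
# collection and is drained by the WorkerThread instances.
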
class WorkerThread(threading.Thread):
	# Timeout (in seconds) when polling the worker queue, so that a
	# shutdown request is noticed even when the queue stays empty.
	HEARTBEAT = 2.5

	def __init__(self, collecty, id):
		threading.Thread.__init__(self)
		self.daemon = True

		self.log = logging.getLogger("collecty.worker")
		self.log.propagate = 1

		self.collecty = collecty
		self.id = id

		self.log.debug(_("Worker thread %s has been initialised") % self.id)

	@property
	def queue(self):
		"""
			The queue this thread is getting events from
		"""
		return self.collecty._worker_queue

	def run(self):
		self.log.debug(_("Worker thread %s has been started") % self.id)
		self.running = True

		while self.running:
			try:
				plugin = self.queue.get(block=True, timeout=self.HEARTBEAT)

			# If the queue has been empty, we just retry.
			except queue.Empty:
				continue

			# Execute the collect operation for this plugin.
			plugin.collect()

		self.log.debug(_("Worker thread %s has been terminated") % self.id)

	def shutdown(self):
		self.running = False

		# Wait until this thread has finished.
		self.join()


class WriteQueue(threading.Thread):
	def __init__(self, collecty, submit_interval):
		threading.Thread.__init__(self)
		self.daemon = True

		self.collecty = collecty

		self.log = logging.getLogger("collecty.queue")
		self.log.propagate = 1

		self.timer = plugins.Timer(submit_interval)
		self._queue = queue.PriorityQueue()

		self.log.debug(_("Initialised write queue"))

	def run(self):
		self.log.debug(_("Write queue process started"))
		self.running = True

		while self.running:
			# Reset the timer.
			self.timer.reset()

			# Wait until the timer has successfully elapsed.
			if self.timer.wait():
				self.commit()

		# Commit any remaining data before the thread exits.
		self.commit()
		self.log.debug(_("Write queue process stopped"))

	def shutdown(self):
		self.running = False
		self.timer.cancel()

		# Wait until all data has been written.
		self.join()

	def add(self, object, time, data):
		result = QueueObject(object.file, time, data)
		self._queue.put(result)

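	# A plugin typically hands a sample over through the method above,
	# e.g. (hypothetical values; the first argument must provide a
	# "file" attribute pointing at its RRD database):
	#
	#   collecty.write_queue.add(plugin, datetime.datetime.utcnow(), "1.5")
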
	def commit(self):
		"""
			Flushes all collected data to disk.
		"""
		# There is nothing to do if the queue is empty.
		if self._queue.empty():
			self.log.debug(_("No data to commit"))
			return

		time_start = time.time()

		self.log.debug(_("Submitting data to the databases..."))

		# Get all objects from the queue and group them by the RRD file
		# they belong to, so that each file is updated only once.
		results = {}
		while not self._queue.empty():
			result = self._queue.get()

			try:
				results[result.file].append(result)
			except KeyError:
				results[result.file] = [result]

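		# At this point, "results" maps each RRD file to its pending
		# updates, e.g. (hypothetical filenames):
		#
		#   {
		#     "/var/lib/collecty/cpu.rrd"    : [<QueueObject>, ...],
		#     "/var/lib/collecty/memory.rrd" : [<QueueObject>, ...],
		#   }
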
		# Write the collected data to disk.
		for filename, file_results in results.items():
			self._commit_file(filename, file_results)

		duration = time.time() - time_start
		self.log.debug(_("Emptied write queue in %.2fs") % duration)

	def _commit_file(self, filename, results):
		self.log.debug(_("Committing %(counter)s entries to %(filename)s") \
			% { "counter" : len(results), "filename" : filename })

		for result in results:
			self.log.debug("  %s: %s" % (result.time, result.data))

		try:
			rrdtool.update(filename, *["%s" % r for r in results])

		# Catch operational errors like unreadable/unwritable RRD databases
		# or those whose format has changed. The collected data will be lost.
		except rrdtool.OperationalError as e:
			self.log.critical(_("Could not update RRD database %s: %s") \
				% (filename, e))


class QueueObject(object):
	def __init__(self, file, time, data):
		self.file = file
		self.time = time
		self.data = data

	def __str__(self):
		# Render this object in the "<timestamp>:<data>" format that
		# rrdtool.update() expects.
		return "%s:%s" % (self.time.strftime("%s"), self.data)

	def __cmp__(self, other):
		return cmp(self.time, other.time)

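# For illustration (hypothetical values): a QueueObject created for
# midnight on 2012-01-01 carrying the data string "1.5:2.5" stringifies
# to something like "1325376000:1.5:2.5". The precise timestamp depends
# on the local timezone, since strftime("%s") uses local time.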

class PluginTimer(object):
	def __init__(self, plugin):
		self.plugin = plugin

		# Schedule the first collection immediately.
		self.deadline = datetime.datetime.utcnow()

	def __repr__(self):
		return "<%s %s>" % (self.__class__.__name__, self.deadline)

	def __cmp__(self, other):
		# Order timers by their deadline, so that the PriorityQueue
		# yields the timer that is due next.
		return cmp(self.deadline, other.deadline)

	def reset_deadline(self):
		self.deadline = datetime.datetime.utcnow() \
			+ datetime.timedelta(seconds=self.plugin.interval)

	def is_due(self):
		return datetime.datetime.utcnow() >= self.deadline
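

# A minimal usage sketch, assuming the bus, plugins and constants
# modules are importable: executing this module directly initialises
# the daemon and enters its main loop. (The packaged daemon is
# presumably started by a separate launcher script; this block is
# illustrative only.)
if __name__ == "__main__":
	collecty = Collecty(debug=True)
	collecty.run()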