Implement worker thread concept
[collecty.git] / src / collecty / daemon.py
1 #!/usr/bin/python
2 ###############################################################################
3 # #
4 # collecty - A system statistics collection daemon for IPFire #
5 # Copyright (C) 2012 IPFire development team #
6 # #
7 # This program is free software: you can redistribute it and/or modify #
8 # it under the terms of the GNU General Public License as published by #
9 # the Free Software Foundation, either version 3 of the License, or #
10 # (at your option) any later version. #
11 # #
12 # This program is distributed in the hope that it will be useful, #
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of #
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
15 # GNU General Public License for more details. #
16 # #
17 # You should have received a copy of the GNU General Public License #
18 # along with this program. If not, see <http://www.gnu.org/licenses/>. #
19 # #
20 ###############################################################################
21
22 import Queue as queue
23 import datetime
24 import multiprocessing
25 import rrdtool
26 import signal
27 import threading
28 import time
29
30 import bus
31 import plugins
32
33 from constants import *
34 from i18n import _
35
36 import logging
# Module-wide logger of the daemon. The worker and write queue threads use
# the child loggers "collecty.worker" and "collecty.queue", which propagate
# their records up to this one.
log = logging.getLogger("collecty")
38
class Collecty(object):
    """
    The collecty daemon.

    Owns the loaded plugins, a pool of worker threads that run the plugins'
    collect() methods, the write queue that flushes collected data to the
    RRD databases, and the bus thread that answers requests.
    """
    # The default interval, when all data is written to disk.
    SUBMIT_INTERVAL = 300

    # Seconds the main loop sleeps when no plugin timer is due.
    HEARTBEAT = 1

    def __init__(self, debug=False):
        self.debug = debug

        # Enable debug logging when running in debug mode
        if self.debug:
            log.setLevel(logging.DEBUG)

        self.plugins = []

        # Indicates whether this process should be running or not.
        self.running = True

        # The write queue holds all collected pieces of data which
        # will be written to disk later.
        self.write_queue = WriteQueue(self, self.SUBMIT_INTERVAL)

        # Create worker threads
        self.worker_threads = self.create_worker_threads()

        # PluginTimer objects ordered by deadline, and plugins that are
        # due and waiting for a worker thread to collect them.
        self._timer_queue = queue.PriorityQueue()
        self._worker_queue = queue.Queue()

        # Create a thread that connects to dbus and processes requests we
        # get from there.
        self.bus = bus.Bus(self)

        # Add all plugins
        for plugin in plugins.get():
            self.add_plugin(plugin)

        log.debug(_("Collecty successfully initialized with %s plugins")
            % len(self.plugins))

    def add_plugin(self, plugin_class):
        """
        Instantiates plugin_class and registers the resulting plugin.

        Failures are logged and the plugin is skipped, so that one broken
        plugin does not prevent the daemon from starting.
        """
        # Try initialising a new plugin. If that fails, we will log the
        # error and try to go on. Exception (not a bare except) so that
        # KeyboardInterrupt and SystemExit still propagate.
        try:
            plugin = plugin_class(self)
        except Exception:
            log.critical(_("Plugin %s could not be initialised") % plugin_class, exc_info=True)
            return

        self.plugins.append(plugin)

    @property
    def templates(self):
        """
        Yields the templates of all loaded plugins.
        """
        for plugin in self.plugins:
            for template in plugin.templates:
                yield template

    def run(self):
        """
        Starts all threads and dispatches due plugins to the workers until
        shutdown() is called, then stops all threads again.
        """
        # Register signal handlers.
        self.register_signal_handler()

        # Start the bus
        self.bus.start()

        # Initialise the timer queue
        self.initialise_timer_queue()

        # Start worker threads
        for w in self.worker_threads:
            w.start()

        # Run the write queue thread
        self.write_queue.start()

        # Hand due plugins over to the worker threads.
        while self.running:
            try:
                # Try processing one event from the queue. If a plugin was
                # dispatched, the next timer might be due as well, so we
                # retry immediately.
                if self.process_timer_queue():
                    continue

                # Otherwise we will sleep for a bit
                time.sleep(self.HEARTBEAT)

                # Log warnings if the worker queue is filling up
                queue_size = self._worker_queue.qsize()
                if queue_size >= 5:
                    log.warning(_("Worker queue is filling up with %s events") % queue_size)

            except KeyboardInterrupt:
                self.shutdown()
                break

        # Wait until all worker threads are finished
        for w in self.worker_threads:
            w.join()

        # Stop the bus thread
        self.bus.shutdown()

        # Write all collected data to disk before ending the main thread
        self.write_queue.shutdown()

        log.debug(_("Main thread exited"))

    def shutdown(self):
        """
        Stops the main loop and all worker threads. Calling it again is a
        no-op.
        """
        if not self.running:
            return

        log.info(_("Received shutdown signal"))
        self.running = False

        # Ask all worker threads to stop first, so that they wind down in
        # parallel instead of one worker HEARTBEAT after another.
        for w in self.worker_threads:
            w.running = False

        # Then wait for them. Threads that were never started cannot be
        # joined (join() would raise RuntimeError), so skip those.
        for w in self.worker_threads:
            if w.is_alive():
                w.shutdown()

    def register_signal_handler(self):
        """
        Routes SIGTERM, SIGINT and SIGUSR1 to signal_handler().
        """
        for s in (signal.SIGTERM, signal.SIGINT, signal.SIGUSR1):
            log.debug(_("Registering signal %d") % s)

            signal.signal(s, self.signal_handler)

    def signal_handler(self, sig, *args, **kwargs):
        """
        SIGTERM/SIGINT shut the daemon down; SIGUSR1 flushes the write queue.
        """
        log.info(_("Caught signal %d") % sig)

        if sig in (signal.SIGTERM, signal.SIGINT):
            # Shutdown this application.
            self.shutdown()

        elif sig == signal.SIGUSR1:
            # Commit all data.
            self.write_queue.commit()

    def get_plugin_from_template(self, template_name):
        """
        Returns the first plugin providing a template called template_name,
        or None if no plugin does.
        """
        for plugin in self.plugins:
            if template_name in [t.name for t in plugin.templates]:
                return plugin

        return None

    def generate_graph(self, template_name, *args, **kwargs):
        """
        Renders template_name through the plugin that provides it.

        Raises RuntimeError if no plugin provides the template.
        """
        plugin = self.get_plugin_from_template(template_name)
        if not plugin:
            raise RuntimeError("Could not find template %s" % template_name)

        return plugin.generate_graph(template_name, *args, **kwargs)

    def create_worker_threads(self, num=None):
        """
        Creates (but does not start) a number of worker threads.

        num defaults to the number of processor cores, but at least four.
        """
        if num is None:
            num = max(multiprocessing.cpu_count(), 4)

        return [WorkerThread(self, thread_id) for thread_id in range(num)]

    def initialise_timer_queue(self):
        """
        Puts one timer per plugin into the timer queue.
        """
        for p in self.plugins:
            timer = PluginTimer(p)

            self._timer_queue.put(timer)

    def process_timer_queue(self):
        """
        Dispatches the plugin of the earliest timer if that timer is due.

        Returns the dispatched timer, or None if nothing was due (or the
        timer queue is empty). The caller uses this to decide whether to
        sleep, so it must be falsy when nothing happened.
        """
        # Take the item from the timer queue that is to be due first.
        # Non-blocking, so an empty queue cannot stall the main loop.
        try:
            timer = self._timer_queue.get(block=False)
        except queue.Empty:
            return None

        try:
            # The earliest timer is not due, so no other timer is either.
            if not timer.is_due():
                return None

            # Put the plugin into the worker queue and reset the timer
            self._worker_queue.put(timer.plugin)
            timer.reset_deadline()

            return timer
        finally:
            # Put the timer back into the timer queue.
            self._timer_queue.put(timer)
224
225
class WorkerThread(threading.Thread):
    """
    Takes plugins from the daemon's worker queue and runs their collect()
    method until it is shut down.
    """
    # Seconds to wait for a plugin before re-checking self.running.
    HEARTBEAT = 2.5

    def __init__(self, collecty, id):
        threading.Thread.__init__(self)
        self.daemon = True

        self.log = logging.getLogger("collecty.worker")
        self.log.propagate = 1

        self.collecty = collecty
        self.id = id

        # Set here rather than in run(), so that a shutdown() issued before
        # the thread gets scheduled is not overwritten when run() starts.
        self.running = True

        self.log.debug(_("Worker thread %s has been initialised") % self.id)

    @property
    def queue(self):
        """
        The queue this thread is getting events from
        """
        return self.collecty._worker_queue

    def run(self):
        self.log.debug(_("Worker thread %s has been started") % self.id)

        while self.running:
            try:
                plugin = self.queue.get(block=True, timeout=self.HEARTBEAT)

            # If the queue has been empty we just retry
            except queue.Empty:
                continue

            # Execute the collect operation for this plugin. An exception
            # from a single plugin must not terminate this worker thread.
            try:
                plugin.collect()
            except Exception:
                self.log.error(_("Unhandled exception in %s.collect()") % plugin,
                    exc_info=True)

        self.log.debug(_("Worker thread %s has been terminated") % self.id)

    def shutdown(self):
        """
        Stops this thread and waits until its current collection is done.
        """
        self.running = False

        # A thread that has never been started cannot be joined.
        if self.is_alive():
            self.join()
270
271
class WriteQueue(threading.Thread):
    """
    Buffers collected values and writes them to the RRD databases every
    submit_interval seconds, on commit(), and once more on shutdown.
    """
    def __init__(self, collecty, submit_interval):
        threading.Thread.__init__(self)
        self.daemon = True

        self.collecty = collecty

        self.log = logging.getLogger("collecty.queue")
        self.log.propagate = 1

        self.timer = plugins.Timer(submit_interval)

        # QueueObjects, handed out in order of their timestamp
        self._queue = queue.PriorityQueue()

        # Set here rather than in run(), so that a shutdown() issued before
        # the thread gets scheduled is not overwritten when run() starts.
        self.running = True

        self.log.debug(_("Initialised write queue"))

    def run(self):
        self.log.debug(_("Write queue process started"))

        while self.running:
            # Reset the timer.
            self.timer.reset()

            # Wait until the timer has successfully elapsed.
            if self.timer.wait():
                self.commit()

        # Flush everything that was collected since the last commit.
        self.commit()
        self.log.debug(_("Write queue process stopped"))

    def shutdown(self):
        """
        Stops the thread and waits until the final commit has finished.
        """
        self.running = False
        self.timer.cancel()

        # Wait until all data has been written.
        self.join()

    def add(self, object, time, data):
        """
        Queues data sampled at time for the RRD file object.file.
        """
        result = QueueObject(object.file, time, data)
        self._queue.put(result)

    def commit(self):
        """
        Flushes the read data to disk.

        This runs in the write queue thread and may also run in the main
        thread at the same time (the daemon calls it on SIGUSR1). The queue
        is therefore drained with non-blocking gets, so a commit can never
        hang on a queue that another commit emptied first.
        """
        # There is nothing to do if the queue is empty
        if self._queue.empty():
            self.log.debug(_("No data to commit"))
            return

        time_start = time.time()

        self.log.debug(_("Submitting data to the databases..."))

        # Get all objects from the queue and group them by the RRD file
        # to commit them all at once
        results = {}
        while True:
            try:
                result = self._queue.get_nowait()
            except queue.Empty:
                break

            results.setdefault(result.file, []).append(result)

        # Write the collected data to disk
        for filename, file_results in results.items():
            self._commit_file(filename, file_results)

        duration = time.time() - time_start
        self.log.debug(_("Emptied write queue in %.2fs") % duration)

    def _commit_file(self, filename, results):
        """
        Writes all results to the RRD file filename with one rrdtool.update().
        """
        self.log.debug(_("Committing %(counter)s entries to %(filename)s")
            % { "counter" : len(results), "filename" : filename })

        for result in results:
            self.log.debug(" %s: %s" % (result.time, result.data))

        try:
            rrdtool.update(filename, *["%s" % r for r in results])

        # Catch operational errors like unreadable/unwritable RRD databases
        # or those where the format has changed. The collected data will be lost.
        except rrdtool.OperationalError as e:
            self.log.critical(_("Could not update RRD database %s: %s")
                % (filename, e))
359
360
class QueueObject(object):
    """
    A single collected value destined for one RRD file.

    Objects order by their timestamp, so the write queue's PriorityQueue
    hands them out chronologically.
    """
    def __init__(self, file, time, data):
        # Path of the RRD file this value is written to
        self.file = file
        # Time of the sample (a datetime, see __str__)
        self.time = time
        # Value part of the rrdtool update argument
        self.data = data

    def __str__(self):
        # Renders "<seconds since epoch>:<data>" for rrdtool.update().
        # NOTE(review): strftime("%s") is a glibc extension and treats a
        # naive datetime as local time; confirm which timezone plugins use.
        return "%s:%s" % (self.time.strftime("%s"), self.data)

    def __lt__(self, other):
        # Python 3 ignores __cmp__; heapq/PriorityQueue compare with "<".
        return self.time < other.time

    def __cmp__(self, other):
        return cmp(self.time, other.time)
372
373
class PluginTimer(object):
    """
    Tracks when a plugin has to be collected next.

    Timers order by deadline, so the daemon's timer PriorityQueue always
    yields the timer that is due first.
    """
    def __init__(self, plugin):
        self.plugin = plugin

        # A new timer is due immediately.
        self.deadline = datetime.datetime.utcnow()

    def __repr__(self):
        return "<%s %s>" % (self.__class__.__name__, self.deadline)

    def __lt__(self, other):
        # Python 3 ignores __cmp__; heapq/PriorityQueue compare with "<".
        return self.deadline < other.deadline

    def __cmp__(self, other):
        return cmp(self.deadline, other.deadline)

    def reset_deadline(self):
        """
        Sets the next deadline to plugin.interval seconds from now.
        """
        self.deadline = datetime.datetime.utcnow() \
            + datetime.timedelta(seconds=self.plugin.interval)

    def is_due(self):
        """
        Returns True once the deadline has been reached.
        """
        return datetime.datetime.utcnow() >= self.deadline