]> git.ipfire.org Git - collecty.git/blame - src/collecty/daemon.py
Implement worker thread concept
[collecty.git] / src / collecty / daemon.py
CommitLineData
73db5226
MT
1#!/usr/bin/python
2###############################################################################
3# #
4# collecty - A system statistics collection daemon for IPFire #
5# Copyright (C) 2012 IPFire development team #
6# #
7# This program is free software: you can redistribute it and/or modify #
8# it under the terms of the GNU General Public License as published by #
9# the Free Software Foundation, either version 3 of the License, or #
10# (at your option) any later version. #
11# #
12# This program is distributed in the hope that it will be useful, #
13# but WITHOUT ANY WARRANTY; without even the implied warranty of #
14# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
15# GNU General Public License for more details. #
16# #
17# You should have received a copy of the GNU General Public License #
18# along with this program. If not, see <http://www.gnu.org/licenses/>. #
19# #
20###############################################################################
21
72364063 22import Queue as queue
49c1b8fd
MT
23import datetime
24import multiprocessing
72364063 25import rrdtool
73db5226 26import signal
72364063
MT
27import threading
28import time
73db5226 29
c968f6d9 30import bus
73db5226
MT
31import plugins
32
33from constants import *
34from i18n import _
35
36import logging
37log = logging.getLogger("collecty")
38
class Collecty(object):
	"""
	The main daemon: initialises all plugins, schedules their collection
	with a priority queue of timers and hands due plugins to a pool of
	worker threads. Collected data is buffered in a write queue that is
	flushed to disk at a regular interval.
	"""

	# The default interval, when all data is written to disk.
	SUBMIT_INTERVAL = 300

	# Seconds the main loop sleeps between scheduling attempts.
	HEARTBEAT = 1

	# Log a warning when this many events are pending in the worker queue.
	WORKER_QUEUE_WARNING_THRESHOLD = 5

	def __init__(self, debug=False):
		self.debug = debug

		# Enable debug logging when running in debug mode
		if self.debug:
			log.setLevel(logging.DEBUG)

		self.plugins = []

		# Indicates whether this process should be running or not.
		self.running = True

		# The write queue holds all collected pieces of data which
		# will be written to disk later.
		self.write_queue = WriteQueue(self, self.SUBMIT_INTERVAL)

		# Create worker threads
		self.worker_threads = self.create_worker_threads()

		# Timers ordered by deadline; plugins that are due for collection.
		self._timer_queue = queue.PriorityQueue()
		self._worker_queue = queue.Queue()

		# Create a thread that connects to dbus and processes requests we
		# get from there.
		self.bus = bus.Bus(self)

		# Add all plugins
		for plugin in plugins.get():
			self.add_plugin(plugin)

		log.debug(_("Collecty successfully initialized with %s plugins") \
			% len(self.plugins))

	def add_plugin(self, plugin_class):
		# Try initialising a new plugin. If that fails, we will log the
		# error and try to go on.
		try:
			plugin = plugin_class(self)
		# Was a bare "except:" which would also swallow KeyboardInterrupt
		# and SystemExit; only broken plugins should be caught here.
		except Exception:
			log.critical(_("Plugin %s could not be initialised") % plugin_class, exc_info=True)
			return

		self.plugins.append(plugin)

	@property
	def templates(self):
		"""
		Yields every graph template of every registered plugin.
		"""
		for plugin in self.plugins:
			for template in plugin.templates:
				yield template

	def run(self):
		"""
		Main loop: starts all helper threads, then schedules plugin
		collection until shutdown() is called, and finally tears
		everything down in reverse order.
		"""
		# Register signal handlers.
		self.register_signal_handler()

		# Start the bus
		self.bus.start()

		# Initialise the timer queue
		self.initialise_timer_queue()

		# Start worker threads
		for w in self.worker_threads:
			w.start()

		# Run the write queue thread
		self.write_queue.start()

		# Regularly submit all data to disk.
		while self.running:
			try:
				# Try processing one event from the queue. If that succeeded
				# we will retry immediately.
				if self.process_timer_queue():
					continue

				# Otherwise we will sleep for a bit
				time.sleep(self.HEARTBEAT)

				# Log warnings if the worker queue is filling up
				queue_size = self._worker_queue.qsize()
				if queue_size >= self.WORKER_QUEUE_WARNING_THRESHOLD:
					log.warning(_("Worker queue is filling up with %s events") % queue_size)

			except KeyboardInterrupt:
				self.shutdown()
				break

		# Wait until all worker threads are finished
		for w in self.worker_threads:
			w.join()

		# Stop the bus thread
		self.bus.shutdown()

		# Write all collected data to disk before ending the main thread
		self.write_queue.shutdown()

		log.debug(_("Main thread exited"))

	def shutdown(self):
		"""
		Stops the main loop and asks all worker threads to terminate.
		Safe to call more than once.
		"""
		if not self.running:
			return

		log.info(_("Received shutdown signal"))
		self.running = False

		# Propagating shutdown to all threads.
		for w in self.worker_threads:
			w.shutdown()

	def register_signal_handler(self):
		for s in (signal.SIGTERM, signal.SIGINT, signal.SIGUSR1):
			log.debug(_("Registering signal %d") % s)

			signal.signal(s, self.signal_handler)

	def signal_handler(self, sig, *args, **kwargs):
		log.info(_("Caught signal %d") % sig)

		if sig in (signal.SIGTERM, signal.SIGINT):
			# Shutdown this application.
			self.shutdown()

		elif sig == signal.SIGUSR1:
			# Commit all data.
			self.write_queue.commit()

	def get_plugin_from_template(self, template_name):
		"""
		Returns the plugin that provides the given template,
		or None if no plugin has a template with that name.
		"""
		for plugin in self.plugins:
			if template_name not in [t.name for t in plugin.templates]:
				continue

			return plugin

	def generate_graph(self, template_name, *args, **kwargs):
		plugin = self.get_plugin_from_template(template_name)
		if not plugin:
			raise RuntimeError("Could not find template %s" % template_name)

		return plugin.generate_graph(template_name, *args, **kwargs)

	def create_worker_threads(self, num=None):
		"""
		Creates a number of worker threads
		"""
		# If no number of threads is given, we will create as many as we have
		# active processor cores but never less than four.
		if num is None:
			num = max(multiprocessing.cpu_count(), 4)

		worker_threads = []

		# "worker_id" instead of "id" to avoid shadowing the builtin.
		for worker_id in range(num):
			worker_thread = WorkerThread(self, worker_id)
			worker_threads.append(worker_thread)

		return worker_threads

	def initialise_timer_queue(self):
		# Every plugin gets a timer that is due immediately, so the
		# first collection happens right after startup.
		for p in self.plugins:
			timer = PluginTimer(p)

			self._timer_queue.put(timer)

	def process_timer_queue(self):
		"""
		Takes the most urgent timer; if it is due, enqueues its plugin
		for collection and returns the timer (truthy). Returns None
		when nothing was due. The timer is always put back.
		"""
		# Take the item from the timer queue that is to be due first
		timer = self._timer_queue.get()

		try:
			# If the timer event is to be executed, we will put the plugin
			# into the worker queue and reset the timer
			if timer.is_due():
				self._worker_queue.put(timer.plugin)
				timer.reset_deadline()

				return timer
		finally:
			# Put the timer back into the timer queue.
			self._timer_queue.put(timer)
224
225
class WorkerThread(threading.Thread):
	"""
	A daemon thread that pulls due plugins off the shared worker queue
	and runs their collect() method.
	"""

	# Seconds to block on the queue before re-checking the running flag.
	HEARTBEAT = 2.5

	def __init__(self, collecty, id):
		threading.Thread.__init__(self)
		self.daemon = True

		self.collecty = collecty
		self.id = id

		self.log = logging.getLogger("collecty.worker")
		self.log.propagate = 1

		self.log.debug(_("Worker thread %s has been initialised") % self.id)

	@property
	def queue(self):
		"""
		The shared queue this worker consumes collection events from.
		"""
		return self.collecty._worker_queue

	def run(self):
		self.log.debug(_("Worker thread %s has been started") % self.id)
		self.running = True

		while self.running:
			try:
				task = self.queue.get(block=True, timeout=self.HEARTBEAT)
			except queue.Empty:
				# Nothing arrived within the heartbeat - loop around so
				# the running flag gets re-checked.
				continue

			# Run the collection for the plugin we were handed.
			task.collect()

		self.log.debug(_("Worker thread %s has been terminated") % self.id)

	def shutdown(self):
		self.running = False

		# Block until this thread has actually ended.
		self.join()
270
72364063
MT
271
class WriteQueue(threading.Thread):
	"""
	A daemon thread that buffers collected results in a priority queue
	and periodically commits them, grouped by RRD file, to disk.
	"""

	def __init__(self, collecty, submit_interval):
		threading.Thread.__init__(self)
		self.daemon = True

		self.collecty = collecty

		self.log = logging.getLogger("collecty.queue")
		self.log.propagate = 1

		# Timer that elapses whenever the buffered data should be flushed.
		self.timer = plugins.Timer(submit_interval)
		self._queue = queue.PriorityQueue()

		self.log.debug(_("Initialised write queue"))

	def run(self):
		self.log.debug(_("Write queue process started"))
		self.running = True

		while self.running:
			# Reset the timer.
			self.timer.reset()

			# Wait until the timer has successfully elapsed.
			if self.timer.wait():
				self.commit()

		# Flush whatever is still buffered before terminating.
		self.commit()
		self.log.debug(_("Write queue process stopped"))

	def shutdown(self):
		self.running = False
		self.timer.cancel()

		# Wait until all data has been written.
		self.join()

	def add(self, object, time, data):
		# NOTE: the parameter names "object" and "time" shadow the builtin
		# and the time module respectively; they are kept unchanged so
		# existing keyword callers keep working.
		result = QueueObject(object.file, time, data)
		self._queue.put(result)

	def commit(self):
		"""
		Flushes the read data to disk.
		"""
		# There is nothing to do if the queue is empty
		if self._queue.empty():
			self.log.debug(_("No data to commit"))
			return

		time_start = time.time()

		self.log.debug(_("Submitting data to the databases..."))

		# Get all objects from the queue and group them by the RRD file
		# to commit them all at once. (Previously the loop variable below
		# shadowed this dict, which was confusing - hence "grouped".)
		grouped = {}
		while not self._queue.empty():
			result = self._queue.get()
			grouped.setdefault(result.file, []).append(result)

		# Write the collected data to disk
		for filename, results in grouped.items():
			self._commit_file(filename, results)

		duration = time.time() - time_start
		self.log.debug(_("Emptied write queue in %.2fs") % duration)

	def _commit_file(self, filename, results):
		self.log.debug(_("Committing %(counter)s entries to %(filename)s") \
			% { "counter" : len(results), "filename" : filename })

		for result in results:
			self.log.debug("  %s: %s" % (result.time, result.data))

		try:
			rrdtool.update(filename, *["%s" % r for r in results])

		# Catch operational errors like unreadable/unwritable RRD databases
		# or those where the format has changed. The collected data will be lost.
		except rrdtool.OperationalError as e:
			self.log.critical(_("Could not update RRD database %s: %s") \
				% (filename, e))
72364063
MT
359
360
class QueueObject(object):
	"""
	A single collected result, ordered by timestamp so it can be kept
	in a priority queue and committed oldest-first.
	"""

	def __init__(self, file, time, data):
		self.file = file
		self.time = time
		self.data = data

	def __str__(self):
		# "%s" is the (glibc) strftime extension yielding seconds since
		# the epoch - the "<timestamp>:<values>" form rrdtool expects.
		return "%s:%s" % (self.time.strftime("%s"), self.data)

	def __cmp__(self, other):
		# Python 2 ordering; ignored by Python 3.
		return cmp(self.time, other.time)

	def __lt__(self, other):
		# Rich comparison so ordering also works on Python 3, where
		# __cmp__ no longer exists and PriorityQueue requires __lt__.
		# On Python 2, __lt__ takes precedence over __cmp__ and gives
		# the same result, so behaviour is unchanged.
		return self.time < other.time
49c1b8fd
MT
372
373
class PluginTimer(object):
	"""
	Tracks when a plugin's next collection run is due. Instances are
	ordered by deadline so they can live in a priority queue.
	"""

	def __init__(self, plugin):
		self.plugin = plugin

		# Due immediately, so the first collection happens right away.
		self.deadline = datetime.datetime.utcnow()

	def __repr__(self):
		return "<%s %s>" % (self.__class__.__name__, self.deadline)

	def __cmp__(self, other):
		# Python 2 ordering; ignored by Python 3.
		return cmp(self.deadline, other.deadline)

	def __lt__(self, other):
		# Rich comparison so ordering also works on Python 3, where
		# __cmp__ no longer exists and PriorityQueue requires __lt__.
		return self.deadline < other.deadline

	def reset_deadline(self):
		"""
		Pushes the deadline forward by the plugin's collection interval.
		"""
		self.deadline = datetime.datetime.utcnow() \
			+ datetime.timedelta(seconds=self.plugin.interval)

	def is_due(self):
		"""
		Returns True once the deadline has been reached or passed.
		"""
		return datetime.datetime.utcnow() >= self.deadline