# Migrated to Python 3
# [collecty.git] src/collecty/daemon.py
#!/usr/bin/python3
###############################################################################
#                                                                             #
# collecty - A system statistics collection daemon for IPFire                 #
# Copyright (C) 2012 IPFire development team                                  #
#                                                                             #
# This program is free software: you can redistribute it and/or modify        #
# it under the terms of the GNU General Public License as published by        #
# the Free Software Foundation, either version 3 of the License, or           #
# (at your option) any later version.                                         #
#                                                                             #
# This program is distributed in the hope that it will be useful,             #
# but WITHOUT ANY WARRANTY; without even the implied warranty of              #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the               #
# GNU General Public License for more details.                                #
#                                                                             #
# You should have received a copy of the GNU General Public License           #
# along with this program.  If not, see <http://www.gnu.org/licenses/>.       #
#                                                                             #
###############################################################################

22 import datetime
23 import multiprocessing
24 import queue
25 import rrdtool
26 import signal
27 import threading
28 import time
29
30 from . import bus
31 from . import plugins
32
33 from .constants import *
34 from .i18n import _
35
36 import logging
37 log = logging.getLogger("collecty")
38
class Collecty(object):
	"""
		The main daemon class.

		Owns all plugins, the worker threads that run their collect()
		methods, the write queue that batches results to disk and the
		D-Bus service thread.
	"""

	# The default interval, when all data is written to disk.
	SUBMIT_INTERVAL = 300

	# Sleep time (in seconds) of the main loop when no timer was due.
	HEARTBEAT = 1

	def __init__(self, debug=False):
		self.debug = debug

		# Enable debug logging when running in debug mode
		if self.debug:
			log.setLevel(logging.DEBUG)

		self.plugins = []

		# Indicates whether this process should be running or not.
		self.running = True

		# The write queue holds all collected pieces of data which
		# will be written to disk later.
		self.write_queue = WriteQueue(self, self.SUBMIT_INTERVAL)

		# Create worker threads
		self.worker_threads = self.create_worker_threads()

		self._timer_queue = queue.PriorityQueue()
		self._worker_queue = queue.Queue()

		# Create a thread that connects to dbus and processes requests we
		# get from there.
		self.bus = bus.Bus(self)

		# Add all plugins
		for plugin in plugins.get():
			self.add_plugin(plugin)

		log.debug(_("Collecty successfully initialized with %s plugins") \
			% len(self.plugins))

	def add_plugin(self, plugin_class):
		"""
			Instantiates plugin_class and registers the plugin.

			If initialisation fails, the error is logged and the plugin
			is skipped so that the daemon can keep running.
		"""
		# Try initialising a new plugin. If that fails, we will log the
		# error and try to go on.
		try:
			plugin = plugin_class(self)
		# Catch Exception only (instead of a bare except) so that
		# SystemExit and KeyboardInterrupt still propagate.
		except Exception:
			log.critical(_("Plugin %s could not be initialised") % plugin_class, exc_info=True)
			return

		self.plugins.append(plugin)

	@property
	def templates(self):
		"""
			Yields all graph templates of all registered plugins.
		"""
		for plugin in self.plugins:
			yield from plugin.templates

	def run(self):
		"""
			Runs the main loop until shutdown() is called or an
			interrupt is received.
		"""
		# Register signal handlers.
		self.register_signal_handler()

		# Cannot do anything if no plugins have been initialised
		if not self.plugins:
			log.critical(_("No plugins have been initialised"))
			return

		# Start the bus
		self.bus.start()

		# Initialise the timer queue
		self.initialise_timer_queue()

		# Start worker threads
		for w in self.worker_threads:
			w.start()

		# Run the write queue thread
		self.write_queue.start()

		# Regularly submit all data to disk.
		while self.running:
			try:
				# Try processing one event from the queue. If that succeeded
				# we will retry immediately.
				if self.process_timer_queue():
					continue

				# Otherwise we will sleep for a bit
				time.sleep(self.HEARTBEAT)

				# Log warnings if the worker queue is filling up
				queue_size = self._worker_queue.qsize()
				if queue_size >= 5:
					log.warning(_("Worker queue is filling up with %s events") % queue_size)

			except KeyboardInterrupt:
				self.shutdown()
				break

		# Wait until all worker threads are finished
		for w in self.worker_threads:
			w.join()

		# Stop the bus thread
		self.bus.shutdown()

		# Write all collected data to disk before ending the main thread
		self.write_queue.shutdown()

		log.debug(_("Main thread exited"))

	def shutdown(self):
		"""
			Stops the main loop and propagates the shutdown to all
			worker threads. Safe to call more than once.
		"""
		if not self.running:
			return

		log.info(_("Received shutdown signal"))
		self.running = False

		# Propagating shutdown to all threads.
		for w in self.worker_threads:
			w.shutdown()

	def register_signal_handler(self):
		"""
			Installs signal_handler() for all signals we react to.
		"""
		for s in (signal.SIGTERM, signal.SIGINT, signal.SIGUSR1):
			log.debug(_("Registering signal %d") % s)

			# BUGFIX: the handler must be registered inside the loop,
			# otherwise only the last signal (SIGUSR1) would be handled.
			signal.signal(s, self.signal_handler)

	def signal_handler(self, sig, *args, **kwargs):
		"""
			Dispatches a received signal: termination signals shut the
			daemon down, SIGUSR1 forces an immediate commit.
		"""
		log.info(_("Caught signal %d") % sig)

		if sig in (signal.SIGTERM, signal.SIGINT):
			# Shutdown this application.
			self.shutdown()

		elif sig == signal.SIGUSR1:
			# Commit all data.
			self.write_queue.commit()

	def get_plugin_from_template(self, template_name):
		"""
			Returns the plugin that provides the template with
			template_name, or None if no plugin has it.
		"""
		for plugin in self.plugins:
			if template_name not in [t.name for t in plugin.templates]:
				continue

			return plugin

	def generate_graph(self, template_name, *args, **kwargs):
		"""
			Renders a graph from the named template.

			Raises RuntimeError if no plugin provides the template.
		"""
		plugin = self.get_plugin_from_template(template_name)
		if not plugin:
			raise RuntimeError("Could not find template %s" % template_name)

		return plugin.generate_graph(template_name, *args, **kwargs)

	def create_worker_threads(self, num=None):
		"""
			Creates a number of worker threads
		"""
		# If no number of threads is given, we will create as many as we have
		# active processor cores but never less than four.
		if num is None:
			num = max(multiprocessing.cpu_count(), 4)

		# "i" instead of "id" so that the builtin is not shadowed.
		return [WorkerThread(self, i) for i in range(num)]

	def initialise_timer_queue(self):
		"""
			Creates a timer for every plugin and puts it into the
			timer queue; all plugins are due immediately at start-up.
		"""
		for p in self.plugins:
			timer = PluginTimer(p)

			self._timer_queue.put(timer)

	def process_timer_queue(self):
		"""
			Processes the most urgent timer event.

			Returns the timer if its plugin was dispatched to the
			worker queue, or None if nothing was due yet.
		"""
		# Take the item from the timer queue that is to be due first
		timer = self._timer_queue.get()

		try:
			# If the timer event is to be executed, we will put the plugin
			# into the worker queue and reset the timer
			if timer.is_due():
				self._worker_queue.put(timer.plugin)
				timer.reset_deadline()

				return timer
		finally:
			# Put the timer back into the timer queue.
			self._timer_queue.put(timer)
229
230
class WorkerThread(threading.Thread):
	"""
		A background thread that pulls plugins off the shared worker
		queue and runs their collect() method.
	"""

	# How long (in seconds) to block on the queue before re-checking
	# whether this thread should still be running.
	HEARTBEAT = 2.5

	def __init__(self, collecty, id):
		super().__init__(daemon=True)

		self.log = logging.getLogger("collecty.worker")
		self.log.propagate = 1

		self.collecty = collecty
		self.id = id

		self.log.debug(_("Worker thread %s has been initialised") % self.id)

	@property
	def queue(self):
		"""
			The queue this thread is getting events from
		"""
		return self.collecty._worker_queue

	def run(self):
		self.log.debug(_("Worker thread %s has been started") % self.id)
		self.running = True

		while self.running:
			try:
				task = self.queue.get(block=True, timeout=self.HEARTBEAT)

			# If the queue has been empty we just retry
			except queue.Empty:
				continue

			# Execute the collect operation for this plugin
			task.collect()

		self.log.debug(_("Worker thread %s has been terminated") % self.id)

	def shutdown(self):
		# Ask the loop in run() to terminate...
		self.running = False

		# Wait until all data has been written.
		self.join()
275
276
class WriteQueue(threading.Thread):
	"""
		A thread that buffers collected results in a priority queue and
		commits them to the RRD databases in regular intervals.
	"""

	def __init__(self, collecty, submit_interval):
		threading.Thread.__init__(self)
		self.daemon = True

		self.collecty = collecty

		self.log = logging.getLogger("collecty.queue")
		self.log.propagate = 1

		# Fires every submit_interval seconds to trigger a commit.
		self.timer = plugins.Timer(submit_interval)
		self._queue = queue.PriorityQueue()

		# Initialise the running flag here so that shutdown() does not
		# raise AttributeError when called before the thread started.
		self.running = False

		self.log.debug(_("Initialised write queue"))

	def run(self):
		self.log.debug(_("Write queue process started"))
		self.running = True

		while self.running:
			# Reset the timer.
			self.timer.reset()

			# Wait until the timer has successfully elapsed.
			if self.timer.wait():
				self.commit()

		# Flush anything that is still queued before terminating.
		self.commit()
		self.log.debug(_("Write queue process stopped"))

	def shutdown(self):
		self.running = False
		self.timer.cancel()

		# Wait until all data has been written (joining an unstarted
		# thread would raise RuntimeError).
		if self.is_alive():
			self.join()

	def add(self, object, time, data):
		"""
			Queues one piece of collected data.

			object must provide a "file" attribute naming the RRD
			database, time is the timestamp of the measurement and
			data is the formatted value string.
		"""
		result = QueueObject(object.file, time, data)
		self._queue.put(result)

	def commit(self):
		"""
			Flushes the read data to disk.
		"""
		# There is nothing to do if the queue is empty
		if self._queue.empty():
			self.log.debug(_("No data to commit"))
			return

		time_start = time.time()

		self.log.debug(_("Submitting data to the databases..."))

		# Get all objects from the queue and group them by the RRD file
		# to commit them all at once
		grouped = {}
		while not self._queue.empty():
			result = self._queue.get()
			grouped.setdefault(result.file, []).append(result)

		# Write the collected data to disk
		for filename, results in grouped.items():
			self._commit_file(filename, results)

		duration = time.time() - time_start
		self.log.debug(_("Emptied write queue in %.2fs") % duration)

	def _commit_file(self, filename, results):
		"""
			Writes all queued results for one RRD database at once.
		"""
		self.log.debug(_("Committing %(counter)s entries to %(filename)s") \
			% { "counter" : len(results), "filename" : filename })

		for result in results:
			self.log.debug("  %s: %s" % (result.time, result.data))

		try:
			rrdtool.update(filename, *["%s" % r for r in results])

		# Catch operational errors like unreadable/unwritable RRD databases
		# or those where the format has changed. The collected data will be lost.
		except rrdtool.OperationalError as e:
			self.log.critical(_("Could not update RRD database %s: %s") \
				% (filename, e))
364
365
class QueueObject(object):
	"""
		One collected result waiting in the write queue.

		Instances compare by time so that the priority queue hands out
		the oldest measurements first.
	"""

	def __init__(self, file, time, data):
		self.file = file
		self.time = time
		self.data = data

	def __str__(self):
		# rrdtool update expects "<epoch seconds>:<values>".
		timestamp = self.time.strftime("%s")
		return "{}:{}".format(timestamp, self.data)

	def __lt__(self, other):
		# Chronological ordering for the priority queue.
		return self.time < other.time
377
378
class PluginTimer(object):
	"""
		Tracks when a plugin is next due to collect data.

		Instances compare by deadline so that the timer priority queue
		always yields the most urgent plugin first.
	"""

	def __init__(self, plugin):
		self.plugin = plugin

		# Due immediately after start-up.
		self.deadline = datetime.datetime.utcnow()

	def __repr__(self):
		return "<{} {}>".format(self.__class__.__name__, self.deadline)

	def __lt__(self, other):
		# The earlier deadline is the more urgent one.
		return self.deadline < other.deadline

	def reset_deadline(self):
		# Schedule the next run one plugin interval from now.
		interval = datetime.timedelta(seconds=self.plugin.interval)
		self.deadline = datetime.datetime.utcnow() + interval

	def is_due(self):
		return datetime.datetime.utcnow() >= self.deadline