]> git.ipfire.org Git - collecty.git/blob - src/collecty/daemon.py
daemon: Rename class and do not load with main module
[collecty.git] / src / collecty / daemon.py
1 #!/usr/bin/python3
2 ###############################################################################
3 # #
4 # collecty - A system statistics collection daemon for IPFire #
5 # Copyright (C) 2012 IPFire development team #
6 # #
7 # This program is free software: you can redistribute it and/or modify #
8 # it under the terms of the GNU General Public License as published by #
9 # the Free Software Foundation, either version 3 of the License, or #
10 # (at your option) any later version. #
11 # #
12 # This program is distributed in the hope that it will be useful, #
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of #
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
15 # GNU General Public License for more details. #
16 # #
17 # You should have received a copy of the GNU General Public License #
18 # along with this program. If not, see <http://www.gnu.org/licenses/>. #
19 # #
20 ###############################################################################
21
22 import logging
23 import os
24 import rrdtool
25 import sched
26 import signal
27 import tarfile
28 import tempfile
29 import threading
30 import time
31
32 from . import bus
33 from . import plugins
34
35 from .constants import *
36 from .i18n import _
37
38 log = logging.getLogger("collecty")
39
class Daemon(object):
	# The default interval (in seconds) in which all collected data
	# is flushed from the write queue to disk.
	COMMIT_INTERVAL = 300

	def __init__(self, debug=False):
		"""
		Initialises the daemon: scheduler, write queue and D-Bus service.

		debug: when True, raises the log level to DEBUG.
		"""
		self.debug = debug

		# Reset timezone to UTC
		# rrdtool is reading that from the environment
		os.environ["TZ"] = "UTC"

		# Enable debug logging when running in debug mode
		if self.debug:
			log.setLevel(logging.DEBUG)

		self.plugins = []

		# Create the scheduler and queue the first periodic commit
		self.scheduler = sched.scheduler()
		self._schedule_commit()

		# The write queue holds all collected pieces of data which
		# will be written to disk later.
		self.write_queue = WriteQueue(self)

		# Create a thread that connects to dbus and processes requests we
		# get from there.
		self.bus = bus.Bus(self)

		log.debug(_("Collecty successfully initialized"))

	def add_plugin(self, plugin_class):
		"""
		Instantiates plugin_class and registers the plugin.

		If initialisation fails, the error is logged and the plugin is
		skipped, so one broken plugin cannot stop the daemon.
		"""
		try:
			plugin = plugin_class(self)
		# Catch Exception instead of a bare except so that
		# KeyboardInterrupt/SystemExit are not swallowed here
		except Exception:
			log.critical(_("Plugin %s could not be initialised") % plugin_class, exc_info=True)
			return

		self.plugins.append(plugin)

		# Collect immediately
		self._schedule_plugin(plugin, interval=0)

	@property
	def templates(self):
		"""
		Yields all graph templates of all registered plugins
		"""
		for plugin in self.plugins:
			yield from plugin.templates

	def _schedule_plugin(self, plugin, interval=None):
		"""
		Schedules a collection event for the given plugin

		interval: delay in seconds; defaults to the plugin's own interval.
		"""
		# Fall back to the plugin's own collection interval
		delay = plugin.interval if interval is None else interval

		# Log the delay that is actually scheduled (previously this always
		# logged plugin.interval, even when an override was given)
		log.debug("Scheduling plugin %s for executing in %ss" % (plugin, delay))

		self.scheduler.enter(
			delay, plugin.priority, self._collect, (plugin,),
		)

	def _schedule_commit(self):
		"""
		Schedules the next periodic commit of the write queue
		"""
		log.debug("Scheduling commit in %ss" % self.COMMIT_INTERVAL)

		self.scheduler.enter(
			# Priority -1 so a due commit runs before any plugin collection
			self.COMMIT_INTERVAL, -1, self._commit,
		)

	def _collect(self, plugin, **kwargs):
		"""
		Called for each plugin when it is time to collect some data
		"""
		log.debug("Collection started for %s" % plugin)

		# Add the next collection event to the scheduler
		self._schedule_plugin(plugin)

		# Run collection
		plugin.collect()

	def _commit(self):
		"""
		Called when all data should be committed to disk
		"""
		# Schedule the next commit
		self._schedule_commit()

		# Write everything in the queue
		self.write_queue.commit()

	def run(self):
		"""
		Main loop: starts the bus, loads all plugins and runs the
		scheduler until it is interrupted.
		"""
		# Register signal handlers.
		self.register_signal_handler()

		# Start the bus
		self.bus.start()

		# Add all plugins
		for plugin in plugins.get():
			self.add_plugin(plugin)

		# Run the scheduler
		try:
			self.scheduler.run()
		except KeyboardInterrupt:
			pass

		# Clear all plugins
		self.plugins.clear()

		# Stop the bus thread
		self.bus.shutdown()

		# Write all collected data to disk before ending the main thread
		self.write_queue.commit()

		log.debug(_("Main thread exited"))

	def shutdown(self):
		# NOTE(review): this only logs; nothing here stops the scheduler
		# loop in run() — confirm that draining naturally is intended.
		log.info(_("Received shutdown signal"))

	def register_signal_handler(self):
		"""
		Installs signal_handler for SIGTERM, SIGINT and SIGUSR1
		"""
		for s in (signal.SIGTERM, signal.SIGINT, signal.SIGUSR1):
			log.debug(_("Registering signal %d") % s)

			signal.signal(s, self.signal_handler)

	def signal_handler(self, sig, *args, **kwargs):
		"""
		Dispatches received signals: TERM/INT shut down, USR1 commits
		"""
		log.info(_("Caught signal %d") % sig)

		if sig in (signal.SIGTERM, signal.SIGINT):
			# Shutdown this application.
			self.shutdown()

		elif sig == signal.SIGUSR1:
			# Commit all data.
			self.write_queue.commit()

	def get_plugin_from_template(self, template_name):
		"""
		Returns the first plugin that provides a template with the
		given name, or None if no plugin matches.
		"""
		for plugin in self.plugins:
			if template_name in (t.name for t in plugin.templates):
				return plugin

	def generate_graph(self, template_name, *args, **kwargs):
		"""
		Renders a graph for the given template via its plugin
		"""
		plugin = self.get_plugin_from_template(template_name)
		if not plugin:
			raise RuntimeError("Could not find template %s" % template_name)

		return plugin.generate_graph(template_name, *args, **kwargs)

	def graph_info(self, template_name, *args, **kwargs):
		"""
		Returns metadata about a graph for the given template
		"""
		plugin = self.get_plugin_from_template(template_name)
		if not plugin:
			raise RuntimeError("Could not find template %s" % template_name)

		return plugin.graph_info(template_name, *args, **kwargs)

	def last_update(self, template_name, *args, **kwargs):
		"""
		Returns the time of the last update for the given template
		"""
		plugin = self.get_plugin_from_template(template_name)
		if not plugin:
			raise RuntimeError("Could not find template %s" % template_name)

		return plugin.last_update(*args, **kwargs)

	def backup(self, filename):
		"""
		Dumps all RRD databases below DATABASE_DIR into a gzip'ed
		tarball at filename (as portable rrdtool XML dumps).
		"""
		# Write all data to disk first
		self.write_queue.commit()

		# Format AFTER translating — the previous code interpolated the
		# filename into the msgid, which breaks gettext lookup
		log.info(_("Backing up to %s...") % filename)

		# Open a compressed tar archive to collect all dumped files
		with tarfile.open(filename, mode="w:gz") as archive:
			for path, directories, files in os.walk(DATABASE_DIR):
				for file in files:
					# Skip any non-RRD files
					if not file.endswith(".rrd"):
						continue

					# Compose the full file path
					file = os.path.join(path, file)

					log.debug(_("Adding %s to backup...") % file)

					# Dump to a temporary file, since rrdtool writes XML to a path
					with tempfile.NamedTemporaryFile() as t:
						rrdtool.dump(file, t.name)

						# Add the file to the archive
						archive.add(
							t.name, arcname=file[len(DATABASE_DIR):],
						)

		log.info(_("Backup finished"))
234
235
class WriteQueue(object):
	"""
	A thread-safe buffer that collects data points until they are
	committed to their RRD databases in batches.
	"""

	def __init__(self, collecty):
		self.collecty = collecty

		self.log = logging.getLogger("collecty.queue")

		# Buffered data points
		self._data = []

		# Lock to make this class thread-safe
		self._lock = threading.Lock()

		self.log.debug(_("Initialised write queue"))

	def submit(self, object, data):
		"""
		Submit a new data point for object
		"""
		entry = QueueObject(object.file, data)

		with self._lock:
			self._data.append(entry)

		return entry

	def commit(self):
		"""
		Flushes the read data to disk.
		"""
		self.log.debug(_("Committing data to disk..."))

		started_at = time.time()

		with self._lock:
			# An empty queue means there is nothing to do
			if not self._data:
				self.log.debug(_("No data to commit"))
				return

			# Group all queued data points by their target RRD file so
			# that each file is updated with a single rrdtool call
			grouped = {}
			for entry in self._data:
				grouped.setdefault(entry.file, []).append(entry)

			# Drop everything from the queue
			self._data.clear()

			# Flush each file, in a stable order
			for filename in sorted(grouped):
				self._commit_file(filename, grouped[filename])

		self.log.debug(_("Emptied write queue in %.2fs") % (time.time() - started_at))

	def _commit_file(self, filename, results):
		"""
		Writes the given queue entries into a single RRD database.
		"""
		self.log.debug(_("Committing %(counter)s entries to %(filename)s")
			% { "counter" : len(results), "filename" : filename })

		# rrdtool expects the updates in chronological order
		results.sort()

		for entry in results:
			self.log.debug(" %s" % entry)

		try:
			rrdtool.update(filename, *[str(entry) for entry in results])

		# Catch operational errors like unreadable/unwritable RRD databases
		# or those where the format has changed. The collected data will be lost.
		except rrdtool.OperationalError as e:
			self.log.critical(_("Could not update RRD database %s: %s")
				% (filename, e))

	def commit_file(self, filename):
		"""
		Commits all data that is in the write queue for the given
		RRD database.
		"""
		matching, remaining = [], []

		# We cannot remove items from the queue selectively, so walk it
		# once, keeping everything that belongs to this file and putting
		# everything else back.
		with self._lock:
			for entry in self._data:
				(matching if entry.file == filename else remaining).append(entry)

			# Put back all items that did not match
			self._data = remaining

		# Write everything that matched to disk
		if matching:
			self._commit_file(filename, matching)
339
340
class QueueObject(object):
	"""
	One data point in the write queue: an RRD file, the formatted data
	string and the time of collection.
	"""

	def __init__(self, file, data):
		self.file = file
		self.data = self._format_data(data)

		# Remember when this data point was collected
		self.time = time.time()

	def __str__(self):
		# rrdtool update format: "<timestamp>:<value>[:<value>...]"
		return "%.0f:%s" % (self.time, self.data)

	def __lt__(self, other):
		# Only comparable with other queue objects; order chronologically
		if not isinstance(other, self.__class__):
			return NotImplemented

		return self.time < other.time

	@staticmethod
	def _format_data(data):
		"""
		Joins a tuple/list of values into the colon-separated string
		rrdtool expects, mapping None to "U" (unknown). Anything that
		is not a tuple or list is passed through unchanged.
		"""
		if not isinstance(data, (tuple, list)):
			return data

		# Replace all Nones by UNKNOWN
		return ":".join("U" if e is None else "%s" % e for e in data)