]> git.ipfire.org Git - collecty.git/blame - src/collecty/daemon.py
psi: Add graph template
[collecty.git] / src / collecty / daemon.py
CommitLineData
f37913e8 1#!/usr/bin/python3
73db5226
MT
2###############################################################################
3# #
4# collecty - A system statistics collection daemon for IPFire #
5# Copyright (C) 2012 IPFire development team #
6# #
7# This program is free software: you can redistribute it and/or modify #
8# it under the terms of the GNU General Public License as published by #
9# the Free Software Foundation, either version 3 of the License, or #
10# (at your option) any later version. #
11# #
12# This program is distributed in the hope that it will be useful, #
13# but WITHOUT ANY WARRANTY; without even the implied warranty of #
14# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
15# GNU General Public License for more details. #
16# #
17# You should have received a copy of the GNU General Public License #
18# along with this program. If not, see <http://www.gnu.org/licenses/>. #
19# #
20###############################################################################
21
16b84672 22import logging
682b512d 23import os
72364063 24import rrdtool
6e603f14 25import sched
73db5226 26import signal
6d7f3cac
MT
27import tarfile
28import tempfile
2c0fa15e 29import threading
72364063 30import time
73db5226 31
f37913e8
MT
32from . import bus
33from . import plugins
73db5226 34
f37913e8
MT
35from .constants import *
36from .i18n import _
73db5226 37
73db5226
MT
# Module-level logger shared by the daemon and its helper classes
log = logging.getLogger("collecty")
39
class Daemon(object):
	# The default interval, when all data is written to disk.
	COMMIT_INTERVAL = 300

	def __init__(self, debug=False):
		"""
		Initialises the daemon: logging, the scheduler, the write
		queue and the D-Bus interface.
		"""
		self.debug = debug

		# Reset timezone to UTC
		# rrdtool is reading that from the environment
		os.environ["TZ"] = "UTC"

		# Enable debug logging when running in debug mode
		if self.debug:
			log.setLevel(logging.DEBUG)

		self.plugins = []

		# Create the scheduler and queue the first commit event
		self.scheduler = sched.scheduler()
		self._schedule_commit()

		# The write queue holds all collected pieces of data which
		# will be written to disk later.
		self.write_queue = WriteQueue(self)

		# Create a thread that connects to dbus and processes requests we
		# get from there.
		self.bus = bus.Bus(self)

		log.debug(_("Collecty successfully initialized"))

	def add_plugin(self, plugin_class):
		"""
		Instantiates plugin_class and registers the plugin with this
		daemon, scheduling an immediate first collection.
		"""
		# Try initialising a new plugin. If that fails, we will log the
		# error and try to go on.
		try:
			plugin = plugin_class(self)
		# Catch Exception (not a bare except) so SystemExit and
		# KeyboardInterrupt still propagate
		except Exception:
			log.critical(_("Plugin %s could not be initialised") % plugin_class, exc_info=True)
			return

		self.plugins.append(plugin)

		# Collect immediately
		self._schedule_plugin(plugin, interval=0)

	@property
	def templates(self):
		"""
		Yields all graph templates of all registered plugins
		"""
		for plugin in self.plugins:
			for template in plugin.templates:
				yield template

	def _schedule_plugin(self, plugin, interval=None):
		"""
		Schedules a collection event for the given plugin.

		interval overrides the plugin's own collection interval
		(e.g. 0 schedules an immediate collection).
		"""
		# Resolve the effective interval first so that the log message
		# matches what is actually scheduled (the original always
		# logged plugin.interval, even when an override was in effect)
		if interval is None:
			interval = plugin.interval

		log.debug("Scheduling plugin %s for executing in %ss" % (plugin, interval))

		self.scheduler.enter(
			interval, plugin.priority, self._collect, (plugin,),
		)

	def _schedule_commit(self):
		"""
		Schedules the next commit of the write queue
		"""
		log.debug("Scheduling commit in %ss" % self.COMMIT_INTERVAL)

		self.scheduler.enter(
			self.COMMIT_INTERVAL, -1, self._commit,
		)

	def _collect(self, plugin, **kwargs):
		"""
		Called for each plugin when it is time to collect some data
		"""
		log.debug("Collection started for %s" % plugin)

		# Add the next collection event to the scheduler
		self._schedule_plugin(plugin)

		# Run collection
		plugin.collect()

	def _commit(self):
		"""
		Called when all data should be committed to disk
		"""
		# Schedule the next commit
		self._schedule_commit()

		# Write everything in the queue
		self.write_queue.commit()

	def run(self):
		"""
		Main loop: starts the bus, loads all plugins and runs the
		scheduler until interrupted, then shuts everything down.
		"""
		# Register signal handlers.
		self.register_signal_handler()

		# Start the bus
		self.bus.start()

		# Add all plugins
		for plugin in plugins.get():
			self.add_plugin(plugin)

		# Run the scheduler
		try:
			self.scheduler.run()
		except KeyboardInterrupt:
			pass

		# Clear all plugins
		self.plugins.clear()

		# Stop the bus thread
		self.bus.shutdown()

		# Write all collected data to disk before ending the main thread
		self.write_queue.commit()

		log.debug(_("Main thread exited"))

	def shutdown(self):
		"""
		Logs that a shutdown signal has been received
		"""
		log.info(_("Received shutdown signal"))

	def register_signal_handler(self):
		"""
		Registers the signal handler for all signals we care about
		"""
		for s in (signal.SIGTERM, signal.SIGINT, signal.SIGUSR1):
			log.debug(_("Registering signal %d") % s)

			signal.signal(s, self.signal_handler)

	def signal_handler(self, sig, *args, **kwargs):
		"""
		Dispatches incoming signals: TERM/INT shut the daemon down,
		USR1 forces an immediate commit of the write queue.
		"""
		log.info(_("Caught signal %d") % sig)

		if sig in (signal.SIGTERM, signal.SIGINT):
			# Shutdown this application.
			self.shutdown()

		elif sig == signal.SIGUSR1:
			# Commit all data.
			self.write_queue.commit()

	def get_plugin_from_template(self, template_name):
		"""
		Returns the plugin that provides the template with the given
		name, or None if no such plugin is registered.
		"""
		for plugin in self.plugins:
			if template_name not in [t.name for t in plugin.templates]:
				continue

			return plugin

	def generate_graph(self, template_name, *args, **kwargs):
		"""
		Renders the graph for the given template
		"""
		plugin = self.get_plugin_from_template(template_name)
		if not plugin:
			raise RuntimeError("Could not find template %s" % template_name)

		return plugin.generate_graph(template_name, *args, **kwargs)

	def graph_info(self, template_name, *args, **kwargs):
		"""
		Returns meta information about the graph of the given template
		"""
		plugin = self.get_plugin_from_template(template_name)
		if not plugin:
			raise RuntimeError("Could not find template %s" % template_name)

		return plugin.graph_info(template_name, *args, **kwargs)

	def last_update(self, template_name, *args, **kwargs):
		"""
		Returns information about the last update of the plugin that
		provides the given template
		"""
		plugin = self.get_plugin_from_template(template_name)
		if not plugin:
			raise RuntimeError("Could not find template %s" % template_name)

		return plugin.last_update(*args, **kwargs)

	def backup(self, filename):
		"""
		Dumps all RRD databases as XML into a gzip-compressed tar
		archive at filename.
		"""
		# Write all data to disk first
		self.write_queue.commit()

		# Translate the template and format afterwards — passing the
		# already-formatted string to _() can never match a catalogue entry
		log.info(_("Backing up to %s...") % filename)

		# Opening a compressed tar file with will have all files added to it
		with tarfile.open(filename, mode="w:gz") as archive:
			for path, directories, files in os.walk(DATABASE_DIR):
				for file in files:
					# Skip any non-RRD files
					if not file.endswith(".rrd"):
						continue

					# Compose the full file path
					file = os.path.join(path, file)

					log.debug(_("Adding %s to backup...") % file)

					# Dump the database to a portable XML representation
					# before archiving it
					with tempfile.NamedTemporaryFile() as t:
						rrdtool.dump(file, t.name)

						# Add the file to the archive
						archive.add(
							t.name, arcname=file[len(DATABASE_DIR):],
						)

		log.info(_("Backup finished"))
234
72364063 235
6e603f14
MT
class WriteQueue(object):
	"""
	Thread-safe buffer for collected datapoints which are flushed
	to their RRD databases in batches.
	"""

	def __init__(self, collecty):
		self.collecty = collecty

		self.log = logging.getLogger("collecty.queue")

		# Store data here
		self._data = []

		# Lock to make this class thread-safe
		self._lock = threading.Lock()

		self.log.debug(_("Initialised write queue"))

	def submit(self, object, data):
		"""
		Submit a new data point for object
		"""
		data = QueueObject(object.file, data)

		with self._lock:
			self._data.append(data)

		return data

	def commit(self):
		"""
		Flushes the read data to disk.
		"""
		self.log.debug(_("Committing data to disk..."))

		time_start = time.time()

		# There is nothing to do if the queue is empty
		with self._lock:
			if not self._data:
				self.log.debug(_("No data to commit"))
				return

			# Get all objects from the queue and group them by the RRD file
			# to commit them all at once.  setdefault() avoids the
			# try/except-per-item grouping loop.
			results = {}

			# Group all datapoints by file
			for data in self._data:
				results.setdefault(data.file, []).append(data)

			# Clear the queue
			self._data.clear()

		# Write the collected data to disk (outside of the lock, so
		# collectors are not blocked while rrdtool is running)
		for filename in sorted(results):
			self._commit_file(filename, results[filename])

		duration = time.time() - time_start
		self.log.debug(_("Emptied write queue in %.2fs") % duration)

	def _commit_file(self, filename, results):
		"""
		Writes all datapoints in results to the RRD database at
		filename with a single rrdtool call.
		"""
		self.log.debug(_("Committing %(counter)s entries to %(filename)s") \
			% { "counter" : len(results), "filename" : filename })

		# Sort data before submitting it to rrdtool
		results.sort()

		for data in results:
			self.log.debug("  %s" % data)

		try:
			rrdtool.update(filename, *["%s" % r for r in results])

		# Catch operational errors like unreadable/unwritable RRD databases
		# or those where the format has changed. The collected data will be lost.
		except rrdtool.OperationalError as e:
			self.log.critical(_("Could not update RRD database %s: %s") \
				% (filename, e))

	def commit_file(self, filename):
		"""
		Commits all data that is in the write queue for the given
		RRD database.
		"""
		results, others = [], []

		# We will have to walk through the entire queue since we cannot
		# ready any items selectively. Everything that belongs to our
		# transaction is kept. Everything else will be put back into the
		# queue.
		with self._lock:
			for data in self._data:
				if data.file == filename:
					results.append(data)
				else:
					others.append(data)

			# Put back all items that did not match
			self._data = others

		# Write everything else to disk
		if results:
			self._commit_file(filename, results)
339
72364063
MT
340
class QueueObject(object):
	"""
	One pending datapoint: the target RRD file and the data
	serialised in the colon-separated format rrdtool expects.
	"""

	def __init__(self, file, data):
		self.file = file
		self.data = self._format_data(data)

		# Remember when this datapoint was queued
		self.time = time.time()

	def __str__(self):
		# rrdtool update format: "<timestamp>:<v1>:<v2>:..."
		return "%.0f:%s" % (self.time, self.data)

	def __lt__(self, other):
		# Queue objects are ordered chronologically
		if not isinstance(other, self.__class__):
			return NotImplemented

		return self.time < other.time

	@staticmethod
	def _format_data(data):
		# rrdtool expects "U" (unknown) in place of missing values;
		# everything else is stringified and joined with colons
		return ":".join(
			"U" if value is None else "%s" % value for value in data
		)