]> git.ipfire.org Git - collecty.git/blame - src/collecty/daemon.py
Tidy up collection and remove double-handling of unexpected exceptions
[collecty.git] / src / collecty / daemon.py
CommitLineData
f37913e8 1#!/usr/bin/python3
73db5226
MT
2###############################################################################
3# #
4# collecty - A system statistics collection daemon for IPFire #
5# Copyright (C) 2012 IPFire development team #
6# #
7# This program is free software: you can redistribute it and/or modify #
8# it under the terms of the GNU General Public License as published by #
9# the Free Software Foundation, either version 3 of the License, or #
10# (at your option) any later version. #
11# #
12# This program is distributed in the hope that it will be useful, #
13# but WITHOUT ANY WARRANTY; without even the implied warranty of #
14# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
15# GNU General Public License for more details. #
16# #
17# You should have received a copy of the GNU General Public License #
18# along with this program. If not, see <http://www.gnu.org/licenses/>. #
19# #
20###############################################################################
21
16b84672 22import logging
682b512d 23import os
f37913e8 24import queue
72364063 25import rrdtool
6e603f14 26import sched
73db5226 27import signal
6d7f3cac
MT
28import tarfile
29import tempfile
72364063 30import time
73db5226 31
f37913e8
MT
32from . import bus
33from . import plugins
73db5226 34
f37913e8
MT
35from .constants import *
36from .i18n import _
73db5226 37
73db5226
MT
38log = logging.getLogger("collecty")
39
class Collecty(object):
	"""
	The main daemon object.

	Initialises all plugins, schedules their collection runs and
	periodically commits the collected data to disk via the write queue.
	"""

	# The default interval, when all data is written to disk.
	COMMIT_INTERVAL = 300

	def __init__(self, debug=False):
		self.debug = debug

		# Reset timezone to UTC
		# rrdtool is reading that from the environment
		os.environ["TZ"] = "UTC"

		# Enable debug logging when running in debug mode
		if self.debug:
			log.setLevel(logging.DEBUG)

		self.plugins = []

		# Create the scheduler
		self.scheduler = sched.scheduler()
		self._schedule_commit()

		# The write queue holds all collected pieces of data which
		# will be written to disk later.
		self.write_queue = WriteQueue(self)

		# Create a thread that connects to dbus and processes requests we
		# get from there.
		self.bus = bus.Bus(self)

		log.debug(_("Collecty successfully initialized"))

	def add_plugin(self, plugin_class):
		"""
		Instantiates and registers a new plugin.

		A failing plugin must not bring down the whole daemon, so any
		error during initialisation is logged and the plugin is skipped.
		"""
		# Try initialising a new plugin. If that fails, we will log the
		# error and try to go on.
		try:
			plugin = plugin_class(self)
		# Catch Exception instead of a bare except so that SystemExit
		# and KeyboardInterrupt are not swallowed here.
		except Exception:
			log.critical(_("Plugin %s could not be initialised") % plugin_class, exc_info=True)
			return

		self.plugins.append(plugin)

		# Collect immediately
		self._schedule_plugin(plugin, interval=0)

	@property
	def templates(self):
		"""
		Yields all graph templates of all registered plugins
		"""
		for plugin in self.plugins:
			yield from plugin.templates

	def _schedule_plugin(self, plugin, interval=None):
		"""
		Schedules a collection event for the given plugin.

		interval overrides the plugin's own collection interval and is
		used to trigger an immediate first collection (interval=0).
		"""
		# Resolve the effective interval first so the log message
		# reports what is actually being scheduled.
		if interval is None:
			interval = plugin.interval

		log.debug("Scheduling plugin %s for executing in %ss" % (plugin, interval))

		self.scheduler.enter(
			interval, plugin.priority, self._collect, (plugin,),
		)

	def _schedule_commit(self):
		"""
		Schedules the next periodic commit of the write queue
		"""
		log.debug("Scheduling commit in %ss" % self.COMMIT_INTERVAL)

		self.scheduler.enter(
			self.COMMIT_INTERVAL, -1, self._commit,
		)

	def _collect(self, plugin, **kwargs):
		"""
		Called for each plugin when it is time to collect some data
		"""
		log.debug("Collection started for %s" % plugin)

		# Add the next collection event to the scheduler
		self._schedule_plugin(plugin)

		# Run collection
		plugin.collect()

	def _commit(self):
		"""
		Called when all data should be committed to disk
		"""
		# Schedule the next commit
		self._schedule_commit()

		# Write everything in the queue
		self.write_queue.commit()

	def run(self):
		"""
		The daemon main loop: starts the bus, loads all plugins and
		runs the scheduler until interrupted.
		"""
		# Register signal handlers.
		self.register_signal_handler()

		# Start the bus
		self.bus.start()

		# Add all plugins
		for plugin in plugins.get():
			self.add_plugin(plugin)

		# Run the scheduler
		try:
			self.scheduler.run()
		except KeyboardInterrupt:
			pass

		# Clear all plugins
		self.plugins.clear()

		# Stop the bus thread
		self.bus.shutdown()

		# Write all collected data to disk before ending the main thread
		self.write_queue.commit()

		log.debug(_("Main thread exited"))

	def shutdown(self):
		log.info(_("Received shutdown signal"))

	def register_signal_handler(self):
		"""
		Installs the signal handler for TERM, INT and USR1
		"""
		for s in (signal.SIGTERM, signal.SIGINT, signal.SIGUSR1):
			log.debug(_("Registering signal %d") % s)

			signal.signal(s, self.signal_handler)

	def signal_handler(self, sig, *args, **kwargs):
		"""
		TERM/INT shut the daemon down; USR1 forces an immediate commit
		"""
		log.info(_("Caught signal %d") % sig)

		if sig in (signal.SIGTERM, signal.SIGINT):
			# Shutdown this application.
			self.shutdown()

		elif sig == signal.SIGUSR1:
			# Commit all data.
			self.write_queue.commit()

	def get_plugin_from_template(self, template_name):
		"""
		Returns the plugin that provides the template with the given
		name, or None if no plugin has such a template.
		"""
		for plugin in self.plugins:
			if template_name not in [t.name for t in plugin.templates]:
				continue

			return plugin

	def generate_graph(self, template_name, *args, **kwargs):
		plugin = self.get_plugin_from_template(template_name)
		if not plugin:
			raise RuntimeError("Could not find template %s" % template_name)

		return plugin.generate_graph(template_name, *args, **kwargs)

	def graph_info(self, template_name, *args, **kwargs):
		plugin = self.get_plugin_from_template(template_name)
		if not plugin:
			raise RuntimeError("Could not find template %s" % template_name)

		return plugin.graph_info(template_name, *args, **kwargs)

	def last_update(self, template_name, *args, **kwargs):
		plugin = self.get_plugin_from_template(template_name)
		if not plugin:
			raise RuntimeError("Could not find template %s" % template_name)

		return plugin.last_update(*args, **kwargs)

	def backup(self, filename):
		"""
		Dumps all RRD databases as portable XML dumps into a gzip
		compressed tarball at filename.
		"""
		# Write all data to disk first
		self.write_queue.commit()

		# Format outside of _() so the message catalogue is keyed on the
		# untranslated template (was: _("Backing up to %s..." % filename))
		log.info(_("Backing up to %s...") % filename)

		# Opening a compressed tar file with will have all files added to it
		with tarfile.open(filename, mode="w:gz") as archive:
			for path, directories, files in os.walk(DATABASE_DIR):
				for file in files:
					# Skip any non-RRD files
					if not file.endswith(".rrd"):
						continue

					# Compose the full file path
					file = os.path.join(path, file)

					log.debug(_("Adding %s to backup...") % file)

					with tempfile.NamedTemporaryFile() as t:
						rrdtool.dump(file, t.name)

						# Add the file to the archive
						archive.add(
							t.name, arcname=file[len(DATABASE_DIR):],
						)

		log.info(_("Backup finished"))
72364063 235
6e603f14
MT
class WriteQueue(object):
	"""
	Buffers collected results in memory and writes them to their RRD
	databases in batches, grouped per file.
	"""

	def __init__(self, collecty):
		self.collecty = collecty

		self.log = logging.getLogger("collecty.queue")

		# A priority queue ordered by timestamp (QueueObject.__lt__) so
		# that updates are submitted to rrdtool in chronological order.
		self._queue = queue.PriorityQueue()

		self.log.debug(_("Initialised write queue"))

	def add(self, object, time, data):
		"""
		Queues one collected data point for the RRD file of object
		"""
		result = QueueObject(object.file, time, data)
		self._queue.put(result)

	def commit(self):
		"""
		Flushes the read data to disk.
		"""
		# There is nothing to do if the queue is empty
		if self._queue.empty():
			self.log.debug(_("No data to commit"))
			return

		time_start = time.time()

		self.log.debug(_("Submitting data to the databases..."))

		# Get all objects from the queue and group them by the RRD file
		# to commit them all at once. Use a separate name for the dict
		# so the loop variable below does not shadow it.
		batches = {}
		while not self._queue.empty():
			result = self._queue.get()
			batches.setdefault(result.file, []).append(result)

		# Write the collected data to disk
		for filename, results in batches.items():
			self._commit_file(filename, results)

		duration = time.time() - time_start
		self.log.debug(_("Emptied write queue in %.2fs") % duration)

	def _commit_file(self, filename, results):
		"""
		Submits all given results to one RRD database in a single
		rrdtool.update() call.
		"""
		self.log.debug(_("Committing %(counter)s entries to %(filename)s") \
			% { "counter" : len(results), "filename" : filename })

		for result in results:
			self.log.debug(" %s: %s" % (result.time, result.data))

		try:
			rrdtool.update(filename, *["%s" % r for r in results])

		# Catch operational errors like unreadable/unwritable RRD databases
		# or those where the format has changed. The collected data will be lost.
		except rrdtool.OperationalError as e:
			self.log.critical(_("Could not update RRD database %s: %s") \
				% (filename, e))

	def commit_file(self, filename):
		"""
		Commits all data that is in the write queue for the given
		RRD database.
		"""
		results, others = [], []

		# We will have to walk through the entire queue since we cannot
		# ready any items selectively. Everything that belongs to our
		# transaction is kept. Everything else will be put back into the
		# queue.
		while not self._queue.empty():
			result = self._queue.get()

			if result.file == filename:
				results.append(result)
			else:
				others.append(result)

		# Put back all items that did not match
		for result in others:
			self._queue.put(result)

		# Write everything else to disk
		if results:
			self._commit_file(filename, results)
72364063
MT
324
class QueueObject(object):
	"""
	One queued measurement: the target RRD file, a timestamp and the
	payload. Instances order by time, so a PriorityQueue hands out the
	oldest entry first.
	"""

	def __init__(self, file, time, data):
		self.data = data
		self.file = file
		self.time = time

	def __str__(self):
		# rrdtool expects updates formatted as "<epoch seconds>:<values>"
		timestamp = self.time.strftime("%s")
		return "%s:%s" % (timestamp, self.data)

	def __lt__(self, other):
		# Older entries sort first
		return other.time > self.time