#!/usr/bin/python3
###############################################################################
#                                                                             #
# collecty - A system statistics collection daemon for IPFire                 #
# Copyright (C) 2012 IPFire development team                                  #
#                                                                             #
# This program is free software: you can redistribute it and/or modify       #
# it under the terms of the GNU General Public License as published by       #
# the Free Software Foundation, either version 3 of the License, or          #
# (at your option) any later version.                                        #
#                                                                             #
# This program is distributed in the hope that it will be useful,            #
# but WITHOUT ANY WARRANTY; without even the implied warranty of             #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the               #
# GNU General Public License for more details.                               #
#                                                                             #
# You should have received a copy of the GNU General Public License          #
# along with this program. If not, see <http://www.gnu.org/licenses/>.       #
#                                                                             #
###############################################################################

import logging
import os
import queue
import rrdtool
import sched
import signal
import tarfile
import tempfile
import threading
import time

from . import bus
from . import plugins

from .constants import *
from .i18n import _

log = logging.getLogger("collecty")

class Collecty(object):
	# The default interval (in seconds) at which all collected data
	# is written to disk.
	COMMIT_INTERVAL = 300

	def __init__(self, debug=False):
		self.debug = debug

		# Reset the timezone to UTC
		# rrdtool reads this from the environment
		os.environ["TZ"] = "UTC"

		# Enable debug logging when running in debug mode
		if self.debug:
			log.setLevel(logging.DEBUG)

		self.plugins = []

		# Create the scheduler
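		# Note: since Python 3.3, sched.scheduler() defaults to
		# time.monotonic as its clock and time.sleep as its delay
		# function, which is suitable for a long-running daemon.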
		self.scheduler = sched.scheduler()
		self._schedule_commit()

		# The write queue holds all collected pieces of data which
		# will be written to disk later.
		self.write_queue = WriteQueue(self)

		# Create a thread that connects to dbus and processes requests we
		# get from there.
		self.bus = bus.Bus(self)

		log.debug(_("Collecty successfully initialized"))

	def add_plugin(self, plugin_class):
		# Try initialising a new plugin. If that fails, log the
		# error and carry on.
		try:
			plugin = plugin_class(self)
		except Exception:
			log.critical(_("Plugin %s could not be initialised") % plugin_class, exc_info=True)
			return

		self.plugins.append(plugin)

		# Collect immediately
		self._schedule_plugin(plugin, interval=0)

	@property
	def templates(self):
		for plugin in self.plugins:
			yield from plugin.templates

	def _schedule_plugin(self, plugin, interval=None):
		"""
		Schedules a collection event for the given plugin
		"""
		if interval is None:
			interval = plugin.interval

		log.debug("Scheduling plugin %s for execution in %ss" % (plugin, interval))

		self.scheduler.enter(
			interval, plugin.priority, self._collect, (plugin,),
		)

	def _schedule_commit(self):
		log.debug("Scheduling commit in %ss" % self.COMMIT_INTERVAL)

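		# Events that are due at the same time run in ascending
		# priority order, so priority -1 makes the commit fire before
		# any plugin collection scheduled for the same moment.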
		self.scheduler.enter(
			self.COMMIT_INTERVAL, -1, self._commit,
		)

	def _collect(self, plugin, **kwargs):
		"""
		Called for each plugin when it is time to collect some data
		"""
		log.debug("Collection started for %s" % plugin)

		# Add the next collection event to the scheduler
		self._schedule_plugin(plugin)

		# Run collection
		plugin.collect()

	def _commit(self):
		"""
		Called when all data should be committed to disk
		"""
		# Schedule the next commit
		self._schedule_commit()

		# Write everything in the queue
		self.write_queue.commit()

	def run(self):
		# Register signal handlers
		self.register_signal_handler()

		# Start the bus
		self.bus.start()

		# Add all plugins
		for plugin in plugins.get():
			self.add_plugin(plugin)

		# Run the scheduler
		try:
			self.scheduler.run()
		except KeyboardInterrupt:
			pass

		# Clear all plugins
		self.plugins.clear()

		# Stop the bus thread
		self.bus.shutdown()

		# Write all collected data to disk before ending the main thread
		self.write_queue.commit()

		log.debug(_("Main thread exited"))

	def shutdown(self):
		log.info(_("Received shutdown signal"))

	def register_signal_handler(self):
		for s in (signal.SIGTERM, signal.SIGINT, signal.SIGUSR1):
			log.debug(_("Registering signal %d") % s)

			signal.signal(s, self.signal_handler)

	def signal_handler(self, sig, *args, **kwargs):
		log.info(_("Caught signal %d") % sig)

		if sig in (signal.SIGTERM, signal.SIGINT):
			# Shutdown this application
			self.shutdown()

		elif sig == signal.SIGUSR1:
			# Commit all data
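			# This gives an administrator a way to force an immediate
			# flush without restarting the daemon, e.g. with
			# "kill -USR1 <pid>".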
			self.write_queue.commit()

	def get_plugin_from_template(self, template_name):
		for plugin in self.plugins:
			if template_name not in [t.name for t in plugin.templates]:
				continue

			return plugin

	def generate_graph(self, template_name, *args, **kwargs):
		plugin = self.get_plugin_from_template(template_name)
		if not plugin:
			raise RuntimeError("Could not find template %s" % template_name)

		return plugin.generate_graph(template_name, *args, **kwargs)

	def graph_info(self, template_name, *args, **kwargs):
		plugin = self.get_plugin_from_template(template_name)
		if not plugin:
			raise RuntimeError("Could not find template %s" % template_name)

		return plugin.graph_info(template_name, *args, **kwargs)

	def last_update(self, template_name, *args, **kwargs):
		plugin = self.get_plugin_from_template(template_name)
		if not plugin:
			raise RuntimeError("Could not find template %s" % template_name)

		return plugin.last_update(*args, **kwargs)

	def backup(self, filename):
		# Write all data to disk first
		self.write_queue.commit()

		log.info(_("Backing up to %s...") % filename)

		# Open a compressed tar archive that all dumped files will be added to
		with tarfile.open(filename, mode="w:gz") as archive:
			for path, directories, files in os.walk(DATABASE_DIR):
				for file in files:
					# Skip any non-RRD files
					if not file.endswith(".rrd"):
						continue

					# Compose the full file path
					file = os.path.join(path, file)

					log.debug(_("Adding %s to backup...") % file)

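					# Note: rrdtool.dump() writes a portable XML dump of
					# the database which, unlike the raw .rrd file, can be
					# restored on a machine with a different architecture.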
					with tempfile.NamedTemporaryFile() as t:
						rrdtool.dump(file, t.name)

						# Add the file to the archive
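						# (arcname strips the DATABASE_DIR prefix so that
						# paths inside the archive are relative to the
						# database directory)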
						archive.add(
							t.name, arcname=file[len(DATABASE_DIR):],
						)

		log.info(_("Backup finished"))


class WriteQueue(object):
	def __init__(self, collecty):
		self.collecty = collecty

		self.log = logging.getLogger("collecty.queue")

		# Lock to make this class thread-safe
		self._lock = threading.Lock()

		self._queue = queue.PriorityQueue()
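		# Note: queue.PriorityQueue is already safe for individual
		# put()/get() calls; the lock additionally makes the multi-step
		# drain in commit() and commit_file() atomic. Entries are
		# ordered by QueueObject.__lt__, i.e. by timestamp, which
		# matters because rrdtool rejects updates that are older than
		# the last one written to a database.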

		self.log.debug(_("Initialised write queue"))

	def add(self, object, time, data):
		result = QueueObject(object.file, time, data)

		with self._lock:
			self._queue.put(result)

	def commit(self):
		"""
		Flushes all queued data to disk.
		"""
		# There is nothing to do if the queue is empty
		if self._queue.empty():
			self.log.debug(_("No data to commit"))
			return

		time_start = time.time()

		self.log.debug(_("Submitting data to the databases..."))

		# Get all objects from the queue and group them by the RRD file
		# to commit them all at once
		results = {}

		with self._lock:
			while not self._queue.empty():
				result = self._queue.get()

				try:
					results[result.file].append(result)
				except KeyError:
					results[result.file] = [result]

		# Write the collected data to disk
		for filename, batch in results.items():
			self._commit_file(filename, batch)

		duration = time.time() - time_start
		self.log.debug(_("Emptied write queue in %.2fs") % duration)

	def _commit_file(self, filename, results):
		self.log.debug(_("Committing %(counter)s entries to %(filename)s") \
			% { "counter" : len(results), "filename" : filename })

		for result in results:
			self.log.debug("  %s: %s" % (result.time, result.data))

		try:
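			# Each QueueObject renders as "<epoch>:<data>" (see
			# __str__ below), which is the "timestamp:value[:value...]"
			# format that rrdtool update expects.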
			rrdtool.update(filename, *["%s" % r for r in results])

		# Catch operational errors like unreadable/unwritable RRD databases
		# or those where the format has changed. The collected data will be lost.
		except rrdtool.OperationalError as e:
			self.log.critical(_("Could not update RRD database %s: %s") \
				% (filename, e))

	def commit_file(self, filename):
		"""
		Commits all data that is in the write queue for the given
		RRD database.
		"""
		results, others = [], []

		# We will have to walk through the entire queue since we cannot
		# read any items selectively. Everything that belongs to our
		# transaction is kept. Everything else will be put back into the
		# queue.
		with self._lock:
			while not self._queue.empty():
				result = self._queue.get()

				if result.file == filename:
					results.append(result)
				else:
					others.append(result)

			# Put back all items that did not match
			for result in others:
				self._queue.put(result)

		# Write all matching data to disk
		if results:
			self._commit_file(filename, results)


class QueueObject(object):
	def __init__(self, file, time, data):
		self.file = file
		self.time = time
		self.data = data

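	# Note: strftime("%s") yields seconds since the epoch. It is a
	# glibc extension (not standard C) and honours the local timezone,
	# which is why Collecty forces TZ to UTC at startup.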
	def __str__(self):
		return "%s:%s" % (self.time.strftime("%s"), self.data)

	def __lt__(self, other):
		return self.time < other.time