]>
Commit | Line | Data |
---|---|---|
f37913e8 | 1 | #!/usr/bin/python3 |
73db5226 MT |
2 | ############################################################################### |
3 | # # | |
4 | # collecty - A system statistics collection daemon for IPFire # | |
5 | # Copyright (C) 2012 IPFire development team # | |
6 | # # | |
7 | # This program is free software: you can redistribute it and/or modify # | |
8 | # it under the terms of the GNU General Public License as published by # | |
9 | # the Free Software Foundation, either version 3 of the License, or # | |
10 | # (at your option) any later version. # | |
11 | # # | |
12 | # This program is distributed in the hope that it will be useful, # | |
13 | # but WITHOUT ANY WARRANTY; without even the implied warranty of # | |
14 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # | |
15 | # GNU General Public License for more details. # | |
16 | # # | |
17 | # You should have received a copy of the GNU General Public License # | |
18 | # along with this program. If not, see <http://www.gnu.org/licenses/>. # | |
19 | # # | |
20 | ############################################################################### | |
21 | ||
16b84672 | 22 | import logging |
682b512d | 23 | import os |
72364063 | 24 | import rrdtool |
6e603f14 | 25 | import sched |
73db5226 | 26 | import signal |
6d7f3cac MT |
27 | import tarfile |
28 | import tempfile | |
2c0fa15e | 29 | import threading |
72364063 | 30 | import time |
73db5226 | 31 | |
f37913e8 MT |
32 | from . import bus |
33 | from . import plugins | |
73db5226 | 34 | |
f37913e8 MT |
35 | from .constants import * |
36 | from .i18n import _ | |
73db5226 | 37 | |
73db5226 MT |
# Module-level logger used by the daemon code in this file
log = logging.getLogger("collecty")
39 | ||
class Daemon(object):
    """
    The collecty main daemon.

    It owns the list of plugins, a scheduler that triggers the collection
    of every plugin and the periodic commit, the write queue that buffers
    collected data, and the bus object that is started in run().
    """

    # The default interval, when all data is written to disk.
    COMMIT_INTERVAL = 300

    # Longest uninterrupted sleep (in seconds) of the scheduler. Requests
    # made by signal handlers are noticed after at most this long.
    _MAX_SLEEP = 1

    def __init__(self, debug=False):
        """
        Sets up the scheduler, the write queue and the bus.

        debug: enable debug logging when true.
        """
        self.debug = debug

        # Flags set from signal handlers and acted upon in _delay().
        # The handlers themselves must not touch the scheduler or the
        # write queue because they interrupt the main thread at an
        # arbitrary point (possibly while it holds the queue's lock).
        self._shutdown_requested = False
        self._commit_requested = False

        # Reset timezone to UTC
        # rrdtool is reading that from the environment
        os.environ["TZ"] = "UTC"

        # Enable debug logging when running in debug mode
        if self.debug:
            log.setLevel(logging.DEBUG)

        self.plugins = []

        # Create the scheduler. It sleeps through _delay() so that it can
        # be stopped from a signal handler.
        self.scheduler = sched.scheduler(delayfunc=self._delay)
        self._schedule_commit()

        # The write queue holds all collected pieces of data which
        # will be written to disk later.
        self.write_queue = WriteQueue(self)

        # Create the bus object which is started in run()
        self.bus = bus.Bus(self)

        log.debug(_("Collecty successfully initialized"))

    def add_plugin(self, plugin_class):
        """
        Instantiates plugin_class and schedules its first collection.

        A plugin that fails to initialise is logged and skipped.
        """
        try:
            plugin = plugin_class(self)
        # Only swallow real errors; SystemExit and KeyboardInterrupt
        # must still be able to stop the process.
        except Exception:
            log.critical(_("Plugin %s could not be initialised") % plugin_class, exc_info=True)
            return

        self.plugins.append(plugin)

        # Collect immediately
        self._schedule_plugin(plugin, interval=0)

    @property
    def templates(self):
        """
        Iterates over the templates of all loaded plugins.
        """
        for plugin in self.plugins:
            yield from plugin.templates

    def _delay(self, seconds):
        """
        Delay function of the scheduler.

        Sleeps for the given number of seconds in slices of at most
        _MAX_SLEEP and handles requests made by signal handlers in
        between: a pending commit is executed, and a pending shutdown
        raises KeyboardInterrupt, which run() treats as the signal to stop.
        """
        deadline = time.monotonic() + seconds

        while not self._shutdown_requested:
            if self._commit_requested:
                self._commit_requested = False
                self.write_queue.commit()

            remaining = deadline - time.monotonic()

            # sleep(0) is kept for zero delays so that other threads
            # get a chance to run, like with the default delay function
            time.sleep(max(0, min(remaining, self._MAX_SLEEP)))

            if remaining <= self._MAX_SLEEP:
                break

        if self._shutdown_requested:
            raise KeyboardInterrupt

    def _schedule_plugin(self, plugin, interval=None):
        """
        Schedules a collection event for the given plugin

        interval: delay in seconds; defaults to the plugin's own interval.
        """
        delay = plugin.interval if interval is None else interval

        log.debug("Scheduling plugin %s for executing in %ss" % (plugin, delay))

        self.scheduler.enter(delay, plugin.priority, self._collect, (plugin,))

    def _schedule_commit(self):
        """
        Schedules the next commit of the write queue
        """
        log.debug("Scheduling commit in %ss" % self.COMMIT_INTERVAL)

        self.scheduler.enter(self.COMMIT_INTERVAL, -1, self._commit)

    def _collect(self, plugin, **kwargs):
        """
        Called for each plugin when it is time to collect some data
        """
        log.debug("Collection started for %s" % plugin)

        # Add the next collection event to the scheduler
        self._schedule_plugin(plugin)

        # Run collection
        plugin.collect()

    def _commit(self):
        """
        Called when all data should be committed to disk
        """
        # Schedule the next commit
        self._schedule_commit()

        # Write everything in the queue
        self.write_queue.commit()

    def run(self):
        """
        Runs the daemon until shutdown() has been requested.

        The plugins are cleared, the bus is shut down and the write
        queue is committed on the way out, even if the scheduler fails
        with an error (which is then re-raised).
        """
        # Register signal handlers.
        self.register_signal_handler()

        # Start the bus
        self.bus.start()

        try:
            # Add all plugins
            for plugin_class in plugins.get():
                self.add_plugin(plugin_class)

            # Run the scheduler
            self.scheduler.run()
        except KeyboardInterrupt:
            pass
        finally:
            # Clear all plugins
            self.plugins.clear()

            # Stop the bus thread
            self.bus.shutdown()

            # Write all collected data to disk before ending the main thread
            self.write_queue.commit()

            log.debug(_("Main thread exited"))

    def shutdown(self):
        """
        Requests the daemon to stop.

        Only sets a flag; the scheduler notices it in _delay() and run()
        then performs the cleanup.
        """
        log.info(_("Received shutdown signal"))

        self._shutdown_requested = True

    def register_signal_handler(self):
        """
        Installs signal_handler() for SIGTERM, SIGINT and SIGUSR1
        """
        for s in (signal.SIGTERM, signal.SIGINT, signal.SIGUSR1):
            log.debug(_("Registering signal %d") % s)

            signal.signal(s, self.signal_handler)

    def signal_handler(self, sig, *args, **kwargs):
        """
        Handles the signals registered by register_signal_handler().

        SIGTERM and SIGINT request a shutdown, SIGUSR1 requests a commit
        of all queued data.
        """
        log.info(_("Caught signal %d") % sig)

        if sig in (signal.SIGTERM, signal.SIGINT):
            # Shutdown this application.
            self.shutdown()

        elif sig == signal.SIGUSR1:
            # Commit all data. This is deferred to _delay() because the
            # interrupted main thread might be holding the (non-reentrant)
            # lock of the write queue right now.
            self._commit_requested = True

    def get_plugin_from_template(self, template_name):
        """
        Returns the first plugin that provides a template with the given
        name or None if there is no such plugin.
        """
        for plugin in self.plugins:
            if any(t.name == template_name for t in plugin.templates):
                return plugin

        return None

    def _require_plugin(self, template_name):
        """
        Like get_plugin_from_template(), but raises RuntimeError when no
        plugin provides the template.
        """
        plugin = self.get_plugin_from_template(template_name)
        if not plugin:
            raise RuntimeError("Could not find template %s" % template_name)

        return plugin

    def generate_graph(self, template_name, *args, **kwargs):
        """
        Forwards to generate_graph() of the plugin that owns the template.

        Raises RuntimeError if the template is unknown.
        """
        plugin = self._require_plugin(template_name)

        return plugin.generate_graph(template_name, *args, **kwargs)

    def graph_info(self, template_name, *args, **kwargs):
        """
        Forwards to graph_info() of the plugin that owns the template.

        Raises RuntimeError if the template is unknown.
        """
        plugin = self._require_plugin(template_name)

        return plugin.graph_info(template_name, *args, **kwargs)

    def last_update(self, template_name, *args, **kwargs):
        """
        Forwards to last_update() of the plugin that owns the template.
        The template name itself is not passed on.

        Raises RuntimeError if the template is unknown.
        """
        plugin = self._require_plugin(template_name)

        return plugin.last_update(*args, **kwargs)

    def backup(self, filename):
        """
        Writes a dump of every RRD file below DATABASE_DIR into a
        gzip-compressed tar archive at filename.
        """
        # Write all data to disk first
        self.write_queue.commit()

        # Format after the translation lookup so that the msgid matches
        log.info(_("Backing up to %s...") % filename)

        # Open a compressed tar file which will have all files added to it
        with tarfile.open(filename, mode="w:gz") as archive:
            for path, directories, files in os.walk(DATABASE_DIR):
                for name in files:
                    # Skip any non-RRD files
                    if not name.endswith(".rrd"):
                        continue

                    # Compose the full file path
                    rrd_path = os.path.join(path, name)

                    log.debug(_("Adding %s to backup...") % rrd_path)

                    with tempfile.NamedTemporaryFile() as t:
                        rrdtool.dump(rrd_path, t.name)

                        # Add the dump to the archive under the path of
                        # the database relative to DATABASE_DIR
                        archive.add(
                            t.name, arcname=os.path.relpath(rrd_path, DATABASE_DIR),
                        )

        log.info(_("Backup finished"))
234 | ||
72364063 | 235 | |
6e603f14 MT |
class WriteQueue(object):
    """
    Keeps submitted data points in memory until they are written to
    their RRD files with commit() or commit_file().
    """

    def __init__(self, collecty):
        self.collecty = collecty

        self.log = logging.getLogger("collecty.queue")

        # All pending data points in the order they were submitted
        self._data = []

        # Guards every access to self._data
        self._lock = threading.Lock()

        self.log.debug(_("Initialised write queue"))

    def submit(self, object, data):
        """
        Queues a new data point for object and returns the queued entry
        """
        entry = QueueObject(object.file, data)

        with self._lock:
            self._data.append(entry)

        return entry

    def commit(self):
        """
        Writes everything that is in the queue to disk.
        """
        self.log.debug(_("Committing data to disk..."))

        started = time.time()

        with self._lock:
            # Bail out early if nothing has been queued
            if not self._data:
                self.log.debug(_("No data to commit"))
                return

            # Collect the data points per RRD file so that every file
            # is only updated once
            grouped = {}
            for entry in self._data:
                grouped.setdefault(entry.file, []).append(entry)

            # Everything has been taken out of the queue
            self._data.clear()

        # The actual writing happens without holding the lock
        for filename in sorted(grouped):
            self._commit_file(filename, grouped[filename])

        elapsed = time.time() - started
        self.log.debug(_("Emptied write queue in %.2fs") % elapsed)

    def _commit_file(self, filename, results):
        """
        Writes the given data points to the RRD file filename.

        results is sorted in place. A failing update is logged and the
        data points are dropped.
        """
        summary = {"counter": len(results), "filename": filename}
        self.log.debug(_("Committing %(counter)s entries to %(filename)s") % summary)

        # rrdtool gets the data points in ascending order
        results.sort()

        for entry in results:
            self.log.debug(" %s" % entry)

        try:
            rrdtool.update(filename, *["%s" % entry for entry in results])

        # Operational errors are raised for e.g. unreadable/unwritable RRD
        # databases or those where the format has changed.
        except rrdtool.OperationalError as e:
            self.log.critical(_("Could not update RRD database %s: %s") % (filename, e))

    def commit_file(self, filename):
        """
        Writes only those queued data points to disk that belong to
        the RRD database filename. Everything else stays queued.
        """
        matching, remaining = [], []

        # The queue cannot be searched selectively, so it is split into
        # the entries of this file and all the others.
        with self._lock:
            for entry in self._data:
                bucket = matching if entry.file == filename else remaining
                bucket.append(entry)

            # Whatever did not match forms the new queue
            self._data = remaining

        # Write the matching entries to disk
        if matching:
            self._commit_file(filename, matching)
339 | ||
72364063 MT |
340 | |
class QueueObject(object):
    """
    A single data point waiting in the write queue.

    It remembers the target file, the formatted values and the time
    at which it was created.
    """

    def __init__(self, file, data):
        self.file = file
        self.data = self._format_data(data)

        # Remember when this data point was created
        self.time = time.time()

    def __str__(self):
        # <timestamp>:<value>:<value>:...
        return "%.0f:%s" % (self.time, self.data)

    def __lt__(self, other):
        # Objects are ordered by their timestamp
        if not isinstance(other, self.__class__):
            return NotImplemented

        return self.time < other.time

    @staticmethod
    def _format_data(data):
        """
        Joins all values with colons. None becomes "U" (UNKNOWN).
        """
        return ":".join("U" if e is None else "%s" % e for e in data)