]>
git.ipfire.org Git - collecty.git/blob - src/collecty/daemon.py
2 ###############################################################################
4 # collecty - A system statistics collection daemon for IPFire #
5 # Copyright (C) 2012 IPFire development team #
7 # This program is free software: you can redistribute it and/or modify #
8 # it under the terms of the GNU General Public License as published by #
9 # the Free Software Foundation, either version 3 of the License, or #
10 # (at your option) any later version. #
12 # This program is distributed in the hope that it will be useful, #
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of #
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
15 # GNU General Public License for more details. #
17 # You should have received a copy of the GNU General Public License #
18 # along with this program. If not, see <http://www.gnu.org/licenses/>. #
20 ###############################################################################
35 from .constants
import *
38 log
= logging
.getLogger("collecty")
41 # The default interval, when all data is written to disk.
def __init__(self, debug=False):
    """
    Initialise the collecty daemon.

    debug: when True, raise the log level to DEBUG.
    """
    # Reset timezone to UTC
    # rrdtool is reading that from the environment
    os.environ["TZ"] = "UTC"

    # Enable debug logging when running in debug mode
    if debug:
        log.setLevel(logging.DEBUG)

    # Registered plugins; filled by add_plugin()
    # NOTE(review): reconstructed — the original initialiser line is not
    # visible in this extract, but add_plugin() appends to self.plugins.
    self.plugins = []

    # Create the scheduler
    self.scheduler = sched.scheduler()

    # Queue the first periodic commit event
    self._schedule_commit()

    # The write queue holds all collected pieces of data which
    # will be written to disk later.
    self.write_queue = WriteQueue(self)

    # Create a thread that connects to dbus and processes requests we
    # receive from there.
    self.bus = bus.Bus(self)

    log.debug(_("Collecty successfully initialized"))
def add_plugin(self, plugin_class):
    """
    Instantiate plugin_class, register it and schedule its first
    collection run immediately.
    """
    # Try initialising a new plugin. If that fails, we will log the
    # error and try to go on.
    # NOTE(review): the try/except skeleton is reconstructed — only the
    # constructor call and the log.critical() line are visible in this
    # extract; verify against the upstream file.
    try:
        plugin = plugin_class(self)
    except Exception:
        log.critical(_("Plugin %s could not be initialised") % plugin_class, exc_info=True)
        return

    self.plugins.append(plugin)

    # Collect immediately (interval=0) instead of waiting one full period
    self._schedule_plugin(plugin, interval=0)
87 for plugin
in self
.plugins
:
88 for template
in plugin
.templates
:
def _schedule_plugin(self, plugin, interval=None):
    """
        Schedules a collection event for the given plugin

        interval: delay in seconds before the collection runs; defaults
        to the plugin's own interval when None.
    """
    log.debug("Scheduling plugin %s for executing in %ss" % (plugin, plugin.interval))

    # NOTE(review): the enter() call line itself is missing from this
    # extract; the arguments below are the visible fragments.
    self.scheduler.enter(
        plugin.interval if interval is None else interval,
        plugin.priority, self._collect, (plugin,),
    )
def _schedule_commit(self):
    """Queue the next periodic write-queue commit on the scheduler."""
    log.debug("Scheduling commit in %ss" % self.COMMIT_INTERVAL)

    # Priority -1 runs the commit ahead of regular collection events
    # that fire at the same time.
    self.scheduler.enter(
        self.COMMIT_INTERVAL, -1, self._commit,
    )
108 def _collect(self
, plugin
, **kwargs
):
110 Called for each plugin when it is time to collect some data
112 log
.debug("Collection started for %s" % plugin
)
114 # Add the next collection event to the scheduler
115 self
._schedule
_plugin
(plugin
)
def _commit(self):
    """
        Called when all data should be committed to disk
    """
    # Schedule the next commit first, so the cycle continues even if
    # the commit below raises.
    self._schedule_commit()

    # Write everything in the queue
    self.write_queue.commit()
131 # Register signal handlers.
132 self
.register_signal_handler()
138 for plugin
in plugins
.get():
139 self
.add_plugin(plugin
)
144 except KeyboardInterrupt:
150 # Stop the bus thread
153 # Write all collected data to disk before ending the main thread
154 self
.write_queue
.commit()
156 log
.debug(_("Main thread exited"))
159 log
.info(_("Received shutdown signal"))
def register_signal_handler(self):
    """Install self.signal_handler for every signal this daemon handles."""
    for signum in (signal.SIGTERM, signal.SIGINT, signal.SIGUSR1):
        log.debug(_("Registering signal %d") % signum)

        signal.signal(signum, self.signal_handler)
def signal_handler(self, sig, *args, **kwargs):
    """
    React to incoming signals: SIGTERM/SIGINT shut the daemon down,
    SIGUSR1 flushes the write queue to disk.
    """
    log.info(_("Caught signal %d") % sig)

    if sig in (signal.SIGTERM, signal.SIGINT):
        # Shutdown this application.
        # NOTE(review): reconstructed — the actual call is missing from
        # this extract; a shutdown() method is implied by the
        # "Received shutdown signal" log elsewhere in the file. Verify.
        self.shutdown()

    elif sig == signal.SIGUSR1:
        # Flush everything collected so far without shutting down
        self.write_queue.commit()
def get_plugin_from_template(self, template_name):
    """
    Return the first registered plugin that provides a graph template
    named template_name, or None when no plugin matches.
    """
    for plugin in self.plugins:
        # any() with a generator stops at the first matching template
        # instead of materialising the full list of names as the
        # original `[t.name for t in plugin.templates]` did.
        if any(t.name == template_name for t in plugin.templates):
            return plugin

    # Explicit None so callers' `if not plugin:` guard reads clearly
    return None
def generate_graph(self, template_name, *args, **kwargs):
    """
    Render the graph for template_name by delegating to the plugin
    that owns the template.

    Raises RuntimeError when no plugin provides the template.
    """
    plugin = self.get_plugin_from_template(template_name)
    if not plugin:
        raise RuntimeError("Could not find template %s" % template_name)

    return plugin.generate_graph(template_name, *args, **kwargs)
def graph_info(self, template_name, *args, **kwargs):
    """
    Return metadata about the graph template_name, delegating to the
    plugin that owns the template.

    Raises RuntimeError when no plugin provides the template.
    """
    plugin = self.get_plugin_from_template(template_name)
    if not plugin:
        raise RuntimeError("Could not find template %s" % template_name)

    return plugin.graph_info(template_name, *args, **kwargs)
def last_update(self, template_name, *args, **kwargs):
    """
    Return the time of the last update for the plugin that owns
    template_name.

    Raises RuntimeError when no plugin provides the template.
    """
    plugin = self.get_plugin_from_template(template_name)
    if not plugin:
        raise RuntimeError("Could not find template %s" % template_name)

    # Note: unlike generate_graph/graph_info, template_name itself is
    # not forwarded here — only the extra arguments are.
    return plugin.last_update(*args, **kwargs)
def backup(self, filename):
    """
    Dump every RRD database below DATABASE_DIR as portable XML and
    store the dumps in a gzip-compressed tar archive at filename.
    """
    # Write all data to disk first
    self.write_queue.commit()

    # Fix: format OUTSIDE the _() call. The original
    # _("Backing up to %s..." % filename) looked up the already-formatted
    # string in the message catalog, which can never match a msgid, so
    # the message was never translated.
    log.info(_("Backing up to %s...") % filename)

    # Opening a compressed tar file with will have all files added to it
    with tarfile.open(filename, mode="w:gz") as archive:
        for path, directories, files in os.walk(DATABASE_DIR):
            for file in files:
                # Skip any non-RRD files
                if not file.endswith(".rrd"):
                    continue

                # Compose the full file path
                file = os.path.join(path, file)

                log.debug(_("Adding %s to backup...") % file)

                # rrdtool.dump writes an architecture-independent XML
                # copy of the database into the temporary file.
                with tempfile.NamedTemporaryFile() as t:
                    rrdtool.dump(file, t.name)

                    # Add the file to the archive, stored under its
                    # path relative to DATABASE_DIR
                    archive.add(
                        t.name, arcname=file[len(DATABASE_DIR):],
                    )

    log.info(_("Backup finished"))
class WriteQueue(object):
    """
    Thread-safe buffer for collected data points.

    Plugins submit() QueueObjects; commit() periodically groups them by
    RRD file and writes each group to disk in a single rrdtool.update()
    call.

    NOTE(review): several structural lines of this class are missing
    from this extract (lock scoping, queue clearing, the try/else
    skeletons); the reconstruction below follows the visible fragments
    but should be verified against the upstream file.
    """

    def __init__(self, collecty):
        self.collecty = collecty

        self.log = logging.getLogger("collecty.queue")

        # Pending data points, drained by commit()
        self._data = []

        # Lock to make this class thread-safe
        self._lock = threading.Lock()

        self.log.debug(_("Initialised write queue"))

    def submit(self, object, data):
        """
            Submit a new data point for object
        """
        data = QueueObject(object.file, data)

        with self._lock:
            self._data.append(data)

    def commit(self):
        """
            Flushes the read data to disk.
        """
        self.log.debug(_("Committing data to disk..."))

        time_start = time.time()

        # There is nothing to do if the queue is empty
        if not self._data:
            self.log.debug(_("No data to commit"))
            return

        # Get all objects from the queue and group them by the RRD file
        # to commit them all at once
        results = {}

        with self._lock:
            # Group all datapoints by file
            for data in self._data:
                try:
                    results[data.file].append(data)
                except KeyError:
                    results[data.file] = [data]

            # Everything grouped above leaves the queue
            self._data = []

        # Write the collected data to disk
        for filename in sorted(results):
            self._commit_file(filename, results[filename])

        duration = time.time() - time_start
        self.log.debug(_("Emptied write queue in %.2fs") % duration)

    def _commit_file(self, filename, results):
        """Write one file's worth of queued data points to its RRD database."""
        self.log.debug(_("Committing %(counter)s entries to %(filename)s") \
            % { "counter" : len(results), "filename" : filename })

        # Sort data before submitting it to rrdtool
        # (QueueObject.__lt__ orders by timestamp)
        results.sort()

        for data in results:
            self.log.debug("  %s" % data)

        try:
            rrdtool.update(filename, *["%s" % r for r in results])

        # Catch operational errors like unreadable/unwritable RRD databases
        # or those where the format has changed. The collected data will be lost.
        except rrdtool.OperationalError as e:
            self.log.critical(_("Could not update RRD database %s: %s") \
                % (filename, e))

    def commit_file(self, filename):
        """
            Commits all data that is in the write queue for the given
            file only.
        """
        results, others = [], []

        with self._lock:
            # We will have to walk through the entire queue since we cannot
            # ready any items selectively. Everything that belongs to our
            # transaction is kept. Everything else will be put back into the
            # queue.
            for data in self._data:
                if data.file == filename:
                    results.append(data)
                else:
                    others.append(data)

            # Put back all items that did not match
            self._data = others

        # Write everything else to disk
        if results:
            self._commit_file(filename, results)
class QueueObject(object):
    """
    One pending data point: the target RRD file, the formatted data
    string, and the submission timestamp. Instances compare by
    timestamp so a list of them sorts chronologically, and str() yields
    the "timestamp:data" form rrdtool.update() expects.
    """

    def __init__(self, file, data):
        self.file = file
        self.data = self._format_data(data)

        # Save current timestamp
        self.time = time.time()

    def __str__(self):
        # rrdtool wants whole-second timestamps, hence %.0f
        return "%.0f:%s" % (self.time, self.data)

    def __lt__(self, other):
        # Only ordered against other QueueObjects; anything else gets
        # NotImplemented so Python can try the reflected operation.
        if not isinstance(other, self.__class__):
            return NotImplemented

        return self.time < other.time
359 def _format_data(data
):
360 if not isinstance(data
, tuple) and not isinstance(data
, list):
363 # Replace all Nones by UNKNOWN