]>
Commit | Line | Data |
---|---|---|
73db5226 MT |
1 | #!/usr/bin/python |
2 | ############################################################################### | |
3 | # # | |
4 | # collecty - A system statistics collection daemon for IPFire # | |
5 | # Copyright (C) 2012 IPFire development team # | |
6 | # # | |
7 | # This program is free software: you can redistribute it and/or modify # | |
8 | # it under the terms of the GNU General Public License as published by # | |
9 | # the Free Software Foundation, either version 3 of the License, or # | |
10 | # (at your option) any later version. # | |
11 | # # | |
12 | # This program is distributed in the hope that it will be useful, # | |
13 | # but WITHOUT ANY WARRANTY; without even the implied warranty of # | |
14 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # | |
15 | # GNU General Public License for more details. # | |
16 | # # | |
17 | # You should have received a copy of the GNU General Public License # | |
18 | # along with this program. If not, see <http://www.gnu.org/licenses/>. # | |
19 | # # | |
20 | ############################################################################### | |
21 | ||
72364063 MT |
22 | import Queue as queue |
23 | import rrdtool | |
73db5226 | 24 | import signal |
72364063 MT |
25 | import threading |
26 | import time | |
73db5226 | 27 | |
c968f6d9 | 28 | import bus |
73db5226 MT |
29 | import plugins |
30 | ||
31 | from constants import * | |
32 | from i18n import _ | |
33 | ||
34 | import logging | |
35 | log = logging.getLogger("collecty") | |
36 | ||
class Collecty(object):
    """
    Main object of the daemon.

    It instantiates every plugin returned by plugins.get(), owns the
    bus thread and the write queue, and drives start-up and shutdown
    of all of them from run().
    """

    # The default interval (passed to the write queue) after which
    # all data is written to disk.
    SUBMIT_INTERVAL = 300

    # Number of seconds the main thread sleeps between two checks
    # of self.running.
    HEARTBEAT = 5

    def __init__(self, debug=False):
        """
        Sets up logging, the write queue, the bus and all plugins.

        debug -- when true, the "collecty" logger is switched to
                 DEBUG level.
        """
        self.debug = debug

        # Enable debug logging when running in debug mode
        if self.debug:
            log.setLevel(logging.DEBUG)

        self.plugins = []

        # Indicates whether this process should be running or not.
        self.running = True

        # The write queue holds all collected pieces of data which
        # will be written to disk later.
        self.write_queue = WriteQueue(self, self.SUBMIT_INTERVAL)

        # Create a thread that connects to dbus and processes requests we
        # get from there.
        self.bus = bus.Bus(self)

        # Add all plugins
        for plugin in plugins.get():
            self.add_plugin(plugin)

        log.debug(_("Collecty successfully initialized with %s plugins")
            % len(self.plugins))

    def add_plugin(self, plugin_class):
        """
        Instantiates plugin_class (called with this Collecty object as
        its only argument) and registers the resulting plugin.

        If the constructor raises an ordinary exception, the error is
        logged and the plugin is skipped. SystemExit and
        KeyboardInterrupt are deliberately not caught so that a
        shutdown request during start-up is not swallowed.
        """
        # Try initialising a new plugin. If that fails, we will log the
        # error and try to go on.
        try:
            plugin = plugin_class(self)
        except Exception:
            log.critical(_("Plugin %s could not be initialised") % plugin_class, exc_info=True)
            return

        self.plugins.append(plugin)

    @property
    def templates(self):
        """
        Generator over the templates of all registered plugins, in
        plugin registration order.
        """
        for plugin in self.plugins:
            for template in plugin.templates:
                yield template

    def run(self):
        """
        Starts the bus, all plugins and the write queue, then blocks
        until shutdown() has been requested. Afterwards it waits for
        the plugins, stops the bus and shuts down the write queue.
        """
        # Register signal handlers.
        self.register_signal_handler()

        # Start the bus
        self.bus.start()

        # Start all data source threads.
        for p in self.plugins:
            p.start()

        # Run the write queue thread
        self.write_queue.start()

        # Idle in the main thread until a shutdown has been requested.
        # Writing data to disk happens in the write queue thread.
        while self.running:
            try:
                time.sleep(self.HEARTBEAT)
            except KeyboardInterrupt:
                self.shutdown()
                break

        # Wait until all plugins are finished.
        for p in self.plugins:
            p.join()

        # Stop the bus thread
        self.bus.shutdown()

        # Write all collected data to disk before ending the main thread
        self.write_queue.shutdown()

        log.debug(_("Main thread exited"))

    def shutdown(self):
        """
        Requests a shutdown: clears self.running and calls shutdown()
        on every plugin. Calling it again once the shutdown has been
        requested does nothing.
        """
        if not self.running:
            return

        log.info(_("Received shutdown signal"))
        self.running = False

        # Propagating shutdown to all threads.
        for p in self.plugins:
            p.shutdown()

    def register_signal_handler(self):
        """
        Installs signal_handler() for SIGTERM, SIGINT and SIGUSR1.
        """
        for s in (signal.SIGTERM, signal.SIGINT, signal.SIGUSR1):
            log.debug(_("Registering signal %d") % s)

            signal.signal(s, self.signal_handler)

    def signal_handler(self, sig, *args, **kwargs):
        """
        SIGTERM and SIGINT shut the application down; SIGUSR1 makes
        the write queue commit its data immediately.
        """
        log.info(_("Caught signal %d") % sig)

        if sig in (signal.SIGTERM, signal.SIGINT):
            # Shutdown this application.
            self.shutdown()

        elif sig == signal.SIGUSR1:
            # Commit all data.
            self.write_queue.commit()

    def get_plugin_from_template(self, template_name):
        """
        Returns the first plugin that has a template whose name equals
        template_name, or None if no plugin provides such a template.
        """
        for plugin in self.plugins:
            if any(t.name == template_name for t in plugin.templates):
                return plugin

        return None

    def generate_graph(self, template_name, *args, **kwargs):
        """
        Looks up the plugin that provides template_name and returns
        the result of its generate_graph() method. All additional
        arguments are passed through unchanged.

        Raises RuntimeError if no plugin provides the template.
        """
        plugin = self.get_plugin_from_template(template_name)
        if plugin is None:
            raise RuntimeError("Could not find template %s" % template_name)

        return plugin.generate_graph(template_name, *args, **kwargs)
72364063 MT |
162 | |
163 | ||
class WriteQueue(threading.Thread):
    """
    Daemon thread that buffers collected data in a priority queue and
    writes it to the RRD files, either whenever its timer elapses, on
    request through commit(), or one last time on shutdown.
    """

    def __init__(self, collecty, submit_interval):
        """
        collecty        -- the Collecty object owning this queue.
        submit_interval -- passed to plugins.Timer; presumably the
                           number of seconds between two automatic
                           commits (TODO confirm against plugins.Timer).
        """
        threading.Thread.__init__(self)
        self.daemon = True

        self.collecty = collecty

        self.log = logging.getLogger("collecty.queue")
        self.log.propagate = 1

        # The flag is initialised here and not in run(), so that a
        # shutdown() which arrives before the thread body has started
        # executing is not overwritten again.
        self.running = True

        self.timer = plugins.Timer(submit_interval)
        self._queue = queue.PriorityQueue()

        self.log.debug(_("Initialised write queue"))

    def run(self):
        """
        Thread body: commits the queue each time the timer elapses and
        once more after shutdown() has been requested.
        """
        self.log.debug(_("Write queue process started"))

        while self.running:
            # Reset the timer.
            self.timer.reset()

            # Wait until the timer has successfully elapsed.
            # NOTE(review): wait() presumably returns a false value
            # when the timer was cancelled - confirm in plugins.Timer.
            if self.timer.wait():
                self.commit()

        # Flush whatever has been queued since the last commit.
        self.commit()
        self.log.debug(_("Write queue process stopped"))

    def shutdown(self):
        """
        Asks the thread to stop, cancels the timer and blocks until the
        thread has exited (and therefore performed its final commit).
        """
        self.running = False
        self.timer.cancel()

        # Wait until all data has been written.
        self.join()

    def add(self, object, time, data):
        """
        Queues one piece of data for the file of the given object.

        object -- anything with a "file" attribute (the RRD filename).
        time   -- timestamp of the data; used to order the queue.
        data   -- the value string to be written.
        """
        result = QueueObject(object.file, time, data)
        self._queue.put(result)

    def commit(self):
        """
        Flushes the read data to disk.

        All queued objects are taken from the queue, grouped by their
        file and then written with one update per file.
        """
        # There is nothing to do if the queue is empty
        if self._queue.empty():
            self.log.debug(_("No data to commit"))
            return

        time_start = time.time()

        self.log.debug(_("Submitting data to the databases..."))

        # Get all objects from the queue and group them by the RRD file
        # to commit them all at once.
        # commit() can run in the queue thread and in the signal
        # handler, so the queue may be emptied by somebody else between
        # a check and the read. Hence never block on the read here.
        results_by_file = {}
        while True:
            try:
                result = self._queue.get_nowait()
            except queue.Empty:
                break

            results_by_file.setdefault(result.file, []).append(result)

        # Write the collected data to disk
        # NOTE(review): an exception raised for one file propagates and
        # the data already taken from the queue for the remaining files
        # is not written.
        for filename, file_results in results_by_file.items():
            self._commit_file(filename, file_results)

        duration = time.time() - time_start
        self.log.debug(_("Emptied write queue in %.2fs") % duration)

    def _commit_file(self, filename, results):
        """
        Writes all results (QueueObject instances) to the RRD file
        filename with a single rrdtool.update() call.
        """
        self.log.debug(_("Committing %(counter)s entries to %(filename)s")
            % { "counter" : len(results), "filename" : filename })

        for result in results:
            self.log.debug("  %s: %s" % (result.time, result.data))

        # Every result is passed in its string form ("<time>:<data>").
        rrdtool.update(filename, *[str(r) for r in results])
244 | ||
245 | ||
class QueueObject(object):
    """
    One piece of data waiting in the write queue.

    Objects are ordered by their time only, so that a priority queue
    hands them out oldest first.
    """

    def __init__(self, file, time, data):
        """
        file -- name of the file the data belongs to.
        time -- timestamp of the data; must provide strftime().
        data -- the collected values as a string.
        """
        self.file = file
        self.time = time
        self.data = data

    def __str__(self):
        # NOTE(review): "%s" is not a directive documented by Python;
        # it is handed to the platform's strftime() (seconds since the
        # epoch on glibc) - confirm that all target platforms support it.
        return "%s:%s" % (self.time.strftime("%s"), self.data)

    def __lt__(self, other):
        # Rich comparison used by the priority queue. Python 3 ignores
        # __cmp__, so without this method the objects are unorderable.
        return self.time < other.time

    def __cmp__(self, other):
        # Kept for callers on Python 2. Written without the cmp()
        # builtin, which no longer exists in Python 3.
        return (self.time > other.time) - (self.time < other.time)