]>
Commit | Line | Data |
---|---|---|
73db5226 MT |
1 | #!/usr/bin/python |
2 | ############################################################################### | |
3 | # # | |
4 | # collecty - A system statistics collection daemon for IPFire # | |
5 | # Copyright (C) 2012 IPFire development team # | |
6 | # # | |
7 | # This program is free software: you can redistribute it and/or modify # | |
8 | # it under the terms of the GNU General Public License as published by # | |
9 | # the Free Software Foundation, either version 3 of the License, or # | |
10 | # (at your option) any later version. # | |
11 | # # | |
12 | # This program is distributed in the hope that it will be useful, # | |
13 | # but WITHOUT ANY WARRANTY; without even the implied warranty of # | |
14 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # | |
15 | # GNU General Public License for more details. # | |
16 | # # | |
17 | # You should have received a copy of the GNU General Public License # | |
18 | # along with this program. If not, see <http://www.gnu.org/licenses/>. # | |
19 | # # | |
20 | ############################################################################### | |
21 | ||
72364063 MT |
22 | import Queue as queue |
23 | import rrdtool | |
73db5226 | 24 | import signal |
72364063 MT |
25 | import threading |
26 | import time | |
73db5226 | 27 | |
73db5226 MT |
28 | import plugins |
29 | ||
30 | from constants import * | |
31 | from i18n import _ | |
32 | ||
33 | import logging | |
34 | log = logging.getLogger("collecty") | |
35 | ||
36 | class Collecty(object): | |
37 | # The default interval, when all data is written to disk. | |
38 | SUBMIT_INTERVAL = 300 | |
39 | ||
72364063 MT |
40 | HEARTBEAT = 5 |
41 | ||
73db5226 | 42 | def __init__(self, debug=False): |
72364063 MT |
43 | self.debug = debug |
44 | ||
a76917bf | 45 | # Enable debug logging when running in debug mode |
72364063 | 46 | if self.debug: |
a76917bf MT |
47 | log.setLevel(logging.DEBUG) |
48 | ||
5d140577 | 49 | self.plugins = [] |
73db5226 MT |
50 | |
51 | # Indicates whether this process should be running or not. | |
52 | self.running = True | |
72364063 MT |
53 | |
54 | # The write queue holds all collected pieces of data which | |
55 | # will be written to disk later. | |
56 | self.write_queue = WriteQueue(self, self.SUBMIT_INTERVAL) | |
73db5226 | 57 | |
5d140577 MT |
58 | # Add all plugins |
59 | for plugin in plugins.get(): | |
60 | self.add_plugin(plugin) | |
73db5226 | 61 | |
5d140577 MT |
62 | log.info(_("Collecty successfully initialized with %s plugins") \ |
63 | % len(self.plugins)) | |
73db5226 | 64 | |
5d140577 MT |
65 | def add_plugin(self, plugin_class): |
66 | # Try initialising a new plugin. If that fails, we will log the | |
67 | # error and try to go on. | |
68 | try: | |
69 | plugin = plugin_class(self) | |
70 | except: | |
71 | log.critical(_("Plugin %s could not be initialised") % plugin_class, exc_info=True) | |
72 | return | |
73db5226 | 73 | |
5d140577 | 74 | self.plugins.append(plugin) |
73db5226 | 75 | |
73db5226 MT |
76 | def run(self): |
77 | # Register signal handlers. | |
78 | self.register_signal_handler() | |
79 | ||
b1ea4956 | 80 | # Start all data source threads. |
5d140577 MT |
81 | for p in self.plugins: |
82 | p.start() | |
73db5226 | 83 | |
72364063 MT |
84 | # Run the write queue thread |
85 | self.write_queue.start() | |
86 | ||
73db5226 MT |
87 | # Regularly submit all data to disk. |
88 | while self.running: | |
72364063 MT |
89 | try: |
90 | time.sleep(self.HEARTBEAT) | |
91 | except KeyboardInterrupt: | |
92 | self.shutdown() | |
93 | break | |
73db5226 | 94 | |
72364063 | 95 | # Wait until all plugins are finished. |
5d140577 | 96 | for p in self.plugins: |
72364063 | 97 | p.join() |
73db5226 | 98 | |
72364063 MT |
99 | # Write all collected data to disk before ending the main thread |
100 | self.write_queue.shutdown() | |
101 | ||
102 | log.debug(_("Main thread exited")) | |
73db5226 MT |
103 | |
104 | def shutdown(self): | |
72364063 MT |
105 | if not self.running: |
106 | return | |
73db5226 | 107 | |
72364063 | 108 | log.debug(_("Received shutdown signal")) |
73db5226 | 109 | self.running = False |
73db5226 MT |
110 | |
111 | # Propagating shutdown to all threads. | |
5d140577 MT |
112 | for p in self.plugins: |
113 | p.shutdown() | |
73db5226 MT |
114 | |
115 | def register_signal_handler(self): | |
116 | for s in (signal.SIGTERM, signal.SIGINT, signal.SIGUSR1): | |
117 | log.debug(_("Registering signal %d") % s) | |
118 | ||
119 | signal.signal(s, self.signal_handler) | |
120 | ||
121 | def signal_handler(self, sig, *args, **kwargs): | |
122 | log.info(_("Caught signal %d") % sig) | |
123 | ||
124 | if sig in (signal.SIGTERM, signal.SIGINT): | |
125 | # Shutdown this application. | |
126 | self.shutdown() | |
127 | ||
128 | elif sig == signal.SIGUSR1: | |
72364063 MT |
129 | # Commit all data. |
130 | self.write_queue.commit() | |
73db5226 MT |
131 | |
132 | @property | |
133 | def graph_default_arguments(self): | |
134 | return GRAPH_DEFAULT_ARGUMENTS | |
72364063 MT |
135 | |
136 | ||
137 | class WriteQueue(threading.Thread): | |
138 | def __init__(self, collecty, submit_interval): | |
139 | threading.Thread.__init__(self) | |
140 | self.daemon = True | |
141 | ||
142 | self.collecty = collecty | |
143 | ||
144 | self.log = logging.getLogger("collecty.queue") | |
145 | self.log.setLevel(logging.DEBUG) | |
146 | self.log.propagate = 1 | |
147 | ||
148 | self.timer = plugins.Timer(submit_interval) | |
149 | self._queue = queue.PriorityQueue() | |
150 | ||
151 | self.log.debug(_("Initialised write queue")) | |
152 | ||
153 | def run(self): | |
154 | self.log.debug(_("Write queue process started")) | |
155 | self.running = True | |
156 | ||
157 | while self.running: | |
158 | # Reset the timer. | |
159 | self.timer.reset() | |
160 | ||
161 | # Wait until the timer has successfully elapsed. | |
162 | if self.timer.wait(): | |
163 | self.commit() | |
164 | ||
165 | self.commit() | |
166 | self.log.debug(_("Write queue process stopped")) | |
167 | ||
168 | def shutdown(self): | |
169 | self.running = False | |
170 | self.timer.cancel() | |
171 | ||
172 | # Wait until all data has been written. | |
173 | self.join() | |
174 | ||
175 | def add(self, object, time, data): | |
176 | result = QueueObject(object.file, time, data) | |
177 | self._queue.put(result) | |
178 | ||
179 | def commit(self): | |
180 | """ | |
181 | Flushes the read data to disk. | |
182 | """ | |
183 | # There is nothing to do if the queue is empty | |
184 | if self._queue.empty(): | |
185 | self.log.debug(_("No data to commit")) | |
186 | return | |
187 | ||
188 | time_start = time.time() | |
189 | ||
190 | self.log.debug(_("Submitting data to the databases...")) | |
191 | ||
192 | # Get all objects from the queue and group them by the RRD file | |
193 | # to commit them all at once | |
194 | results = {} | |
195 | while not self._queue.empty(): | |
196 | result = self._queue.get() | |
197 | ||
198 | try: | |
199 | results[result.file].append(result) | |
200 | except KeyError: | |
201 | results[result.file] = [result] | |
202 | ||
203 | # Write the collected data to disk | |
204 | for filename, results in results.items(): | |
205 | self._commit_file(filename, results) | |
206 | ||
207 | duration = time.time() - time_start | |
208 | self.log.debug(_("Emptied write queue in %.2fs") % duration) | |
209 | ||
210 | def _commit_file(self, filename, results): | |
211 | self.log.debug(_("Committing %(counter)s entries to %(filename)s:") \ | |
212 | % { "counter" : len(results), "filename" : filename }) | |
213 | ||
214 | if self.collecty.debug: | |
215 | for result in results: | |
216 | self.log.debug(" %s: %s" % (result.time, result.data)) | |
217 | ||
218 | rrdtool.update(filename, *["%s" % r for r in results]) | |
219 | ||
220 | ||
221 | class QueueObject(object): | |
222 | def __init__(self, file, time, data): | |
223 | self.file = file | |
224 | self.time = time | |
225 | self.data = data | |
226 | ||
227 | def __str__(self): | |
228 | return "%s:%s" % (self.time.strftime("%s"), self.data) | |
229 | ||
230 | def __cmp__(self, other): | |
231 | return cmp(self.time, other.time) |