Rewrite plugin architecture
[collecty.git] / src / collecty / daemon.py
1 #!/usr/bin/python
2 ###############################################################################
3 #                                                                             #
4 # collecty - A system statistics collection daemon for IPFire                 #
5 # Copyright (C) 2012 IPFire development team                                  #
6 #                                                                             #
7 # This program is free software: you can redistribute it and/or modify        #
8 # it under the terms of the GNU General Public License as published by        #
9 # the Free Software Foundation, either version 3 of the License, or           #
10 # (at your option) any later version.                                         #
11 #                                                                             #
12 # This program is distributed in the hope that it will be useful,             #
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of              #
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the               #
15 # GNU General Public License for more details.                                #
16 #                                                                             #
17 # You should have received a copy of the GNU General Public License           #
18 # along with this program.  If not, see <http://www.gnu.org/licenses/>.       #
19 #                                                                             #
20 ###############################################################################
21
22 import Queue as queue
23 import rrdtool
24 import signal
25 import threading
26 import time
27
28 import plugins
29
30 from constants import *
31 from i18n import _
32
33 import logging
34 log = logging.getLogger("collecty")
35
class Collecty(object):
	"""
		The main daemon object.

		Instantiates all available plugins, runs each of them in its
		own thread and owns the write queue which periodically flushes
		the collected data to the RRD databases on disk.
	"""

	# The default interval, when all data is written to disk.
	SUBMIT_INTERVAL = 300

	# How long the main loop sleeps between checks of self.running.
	HEARTBEAT = 5

	def __init__(self, debug=False):
		self.debug = debug

		# Enable debug logging when running in debug mode
		if self.debug:
			log.setLevel(logging.DEBUG)

		self.plugins = []

		# Indicates whether this process should be running or not.
		self.running = True

		# The write queue holds all collected pieces of data which
		# will be written to disk later.
		self.write_queue = WriteQueue(self, self.SUBMIT_INTERVAL)

		# Add all plugins
		for plugin in plugins.get():
			self.add_plugin(plugin)

		log.info(_("Collecty successfully initialized with %s plugins") \
			% len(self.plugins))

	def add_plugin(self, plugin_class):
		"""
			Instantiates plugin_class and appends the instance to
			self.plugins.  A failing plugin is logged and skipped so
			that one broken plugin cannot take down the whole daemon.
		"""
		try:
			plugin = plugin_class(self)
		except Exception:
			# Only catch Exception so that KeyboardInterrupt and
			# SystemExit still propagate to the caller.
			log.critical(_("Plugin %s could not be initialised") % plugin_class, exc_info=True)
			return

		self.plugins.append(plugin)

	def run(self):
		"""
			Main loop: starts all plugin threads and the write queue,
			then idles until shutdown is requested, finally joins all
			threads and flushes any remaining data to disk.
		"""
		# Register signal handlers.
		self.register_signal_handler()

		# Start all data source threads.
		for p in self.plugins:
			p.start()

		# Run the write queue thread
		self.write_queue.start()

		# Regularly submit all data to disk.
		while self.running:
			try:
				time.sleep(self.HEARTBEAT)
			except KeyboardInterrupt:
				self.shutdown()
				break

		# Wait until all plugins are finished.
		for p in self.plugins:
			p.join()

		# Write all collected data to disk before ending the main thread
		self.write_queue.shutdown()

		log.debug(_("Main thread exited"))

	def shutdown(self):
		"""
			Requests the daemon to stop.  Idempotent - a second call
			is a no-op.
		"""
		if not self.running:
			return

		log.debug(_("Received shutdown signal"))
		self.running = False

		# Propagating shutdown to all threads.
		for p in self.plugins:
			p.shutdown()

	def register_signal_handler(self):
		# Register the handler for every signal inside the loop -
		# otherwise only the last signal in the tuple would actually
		# be hooked up.
		for s in (signal.SIGTERM, signal.SIGINT, signal.SIGUSR1):
			log.debug(_("Registering signal %d") % s)

			signal.signal(s, self.signal_handler)

	def signal_handler(self, sig, *args, **kwargs):
		"""
			Dispatches received signals:
			SIGTERM/SIGINT shut the daemon down, SIGUSR1 forces an
			immediate commit of the write queue.
		"""
		log.info(_("Caught signal %d") % sig)

		if sig in (signal.SIGTERM, signal.SIGINT):
			# Shutdown this application.
			self.shutdown()

		elif sig == signal.SIGUSR1:
			# Commit all data.
			self.write_queue.commit()

	@property
	def graph_default_arguments(self):
		# Default rrdtool graph arguments (defined in constants).
		return GRAPH_DEFAULT_ARGUMENTS
136
class WriteQueue(threading.Thread):
	"""
		A background thread that buffers collected samples in a
		priority queue (ordered by sample time) and periodically
		writes them to their RRD files in batches.
	"""

	def __init__(self, collecty, submit_interval):
		threading.Thread.__init__(self)
		self.daemon = True

		self.collecty = collecty

		self.log = logging.getLogger("collecty.queue")
		self.log.setLevel(logging.DEBUG)
		self.log.propagate = True

		# Timer that elapses every submit_interval seconds and can be
		# cancelled on shutdown.
		self.timer = plugins.Timer(submit_interval)
		self._queue = queue.PriorityQueue()

		self.log.debug(_("Initialised write queue"))

	def run(self):
		self.log.debug(_("Write queue process started"))
		self.running = True

		while self.running:
			# Reset the timer.
			self.timer.reset()

			# Wait until the timer has successfully elapsed.
			# wait() returning falsy means the timer was cancelled
			# (shutdown) - in that case skip the commit here and fall
			# through to the final flush below.
			if self.timer.wait():
				self.commit()

		# Flush whatever is still queued before the thread ends.
		self.commit()
		self.log.debug(_("Write queue process stopped"))

	def shutdown(self):
		self.running = False
		self.timer.cancel()

		# Wait until all data has been written.
		self.join()

	def add(self, object, time, data):
		# NOTE: the parameter names shadow the builtin "object" and
		# the "time" module, but they are part of the public call
		# signature and therefore kept as-is.
		result = QueueObject(object.file, time, data)
		self._queue.put(result)

	def commit(self):
		"""
			Flushes the read data to disk.
		"""
		# There is nothing to do if the queue is empty
		if self._queue.empty():
			self.log.debug(_("No data to commit"))
			return

		time_start = time.time()

		self.log.debug(_("Submitting data to the databases..."))

		# Get all objects from the queue and group them by the RRD file
		# to commit them all at once.  Use setdefault so a new file
		# gets an empty list on first sight.
		batches = {}
		while not self._queue.empty():
			result = self._queue.get()
			batches.setdefault(result.file, []).append(result)

		# Write the collected data to disk
		for filename, records in batches.items():
			self._commit_file(filename, records)

		duration = time.time() - time_start
		self.log.debug(_("Emptied write queue in %.2fs") % duration)

	def _commit_file(self, filename, results):
		self.log.debug(_("Committing %(counter)s entries to %(filename)s:") \
			% { "counter" : len(results), "filename" : filename })

		if self.collecty.debug:
			for result in results:
				self.log.debug("  %s: %s" % (result.time, result.data))

		# Each QueueObject stringifies to "<timestamp>:<data>", which
		# is the format rrdtool.update() expects.
		rrdtool.update(filename, *["%s" % r for r in results])
219
220
class QueueObject(object):
	"""
		Wraps one collected sample together with the RRD file it
		belongs to and the time it was taken.

		Objects order by their time attribute only, so that a
		PriorityQueue hands out the oldest samples first.
	"""

	def __init__(self, file, time, data):
		self.file = file	# path of the target RRD file
		self.time = time	# sample time; must provide strftime() - TODO confirm always a datetime
		self.data = data	# payload string for rrdtool update

	def __str__(self):
		# rrdtool expects "<seconds-since-epoch>:<data>"; %s is the
		# (platform-dependent, glibc) epoch format code.
		return "%s:%s" % (self.time.strftime("%s"), self.data)

	def __cmp__(self, other):
		# Python 2 ordering - compare by sample time only.
		return cmp(self.time, other.time)

	# Rich comparisons so ordering also works on Python 3, where
	# __cmp__ is ignored and cmp() does not exist.  Semantics are
	# identical: comparison considers the time only.
	def __eq__(self, other):
		return self.time == other.time

	def __lt__(self, other):
		return self.time < other.time

	# Keep instances hashable (by identity) despite defining __eq__.
	__hash__ = object.__hash__