pass
class Collecty(object):
+ # The default interval, when all data is written to disk.
+ SUBMIT_INTERVAL = 300
+
+ HEARTBEAT = 2
+
def __init__(self):
self.config = configparser.ConfigParser()
self.instances = []
+ # Indicates whether this process should be running or not.
+ self.running = True
+
# Add all automatic plugins.
self.add_autocreate_plugins()
for i in self.instances:
i.start()
- # As long as at least one thread is alive, the main process
- # is in a while loop.
- while any([i.isAlive() for i in self.instances]):
- time.sleep(0.5)
+ # Regularly submit all data to disk.
+ counter = self.SUBMIT_INTERVAL / self.HEARTBEAT
+ while self.running:
+ time.sleep(self.HEARTBEAT)
+ counter -= 1
+
+ if counter == 0:
+ self.submit_all()
+ counter = self.SUBMIT_INTERVAL / self.HEARTBEAT
+
+ # Wait until all instances are finished.
+ while self.instances:
+ for instance in self.instances[:]:
+ if not instance.isAlive():
+ log.debug(_("%s is not alive anymore. Removing.") % instance)
+ self.instances.remove(instance)
+
+ # Wait a bit.
+ time.sleep(0.1)
log.debug(_("No thread running. Exiting main thread."))
+ def submit_all(self):
+ """
+ Submit all data right now.
+ """
+ for i in self.instances:
+ i._submit()
+
def shutdown(self):
log.debug(_("Received shutdown signal"))
+ self.running = False
+
# Propagating shutdown to all threads.
for i in self.instances:
i.shutdown()
rrdtool.update(self.file, *self.data)
self.data = []
- def __read(self, *args, **kwargs):
+ def _read(self, *args, **kwargs):
"""
This method catches errors from the read() method and logs them.
"""
except Exception, e:
self.log.critical(_("Unhandled exception in read()!"), exc_info=True)
- def __submit(self, *args, **kwargs):
+ def _submit(self, *args, **kwargs):
"""
This method catches errors from the submit() method and logs them.
"""
while self.running:
if counter == 0:
self.log.debug(_("Collecting..."))
- self.__read()
+ self._read()
self.log.debug(_("Sleeping for %.4fs.") % self.interval)
time.sleep(self.heartbeat)
counter -= 1
- self.__submit()
+ self._submit()
self.log.debug(_("Stopped."))
def shutdown(self):