From 4dc6b0c93eae269afa6e2597d3620ce152f06c70 Mon Sep 17 00:00:00 2001 From: Michael Tremer Date: Sat, 4 Aug 2012 16:20:50 +0000 Subject: [PATCH] Let the main thread regularly submit all data to disk. --- collecty/__init__.py | 40 ++++++++++++++++++++++++++++++++++++---- collecty/plugins/base.py | 8 ++++---- 2 files changed, 40 insertions(+), 8 deletions(-) diff --git a/collecty/__init__.py b/collecty/__init__.py index 723ec49..5e684f4 100644 --- a/collecty/__init__.py +++ b/collecty/__init__.py @@ -44,10 +44,18 @@ class ConfigError(Exception): pass class Collecty(object): + # The default interval, when all data is written to disk. + SUBMIT_INTERVAL = 300 + + HEARTBEAT = 2 + def __init__(self): self.config = configparser.ConfigParser() self.instances = [] + # Indicates whether this process should be running or not. + self.running = True + # Add all automatic plugins. self.add_autocreate_plugins() @@ -101,16 +109,40 @@ class Collecty(object): for i in self.instances: i.start() - # As long as at least one thread is alive, the main process - # is in a while loop. - while any([i.isAlive() for i in self.instances]): - time.sleep(0.5) + # Regularly submit all data to disk. + counter = self.SUBMIT_INTERVAL / self.HEARTBEAT + while self.running: + time.sleep(self.HEARTBEAT) + counter -= 1 + + if counter == 0: + self.submit_all() + counter = self.SUBMIT_INTERVAL / self.HEARTBEAT + + # Wait until all instances are finished. + while self.instances: + for instance in self.instances[:]: + if not instance.isAlive(): + log.debug(_("%s is not alive anymore. Removing.") % instance) + self.instances.remove(instance) + + # Wait a bit. + time.sleep(0.1) log.debug(_("No thread running. Exiting main thread.")) + def submit_all(self): + """ + Submit all data right now. + """ + for i in self.instances: + i._submit() + def shutdown(self): log.debug(_("Received shutdown signal")) + self.running = False + # Propagating shutdown to all threads. for i in self.instances: i.shutdown() diff --git a/collecty/plugins/base.py b/collecty/plugins/base.py index fb05bab..472a3a0 100644 --- a/collecty/plugins/base.py +++ b/collecty/plugins/base.py @@ -139,7 +139,7 @@ class Plugin(threading.Thread): rrdtool.update(self.file, *self.data) self.data = [] - def __read(self, *args, **kwargs): + def _read(self, *args, **kwargs): """ This method catches errors from the read() method and logs them. """ @@ -150,7 +150,7 @@ class Plugin(threading.Thread): except Exception, e: self.log.critical(_("Unhandled exception in read()!"), exc_info=True) - def __submit(self, *args, **kwargs): + def _submit(self, *args, **kwargs): """ This method catches errors from the submit() method and logs them. """ @@ -168,7 +168,7 @@ class Plugin(threading.Thread): while self.running: if counter == 0: self.log.debug(_("Collecting...")) - self.__read() + self._read() self.log.debug(_("Sleeping for %.4fs.") % self.interval) @@ -177,7 +177,7 @@ class Plugin(threading.Thread): time.sleep(self.heartbeat) counter -= 1 - self.__submit() + self._submit() self.log.debug(_("Stopped.")) def shutdown(self): -- 2.47.2