]> git.ipfire.org Git - collecty.git/commitdiff
Let the main thread regularly submit all data to disk.
authorMichael Tremer <michael.tremer@ipfire.org>
Sat, 4 Aug 2012 16:20:50 +0000 (16:20 +0000)
committerMichael Tremer <michael.tremer@ipfire.org>
Sat, 4 Aug 2012 16:20:50 +0000 (16:20 +0000)
collecty/__init__.py
collecty/plugins/base.py

index 723ec4995475edf2c4c47e05b1b62bc362a77f26..5e684f46069a8101f8cf5e9293122f66a8cb2bb5 100644 (file)
@@ -44,10 +44,18 @@ class ConfigError(Exception):
        pass
 
 class Collecty(object):
+       # The default interval, when all data is written to disk.
+       SUBMIT_INTERVAL = 300
+
+       HEARTBEAT = 2
+
        def __init__(self):
                self.config = configparser.ConfigParser()
                self.instances = []
 
+               # Indicates whether this process should be running or not.
+               self.running = True
+
                # Add all automatic plugins.
                self.add_autocreate_plugins()
 
@@ -101,16 +109,40 @@ class Collecty(object):
                for i in self.instances:
                        i.start()
 
-               # As long as at least one thread is alive, the main process
-               # is in a while loop.
-               while any([i.isAlive() for i in self.instances]):
-                       time.sleep(0.5)
+               # Regularly submit all data to disk.
+               counter = self.SUBMIT_INTERVAL / self.HEARTBEAT
+               while self.running:
+                       time.sleep(self.HEARTBEAT)
+                       counter -= 1
+
+                       if counter == 0:
+                               self.submit_all()
+                               counter = self.SUBMIT_INTERVAL / self.HEARTBEAT
+
+               # Wait until all instances are finished.
+               while self.instances:
+                       for instance in self.instances[:]:
+                               if not instance.isAlive():
+                                       log.debug(_("%s is not alive anymore. Removing.") % instance)
+                                       self.instances.remove(instance)
+
+                       # Wait a bit.
+                       time.sleep(0.1)
 
                log.debug(_("No thread running. Exiting main thread."))
 
+       def submit_all(self):
+               """
+                       Submit all data right now.
+               """
+               for i in self.instances:
+                       i._submit()
+
        def shutdown(self):
                log.debug(_("Received shutdown signal"))
 
+               self.running = False
+
                # Propagating shutdown to all threads.
                for i in self.instances:
                        i.shutdown()
index fb05bab6be318a2e96019cb0ec59627cb4c1cdc3..472a3a0c7e6d810fbfcea05f55ea175ba856be1e 100644 (file)
@@ -139,7 +139,7 @@ class Plugin(threading.Thread):
                rrdtool.update(self.file, *self.data)
                self.data = []
 
-       def __read(self, *args, **kwargs):
+       def _read(self, *args, **kwargs):
                """
                        This method catches errors from the read() method and logs them.
                """
@@ -150,7 +150,7 @@ class Plugin(threading.Thread):
                except Exception, e:
                        self.log.critical(_("Unhandled exception in read()!"), exc_info=True)
 
-       def __submit(self, *args, **kwargs):
+       def _submit(self, *args, **kwargs):
                """
                        This method catches errors from the submit() method and logs them.
                """
@@ -168,7 +168,7 @@ class Plugin(threading.Thread):
                while self.running:
                        if counter == 0:
                                self.log.debug(_("Collecting..."))
-                               self.__read()
+                               self._read()
 
                                self.log.debug(_("Sleeping for %.4fs.") % self.interval)
 
@@ -177,7 +177,7 @@ class Plugin(threading.Thread):
                        time.sleep(self.heartbeat)
                        counter -= 1
 
-               self.__submit()
+               self._submit()
                self.log.debug(_("Stopped."))
 
        def shutdown(self):