]> git.ipfire.org Git - oddments/cappie.git/blobdiff - cappie/queue.py
Merge branch 'master' of ssh://git.ipfire.org/pub/git/oddments/cappie
[oddments/cappie.git] / cappie / queue.py
index 0db700635652633577e0ceeb28d9ea0712f415be..e690d136235dab0018114af2bf8b2b9438b1331f 100644 (file)
@@ -25,11 +25,13 @@ from threading import Thread
 
 import util
 
+from database import Database
 from errors import *
+from events import *
 
 class Queue(Thread):
        heartbeat = 1.0
-       maxitems = 100
+       maxitems = 10000
 
        def __init__(self, log):
                Thread.__init__(self)
@@ -39,6 +41,9 @@ class Queue(Thread):
                self.__running = True
                self.__queue = []
 
+               self.db = Database(log)
+               self.lastgc = None
+
        def __len__(self):
                return self.length
 
@@ -57,12 +62,16 @@ class Queue(Thread):
 
                util.setprocname("queue")
 
+               self.db.open()
+
                while self.__running or self.__queue:
                        if not self.__queue:
                                #self.log.debug("Queue sleeping for %s seconds" % self.heartbeat)
                                time.sleep(self.heartbeat)
                                continue
 
+                       self._checkGc()
+
                        event = self.__queue.pop(0)
                        self.log.debug("Processing queue event: %s" % event)
                        try:
@@ -70,6 +79,8 @@ class Queue(Thread):
                        except EventException, e:
                                self.log.error("Catched event exception: %s" % e)
 
+               self.db.close()
+
        def shutdown(self):
                self.__running = False
                self.log.debug("Shutting down queue")
@@ -77,3 +88,8 @@ class Queue(Thread):
 
                # Wait until queue handled all events
                self.join()
+
+       def _checkGc(self):
+               if not self.lastgc or self.lastgc <= (time.time() - DB_GC_INTERVAL):
+                       self.add(EventGarbageCollector(self.db, self.log))
+                       self.lastgc = time.time()