import util
+from database import Database
from errors import *
+from events import *
class Queue(Thread):
heartbeat = 1.0
- maxitems = 100
+ maxitems = 10000
def __init__(self, log):
Thread.__init__(self)
self.__running = True
self.__queue = []
+ self.db = Database(log)
+ self.lastgc = None
+
def __len__(self):
return self.length
util.setprocname("queue")
+ self.db.open()
+
while self.__running or self.__queue:
if not self.__queue:
#self.log.debug("Queue sleeping for %s seconds" % self.heartbeat)
time.sleep(self.heartbeat)
continue
+ self._checkGc()
+
event = self.__queue.pop(0)
self.log.debug("Processing queue event: %s" % event)
try:
except EventException, e:
self.log.error("Catched event exception: %s" % e)
+ self.db.close()
+
def shutdown(self):
self.__running = False
self.log.debug("Shutting down queue")
# Wait until queue handled all events
self.join()
+
+ def _checkGc(self):
+ if not self.lastgc or self.lastgc <= (time.time() - DB_GC_INTERVAL):
+ self.add(EventGarbageCollector(self.db, self.log))
+ self.lastgc = time.time()