]> git.ipfire.org Git - oddments/cappie.git/commitdiff
Add a global event queue.
authorMichael Tremer <michael.tremer@ipfire.org>
Sat, 17 Apr 2010 12:33:31 +0000 (14:33 +0200)
committerMichael Tremer <michael.tremer@ipfire.org>
Sat, 17 Apr 2010 12:33:31 +0000 (14:33 +0200)
This queue is supposed to handle events (that have to be defined
later), because we can only handle one event at the same time.

cappie.py

index a99493432c602edd8169e2f55c0774e34c3a3439..55f305950ddeeddc55d94394541b92d412a35a68 100644 (file)
--- a/cappie.py
+++ b/cappie.py
@@ -138,13 +138,15 @@ class Database(object):
 class Interface(Thread):
        heartbeat = 0.1
 
-       def __init__(self, dev, log, promisc=False, mtu=1500):
+       def __init__(self, dev, cappie, promisc=False, mtu=1500):
                Thread.__init__(self)
 
+               self.cappie = cappie
                self.dev = dev
-               self.log = log
-               self.promisc = promisc
+               self.log = self.cappie.log
                self.mtu = mtu
+               self.promisc = promisc
+               self.queue = self.cappie.queue
 
                self.db = Database(self)
 
@@ -202,6 +204,72 @@ class Interface(Thread):
                return "arp or rarp"
 
 
+class QueueFullError(Exception):
+       pass
+
+
+class Queue(Thread):
+       heartbeat = 5.0
+       maxitems = 100
+
+       def __init__(self, log):
+               Thread.__init__(self)
+
+               self.log = log
+
+               self.__running = True
+               self.__queue = []
+
+       def __len__(self):
+               return self.length
+
+       def add(self, event):
+               if self.length > self.maxitems:
+                       raise QueueFullError, "Cannot queue new event."
+
+               self.__queue.append(event)
+
+       @property
+       def length(self):
+               return len(self.__queue)
+
+       def run(self):
+               self.log.debug("Started event queue")
+
+               while self.__running or self.__queue:
+                       if not self.__queue:
+                               #self.log.debug("Queue sleeping for %s seconds" % self.heartbeat)
+                               time.sleep(self.heartbeat)
+                               continue
+
+                       event = self.__queue.pop(0)
+                       self.log.debug("Processing queue event: %s" % event)
+                       try:
+                               event.run()
+                       except EventException, e:
+                               self.log.error("Catched event exception: %s" % e)
+
+       def shutdown(self):
+               self.__running = False
+               self.log.debug("Shutting down queue")
+               self.log.debug("%d events in queue left" % len(self.__queue))
+
+               # Wait until queue handled all events
+               self.join()
+
+
+class Event(object):
+       def __init__(self, interface):
+               self.cappie = interface.cappie
+               self.interface = interface
+
+       def __str__(self):
+               return self.__class__.__name__
+
+       def run(self):
+               raise NotImplementedError
+
+
 class Cappie(object):
        def __init__(self):
                self.__interfaces = []
@@ -219,10 +287,12 @@ class Cappie(object):
                handler.setFormatter(logging.Formatter("cappie: %(message)s"))
                self.log.addHandler(handler)
 
+               self.queue = Queue(self.log)
+
                self.log.info("Cappie successfully started")
 
        def __del__(self):
-               self.reset()
+               self.shutdown()
                self.log.info("Exiting")
 
        def setDebug(self, debug):
@@ -235,18 +305,27 @@ class Cappie(object):
                if not dev in getAllInterfaces():
                        raise InterfaceError, "No such interface %s" % dev
 
-               iface = Interface(dev, log=self.log, **kwargs)
+               kwargs["cappie"] = self
+
+               iface = Interface(dev, **kwargs)
                self.__interfaces.append(iface)
 
        def run(self):
                if not self.__interfaces:
                        raise RuntimeError, "No interfaces were configured"
 
+               # Start queue
+               self.queue.start()
+
                # Start a thread for each interface
                for iface in self.__interfaces:
                        iface.start()
 
                while True:
+                       if not self.queue.is_alive():
+                               self.log.critical("Queue thread died unexpectedly.")
+                               return
+
                        for iface in self.__interfaces:
                                if not iface.is_alive():
                                        self.log.critical("Thread died unexpectedly. %s" % iface.dev)
@@ -254,8 +333,6 @@ class Cappie(object):
                                time.sleep(60)
 
        def readConfig(self, configfile):
-               self.reset()
-
                config = ConfigParser()
                config.read([configfile])
 
@@ -272,14 +349,12 @@ class Cappie(object):
                                options[option] = value
                        self.addInterface(iface, **options)
 
-       def reset(self):
-               self.shutdown()
-               self.__interfaces = []
-
        def shutdown(self):
                for iface in self.__interfaces:
                        iface.shutdown()
 
+               self.queue.shutdown()
+
 
 if __name__ == "__main__":
        from optparse import OptionParser