From: Michael Tremer Date: Sat, 17 Apr 2010 12:33:31 +0000 (+0200) Subject: Add a global event queue. X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=d558f1818413756194d4c5abe82b188888a6914e;p=oddments%2Fcappie.git Add a global event queue. This queue is supposed to handle events (that have to be defined later), because we can only handle one event at the same time. --- diff --git a/cappie.py b/cappie.py index a994934..55f3059 100644 --- a/cappie.py +++ b/cappie.py @@ -138,13 +138,15 @@ class Database(object): class Interface(Thread): heartbeat = 0.1 - def __init__(self, dev, log, promisc=False, mtu=1500): + def __init__(self, dev, cappie, promisc=False, mtu=1500): Thread.__init__(self) + self.cappie = cappie self.dev = dev - self.log = log - self.promisc = promisc + self.log = self.cappie.log self.mtu = mtu + self.promisc = promisc + self.queue = self.cappie.queue self.db = Database(self) @@ -202,6 +204,72 @@ class Interface(Thread): return "arp or rarp" +class QueueFullError(Exception): + pass + + +class Queue(Thread): + heartbeat = 5.0 + maxitems = 100 + + def __init__(self, log): + Thread.__init__(self) + + self.log = log + + self.__running = True + self.__queue = [] + + def __len__(self): + return self.length + + def add(self, event): + if self.length > self.maxitems: + raise QueueFullError, "Cannot queue new event." + + self.__queue.append(event) + + @property + def length(self): + return len(self.__queue) + + def run(self): + self.log.debug("Started event queue") + + while self.__running or self.__queue: + if not self.__queue: + #self.log.debug("Queue sleeping for %s seconds" % self.heartbeat) + time.sleep(self.heartbeat) + continue + + event = self.__queue.pop(0) + self.log.debug("Processing queue event: %s" % event) + try: + event.run() + except EventException, e: + self.log.error("Catched event exception: %s" % e) + + def shutdown(self): + self.__running = False + self.log.debug("Shutting down queue") + self.log.debug("%d events in queue left" % len(self.__queue)) + + # Wait until queue handled all events + self.join() + + +class Event(object): + def __init__(self, interface): + self.cappie = interface.cappie + self.interface = interface + + def __str__(self): + return self.__class__.__name__ + + def run(self): + raise NotImplementedError + + class Cappie(object): def __init__(self): self.__interfaces = [] @@ -219,10 +287,12 @@ class Cappie(object): handler.setFormatter(logging.Formatter("cappie: %(message)s")) self.log.addHandler(handler) + self.queue = Queue(self.log) + self.log.info("Cappie successfully started") def __del__(self): - self.reset() + self.shutdown() self.log.info("Exiting") def setDebug(self, debug): @@ -235,18 +305,27 @@ class Cappie(object): if not dev in getAllInterfaces(): raise InterfaceError, "No such interface %s" % dev - iface = Interface(dev, log=self.log, **kwargs) + kwargs["cappie"] = self + + iface = Interface(dev, **kwargs) self.__interfaces.append(iface) def run(self): if not self.__interfaces: raise RuntimeError, "No interfaces were configured" + # Start queue + self.queue.start() + # Start a thread for each interface for iface in self.__interfaces: iface.start() while True: + if not self.queue.is_alive(): + self.log.critical("Queue thread died unexpectedly.") + return + for iface in self.__interfaces: if not iface.is_alive(): self.log.critical("Thread died unexpectedly. %s" % iface.dev) @@ -254,8 +333,6 @@ class Cappie(object): time.sleep(60) def readConfig(self, configfile): - self.reset() - config = ConfigParser() config.read([configfile]) @@ -272,14 +349,12 @@ class Cappie(object): options[option] = value self.addInterface(iface, **options) - def reset(self): - self.shutdown() - self.__interfaces = [] - def shutdown(self): for iface in self.__interfaces: iface.shutdown() + self.queue.shutdown() + if __name__ == "__main__": from optparse import OptionParser