class Interface(Thread):
heartbeat = 0.1
- def __init__(self, dev, log, promisc=False, mtu=1500):
+ def __init__(self, dev, cappie, promisc=False, mtu=1500):
Thread.__init__(self)
+ self.cappie = cappie
self.dev = dev
- self.log = log
- self.promisc = promisc
+ self.log = self.cappie.log
self.mtu = mtu
+ self.promisc = promisc
+ self.queue = self.cappie.queue
self.db = Database(self)
return "arp or rarp"
+class QueueFullError(Exception):
+ pass
+
+
+class Queue(Thread):
+ heartbeat = 5.0
+ maxitems = 100
+
+ def __init__(self, log):
+ Thread.__init__(self)
+
+ self.log = log
+
+ self.__running = True
+ self.__queue = []
+
+ def __len__(self):
+ return self.length
+
+ def add(self, event):
+ if self.length > self.maxitems:
+ raise QueueFullError, "Cannot queue new event."
+
+ self.__queue.append(event)
+
+ @property
+ def length(self):
+ return len(self.__queue)
+
+ def run(self):
+ self.log.debug("Started event queue")
+
+ while self.__running or self.__queue:
+ if not self.__queue:
+ #self.log.debug("Queue sleeping for %s seconds" % self.heartbeat)
+ time.sleep(self.heartbeat)
+ continue
+
+ event = self.__queue.pop(0)
+ self.log.debug("Processing queue event: %s" % event)
+ try:
+ event.run()
+ except EventException, e:
+ self.log.error("Catched event exception: %s" % e)
+
+ def shutdown(self):
+ self.__running = False
+ self.log.debug("Shutting down queue")
+ self.log.debug("%d events in queue left" % len(self.__queue))
+
+ # Wait until queue handled all events
+ self.join()
+
+
+class Event(object):
+ def __init__(self, interface):
+ self.cappie = interface.cappie
+ self.interface = interface
+
+ def __str__(self):
+ return self.__class__.__name__
+
+ def run(self):
+ raise NotImplementedError
+
+
class Cappie(object):
def __init__(self):
self.__interfaces = []
handler.setFormatter(logging.Formatter("cappie: %(message)s"))
self.log.addHandler(handler)
+ self.queue = Queue(self.log)
+
self.log.info("Cappie successfully started")
def __del__(self):
- self.reset()
+ self.shutdown()
self.log.info("Exiting")
def setDebug(self, debug):
if not dev in getAllInterfaces():
raise InterfaceError, "No such interface %s" % dev
- iface = Interface(dev, log=self.log, **kwargs)
+ kwargs["cappie"] = self
+
+ iface = Interface(dev, **kwargs)
self.__interfaces.append(iface)
def run(self):
if not self.__interfaces:
raise RuntimeError, "No interfaces were configured"
+ # Start queue
+ self.queue.start()
+
# Start a thread for each interface
for iface in self.__interfaces:
iface.start()
while True:
+ if not self.queue.is_alive():
+ self.log.critical("Queue thread died unexpectedly.")
+ return
+
for iface in self.__interfaces:
if not iface.is_alive():
self.log.critical("Thread died unexpectedly. %s" % iface.dev)
time.sleep(60)
def readConfig(self, configfile):
- self.reset()
-
config = ConfigParser()
config.read([configfile])
options[option] = value
self.addInterface(iface, **options)
- def reset(self):
- self.shutdown()
- self.__interfaces = []
-
def shutdown(self):
for iface in self.__interfaces:
iface.shutdown()
+ self.queue.shutdown()
+
if __name__ == "__main__":
from optparse import OptionParser