]>
git.ipfire.org Git - oddments/cappie.git/blob - cappie.py
55f305950ddeeddc55d94394541b92d412a35a68
4 import logging
.handlers
10 from ConfigParser
import ConfigParser
11 from threading
import Thread
16 OPERATION_RESPONSE
= 1
18 def getAllInterfaces():
19 filters
= ("lo", "any")
21 for dev
in pcapy
.findalldevs():
22 if not dev
in filters
:
27 return int("".join(["%02d" % ord(c
) for c
in val
]), 16)
30 return ".".join(["%d" % ord(i
) for i
in val
])
33 return ":".join(["%02x" % ord(i
) for i
in val
])
35 def decode_packet(data
):
36 for func
in (decode_arp_packet
,):
39 except PacketTypeError
:
44 raise PacketTypeError
, "Could not determine type of packet"
46 def decode_arp_packet(data
):
48 1 : OPERATION_REQUEST
,
49 2 : OPERATION_RESPONSE
,
52 #if not len(data) == 42:
53 # raise DecodeError, "Data has wrong length: %d" % len(data)
60 protocol
= val2int(struct
.unpack("!2s", data
[12:14])[0])
61 hw_addr_size
= val2int(struct
.unpack("!1s", data
[18:19])[0])
62 hw_prot_size
= val2int(struct
.unpack("!1s", data
[19:20])[0])
63 operation
= val2int(struct
.unpack("!2s", data
[20:22])[0])
66 if not protocol
== 0x0806:
67 raise PacketTypeError
, "Not an ARP packet"
69 # TODO Must check hwtype here...
72 ret
["operation"] = operationmap
[operation
]
74 raise DecodeError
, "Unknown operation type"
76 address_length
= hw_addr_size
+ hw_prot_size
77 unpack_str
= "!%ss%ss" % (hw_addr_size
, hw_prot_size
)
79 ret
["source_address"], ret
["source_ip_address"] = \
80 struct
.unpack(unpack_str
, data
[22:22 + address_length
])
82 ret
["destination_address"], ret
["destination_ip_address"] = \
83 struct
.unpack(unpack_str
, data
[22 + address_length
:22 + address_length
* 2])
85 for i
in ("source_address", "destination_address"):
86 ret
[i
] = val2mac(ret
[i
])
88 for i
in ("source_ip_address", "destination_ip_address"):
89 ret
[i
] = val2ip4(ret
[i
])
93 def decode_ndp_packet(data
):
96 class PacketTypeError(Exception):
99 class DecodeError(Exception):
103 class InterfaceError(Exception):
107 class Database(object):
108 def __init__(self
, interface
):
109 self
.interface
= interface
110 self
.dev
= self
.interface
.dev
111 self
.log
= self
.interface
.log
116 self
.log
.debug("Opened database for %s" % self
.dev
)
119 self
.log
.debug("Closing database for %s" % self
.dev
)
124 return self
.__data
[mac
]
127 return self
.__data
.has_key(mac
)
129 def put(self
, mac
, key
, val
):
130 if not self
.has(mac
):
131 self
.__data
[mac
] = {}
133 # TODO Check key for sanity
135 self
.__data
[mac
][key
] = val
138 class Interface(Thread
):
141 def __init__(self
, dev
, cappie
, promisc
=False, mtu
=1500):
142 Thread
.__init
__(self
)
146 self
.log
= self
.cappie
.log
148 self
.promisc
= promisc
149 self
.queue
= self
.cappie
.queue
151 self
.db
= Database(self
)
153 self
.log
.debug("Created new interface %s" % self
.dev
)
155 self
.__running
= True
157 def _callback(self
, header
, data
):
158 self
.log
.debug("Received packet on %s" % self
.dev
)
160 p
= decode_packet(data
)
161 except PacketTypeError
, e
:
162 self
.log
.error("Got unknown packet: %s" % e
)
164 except DecodeError
, e
:
165 self
.log
.warning("Got decoding error: %s" % e
)
168 # Dump packet information
169 for key
, val
in p
.items():
170 self
.log
.debug(" %s: %s" % (key
, val
))
172 if not self
.db
.has(p
["source_address"]):
173 self
.db
.put(p
["source_address"], "SOURCE_IP_ADDRESS", p
["source_ip_address"])
176 self
.log
.info("Starting interface %s" % self
.dev
)
180 p
= pcapy
.open_live(self
.dev
, self
.mtu
, self
.promisc
, 0)
181 p
.setfilter(self
.filter)
182 #p.loop(0, self._callback)
186 if not self
.__running
:
190 if p
.dispatch(1, self
._callback
):
193 time
.sleep(self
.heartbeat
)
196 if not self
.__running
:
199 self
.log
.debug("Sending shutdown signal to %s" % self
.dev
)
200 self
.__running
= False
207 class QueueFullError(Exception):
215 def __init__(self
, log
):
216 Thread
.__init
__(self
)
220 self
.__running
= True
226 def add(self
, event
):
227 if self
.length
> self
.maxitems
:
228 raise QueueFullError
, "Cannot queue new event."
230 self
.__queue
.append(event
)
234 return len(self
.__queue
)
237 self
.log
.debug("Started event queue")
239 while self
.__running
or self
.__queue
:
241 #self.log.debug("Queue sleeping for %s seconds" % self.heartbeat)
242 time
.sleep(self
.heartbeat
)
245 event
= self
.__queue
.pop(0)
246 self
.log
.debug("Processing queue event: %s" % event
)
249 except EventException
, e
:
250 self
.log
.error("Catched event exception: %s" % e
)
253 self
.__running
= False
254 self
.log
.debug("Shutting down queue")
255 self
.log
.debug("%d events in queue left" % len(self
.__queue
))
257 # Wait until queue handled all events
262 def __init__(self
, interface
):
263 self
.cappie
= interface
.cappie
264 self
.interface
= interface
267 return self
.__class
__.__name
__
270 raise NotImplementedError
273 class Cappie(object):
275 self
.__interfaces
= []
277 self
.log
= logging
.getLogger("cappie")
278 self
.log
.setLevel(logging
.INFO
)
281 handler
= logging
.StreamHandler()
282 handler
.setFormatter(logging
.Formatter("%(levelname)7s %(message)s"))
283 self
.log
.addHandler(handler
)
286 handler
= logging
.handlers
.SysLogHandler("/dev/log")
287 handler
.setFormatter(logging
.Formatter("cappie: %(message)s"))
288 self
.log
.addHandler(handler
)
290 self
.queue
= Queue(self
.log
)
292 self
.log
.info("Cappie successfully started")
296 self
.log
.info("Exiting")
298 def setDebug(self
, debug
):
300 self
.log
.setLevel(logging
.DEBUG
)
302 self
.log
.setLevel(logging
.INFO
)
304 def addInterface(self
, dev
, **kwargs
):
305 if not dev
in getAllInterfaces():
306 raise InterfaceError
, "No such interface %s" % dev
308 kwargs
["cappie"] = self
310 iface
= Interface(dev
, **kwargs
)
311 self
.__interfaces
.append(iface
)
314 if not self
.__interfaces
:
315 raise RuntimeError, "No interfaces were configured"
320 # Start a thread for each interface
321 for iface
in self
.__interfaces
:
325 if not self
.queue
.is_alive():
326 self
.log
.critical("Queue thread died unexpectedly.")
329 for iface
in self
.__interfaces
:
330 if not iface
.is_alive():
331 self
.log
.critical("Thread died unexpectedly. %s" % iface
.dev
)
335 def readConfig(self
, configfile
):
336 config
= ConfigParser()
337 config
.read([configfile
])
340 if config
.has_section("global"):
341 for option
, value
in config
.items("global"):
342 global_opts
[option
] = value
344 config
.remove_section("global")
346 for iface
in config
.sections():
348 for option
, value
in config
.items(iface
):
349 options
[option
] = value
350 self
.addInterface(iface
, **options
)
353 for iface
in self
.__interfaces
:
356 self
.queue
.shutdown()
359 if __name__
== "__main__":
360 from optparse
import OptionParser
362 op
.add_option("-c", "--config", dest
="config",
363 help="read configuration from file", metavar
="FILE",
364 default
="/etc/cappie/cappie.conf")
365 op
.add_option("-d", action
="store_true", dest
="debug", default
=False)
367 (options
, args
) = op
.parse_args()
371 cappie
.readConfig(options
.config
)
372 cappie
.setDebug(options
.debug
)
376 except KeyboardInterrupt:
378 except RuntimeError, e
:
379 print >>sys
.stderr
, e