]> git.ipfire.org Git - oddments/cappie.git/blob - cappie.py
55f305950ddeeddc55d94394541b92d412a35a68
[oddments/cappie.git] / cappie.py
1 #!/usr/bin/python
2
3 import logging
4 import logging.handlers
5 import pcapy
6 import struct
7 import sys
8 import time
9
10 from ConfigParser import ConfigParser
11 from threading import Thread
12
13 TYPE_ARP = 0
14
15 OPERATION_REQUEST = 0
16 OPERATION_RESPONSE = 1
17
18 def getAllInterfaces():
19 filters = ("lo", "any")
20 ret = []
21 for dev in pcapy.findalldevs():
22 if not dev in filters:
23 ret.append(dev)
24 return ret
25
26 def val2int(val):
27 return int("".join(["%02d" % ord(c) for c in val]), 16)
28
29 def val2ip4(val):
30 return ".".join(["%d" % ord(i) for i in val])
31
32 def val2mac(val):
33 return ":".join(["%02x" % ord(i) for i in val])
34
35 def decode_packet(data):
36 for func in (decode_arp_packet,):
37 try:
38 p =func(data)
39 except PacketTypeError:
40 continue
41
42 return p
43
44 raise PacketTypeError, "Could not determine type of packet"
45
46 def decode_arp_packet(data):
47 operationmap = {
48 1 : OPERATION_REQUEST,
49 2 : OPERATION_RESPONSE,
50 }
51
52 #if not len(data) == 42:
53 # raise DecodeError, "Data has wrong length: %d" % len(data)
54
55 ret = {
56 "type" : TYPE_ARP,
57 }
58
59 #"hwtype" : data[:2],
60 protocol = val2int(struct.unpack("!2s", data[12:14])[0])
61 hw_addr_size = val2int(struct.unpack("!1s", data[18:19])[0])
62 hw_prot_size = val2int(struct.unpack("!1s", data[19:20])[0])
63 operation = val2int(struct.unpack("!2s", data[20:22])[0])
64
65 # Sanity checks
66 if not protocol == 0x0806:
67 raise PacketTypeError, "Not an ARP packet"
68
69 # TODO Must check hwtype here...
70
71 try:
72 ret["operation"] = operationmap[operation]
73 except KeyError:
74 raise DecodeError, "Unknown operation type"
75
76 address_length = hw_addr_size + hw_prot_size
77 unpack_str = "!%ss%ss" % (hw_addr_size, hw_prot_size)
78
79 ret["source_address"], ret["source_ip_address"] = \
80 struct.unpack(unpack_str, data[22:22 + address_length])
81
82 ret["destination_address"], ret["destination_ip_address"] = \
83 struct.unpack(unpack_str, data[22 + address_length:22 + address_length * 2])
84
85 for i in ("source_address", "destination_address"):
86 ret[i] = val2mac(ret[i])
87
88 for i in ("source_ip_address", "destination_ip_address"):
89 ret[i] = val2ip4(ret[i])
90
91 return ret
92
93 def decode_ndp_packet(data):
94 raise PacketTypeError
95
96 class PacketTypeError(Exception):
97 pass
98
99 class DecodeError(Exception):
100 pass
101
102
103 class InterfaceError(Exception):
104 pass
105
106
107 class Database(object):
108 def __init__(self, interface):
109 self.interface = interface
110 self.dev = self.interface.dev
111 self.log = self.interface.log
112
113 self.__data = {}
114
115 def open(self):
116 self.log.debug("Opened database for %s" % self.dev)
117
118 def close(self):
119 self.log.debug("Closing database for %s" % self.dev)
120 print self.__data
121
122 def get(self, mac):
123 if self.has(mac):
124 return self.__data[mac]
125
126 def has(self, mac):
127 return self.__data.has_key(mac)
128
129 def put(self, mac, key, val):
130 if not self.has(mac):
131 self.__data[mac] = {}
132
133 # TODO Check key for sanity
134
135 self.__data[mac][key] = val
136
137
138 class Interface(Thread):
139 heartbeat = 0.1
140
141 def __init__(self, dev, cappie, promisc=False, mtu=1500):
142 Thread.__init__(self)
143
144 self.cappie = cappie
145 self.dev = dev
146 self.log = self.cappie.log
147 self.mtu = mtu
148 self.promisc = promisc
149 self.queue = self.cappie.queue
150
151 self.db = Database(self)
152
153 self.log.debug("Created new interface %s" % self.dev)
154
155 self.__running = True
156
157 def _callback(self, header, data):
158 self.log.debug("Received packet on %s" % self.dev)
159 try:
160 p = decode_packet(data)
161 except PacketTypeError, e:
162 self.log.error("Got unknown packet: %s" % e)
163 return
164 except DecodeError, e:
165 self.log.warning("Got decoding error: %s" % e)
166 return
167
168 # Dump packet information
169 for key, val in p.items():
170 self.log.debug(" %s: %s" % (key, val))
171
172 if not self.db.has(p["source_address"]):
173 self.db.put(p["source_address"], "SOURCE_IP_ADDRESS", p["source_ip_address"])
174
175 def run(self):
176 self.log.info("Starting interface %s" % self.dev)
177
178 self.db.open()
179
180 p = pcapy.open_live(self.dev, self.mtu, self.promisc, 0)
181 p.setfilter(self.filter)
182 #p.loop(0, self._callback)
183
184 p.setnonblock(1)
185 while True:
186 if not self.__running:
187 self.db.close()
188 return
189
190 if p.dispatch(1, self._callback):
191 continue
192
193 time.sleep(self.heartbeat)
194
195 def shutdown(self):
196 if not self.__running:
197 return
198
199 self.log.debug("Sending shutdown signal to %s" % self.dev)
200 self.__running = False
201
202 @property
203 def filter(self):
204 return "arp or rarp"
205
206
207 class QueueFullError(Exception):
208 pass
209
210
211 class Queue(Thread):
212 heartbeat = 5.0
213 maxitems = 100
214
215 def __init__(self, log):
216 Thread.__init__(self)
217
218 self.log = log
219
220 self.__running = True
221 self.__queue = []
222
223 def __len__(self):
224 return self.length
225
226 def add(self, event):
227 if self.length > self.maxitems:
228 raise QueueFullError, "Cannot queue new event."
229
230 self.__queue.append(event)
231
232 @property
233 def length(self):
234 return len(self.__queue)
235
236 def run(self):
237 self.log.debug("Started event queue")
238
239 while self.__running or self.__queue:
240 if not self.__queue:
241 #self.log.debug("Queue sleeping for %s seconds" % self.heartbeat)
242 time.sleep(self.heartbeat)
243 continue
244
245 event = self.__queue.pop(0)
246 self.log.debug("Processing queue event: %s" % event)
247 try:
248 event.run()
249 except EventException, e:
250 self.log.error("Catched event exception: %s" % e)
251
252 def shutdown(self):
253 self.__running = False
254 self.log.debug("Shutting down queue")
255 self.log.debug("%d events in queue left" % len(self.__queue))
256
257 # Wait until queue handled all events
258 self.join()
259
260
261 class Event(object):
262 def __init__(self, interface):
263 self.cappie = interface.cappie
264 self.interface = interface
265
266 def __str__(self):
267 return self.__class__.__name__
268
269 def run(self):
270 raise NotImplementedError
271
272
273 class Cappie(object):
274 def __init__(self):
275 self.__interfaces = []
276
277 self.log = logging.getLogger("cappie")
278 self.log.setLevel(logging.INFO)
279
280 # Log to console
281 handler = logging.StreamHandler()
282 handler.setFormatter(logging.Formatter("%(levelname)7s %(message)s"))
283 self.log.addHandler(handler)
284
285 # Setup syslog
286 handler = logging.handlers.SysLogHandler("/dev/log")
287 handler.setFormatter(logging.Formatter("cappie: %(message)s"))
288 self.log.addHandler(handler)
289
290 self.queue = Queue(self.log)
291
292 self.log.info("Cappie successfully started")
293
294 def __del__(self):
295 self.shutdown()
296 self.log.info("Exiting")
297
298 def setDebug(self, debug):
299 if debug:
300 self.log.setLevel(logging.DEBUG)
301 else:
302 self.log.setLevel(logging.INFO)
303
304 def addInterface(self, dev, **kwargs):
305 if not dev in getAllInterfaces():
306 raise InterfaceError, "No such interface %s" % dev
307
308 kwargs["cappie"] = self
309
310 iface = Interface(dev, **kwargs)
311 self.__interfaces.append(iface)
312
313 def run(self):
314 if not self.__interfaces:
315 raise RuntimeError, "No interfaces were configured"
316
317 # Start queue
318 self.queue.start()
319
320 # Start a thread for each interface
321 for iface in self.__interfaces:
322 iface.start()
323
324 while True:
325 if not self.queue.is_alive():
326 self.log.critical("Queue thread died unexpectedly.")
327 return
328
329 for iface in self.__interfaces:
330 if not iface.is_alive():
331 self.log.critical("Thread died unexpectedly. %s" % iface.dev)
332 return
333 time.sleep(60)
334
335 def readConfig(self, configfile):
336 config = ConfigParser()
337 config.read([configfile])
338
339 global_opts = {}
340 if config.has_section("global"):
341 for option, value in config.items("global"):
342 global_opts[option] = value
343
344 config.remove_section("global")
345
346 for iface in config.sections():
347 options = {}
348 for option, value in config.items(iface):
349 options[option] = value
350 self.addInterface(iface, **options)
351
352 def shutdown(self):
353 for iface in self.__interfaces:
354 iface.shutdown()
355
356 self.queue.shutdown()
357
358
359 if __name__ == "__main__":
360 from optparse import OptionParser
361 op = OptionParser()
362 op.add_option("-c", "--config", dest="config",
363 help="read configuration from file", metavar="FILE",
364 default="/etc/cappie/cappie.conf")
365 op.add_option("-d", action="store_true", dest="debug", default=False)
366
367 (options, args) = op.parse_args()
368
369 cappie = Cappie()
370 if options.config:
371 cappie.readConfig(options.config)
372 cappie.setDebug(options.debug)
373
374 try:
375 cappie.run()
376 except KeyboardInterrupt:
377 cappie.shutdown()
378 except RuntimeError, e:
379 print >>sys.stderr, e
380 sys.exit(1)
381
382 #sys.exit(0)