]>
Commit | Line | Data |
---|---|---|
1 | #!/usr/bin/env python | |
2 | import threading | |
3 | import os | |
4 | import socket | |
5 | import struct | |
6 | import sys | |
7 | import time | |
8 | from dnsdisttests import DNSDistTest, Queue | |
9 | ||
10 | import dns | |
11 | import dnstap_pb2 | |
12 | ||
# Frame Streams control frame types. The type is the first 32-bit big-endian
# word of a control frame payload (see fstrm_get_control_frame_type()).
FSTRM_CONTROL_ACCEPT = 0x01
FSTRM_CONTROL_START = 0x02
FSTRM_CONTROL_STOP = 0x03
FSTRM_CONTROL_READY = 0x04
FSTRM_CONTROL_FINISH = 0x05
18 | ||
19 | ||
def checkDnstapBase(testinstance, dnstap, protocol, initiator):
    """
    Check the fields shared by every dnstap message produced in these tests.

    testinstance: the running test case, used for its assert* methods and
                  for its _dnsDistPort attribute.
    dnstap:       the parsed dnstap_pb2.Dnstap message to inspect.
    protocol:     expected value of message.socket_protocol.
    initiator:    expected textual IPv4 address for both query_address and
                  response_address.
    """
    testinstance.assertTrue(dnstap)

    # envelope: identity, version and type
    testinstance.assertTrue(dnstap.HasField('identity'))
    testinstance.assertEqual(dnstap.identity, b'a.server')
    testinstance.assertTrue(dnstap.HasField('version'))
    testinstance.assertIn(b'dnsdist ', dnstap.version)
    testinstance.assertTrue(dnstap.HasField('type'))
    testinstance.assertEqual(dnstap.type, dnstap.MESSAGE)

    # embedded message: transport and addressing information
    testinstance.assertTrue(dnstap.HasField('message'))
    testinstance.assertTrue(dnstap.message.HasField('socket_protocol'))
    testinstance.assertEqual(dnstap.message.socket_protocol, protocol)
    testinstance.assertTrue(dnstap.message.HasField('socket_family'))
    testinstance.assertEqual(dnstap.message.socket_family, dnstap_pb2.INET)
    testinstance.assertTrue(dnstap.message.HasField('query_address'))
    testinstance.assertEqual(socket.inet_ntop(socket.AF_INET, dnstap.message.query_address), initiator)
    testinstance.assertTrue(dnstap.message.HasField('response_address'))
    testinstance.assertEqual(socket.inet_ntop(socket.AF_INET, dnstap.message.response_address), initiator)
    testinstance.assertTrue(dnstap.message.HasField('response_port'))
    testinstance.assertEqual(dnstap.message.response_port, testinstance._dnsDistPort)
39 | ||
40 | ||
def checkDnstapQuery(testinstance, dnstap, protocol, query, initiator='127.0.0.1'):
    """
    Check that a dnstap message is a CLIENT_QUERY carrying the given query.

    testinstance: the running test case, used for its assert* methods.
    dnstap:       the parsed dnstap_pb2.Dnstap message to inspect.
    protocol:     expected value of message.socket_protocol.
    query:        the DNS message expected once query_message is parsed
                  from wire format.
    initiator:    expected textual IPv4 address of the client.
    """
    testinstance.assertEqual(dnstap.message.type, dnstap_pb2.Message.CLIENT_QUERY)
    checkDnstapBase(testinstance, dnstap, protocol, initiator)

    testinstance.assertTrue(dnstap.message.HasField('query_time_sec'))
    testinstance.assertTrue(dnstap.message.HasField('query_time_nsec'))

    testinstance.assertTrue(dnstap.message.HasField('query_message'))
    wire_message = dns.message.from_wire(dnstap.message.query_message)
    testinstance.assertEqual(wire_message, query)
51 | ||
52 | ||
def checkDnstapExtra(testinstance, dnstap, expected):
    """
    Check that the dnstap message has an 'extra' field equal to `expected`.
    """
    has_extra = dnstap.HasField('extra')
    testinstance.assertTrue(has_extra)
    testinstance.assertEqual(dnstap.extra, expected)
56 | ||
57 | ||
def checkDnstapNoExtra(testinstance, dnstap):
    """
    Check that the dnstap message does not have an 'extra' field.
    """
    extra_present = dnstap.HasField('extra')
    testinstance.assertFalse(extra_present)
60 | ||
61 | ||
def checkDnstapResponse(testinstance, dnstap, protocol, response, initiator='127.0.0.1'):
    """
    Check that a dnstap message is a CLIENT_RESPONSE carrying the given response.

    testinstance: the running test case, used for its assert* methods.
    dnstap:       the parsed dnstap_pb2.Dnstap message to inspect.
    protocol:     expected value of message.socket_protocol.
    response:     the DNS message expected once response_message is parsed
                  from wire format.
    initiator:    expected textual IPv4 address of the client.
    """
    testinstance.assertEqual(dnstap.message.type, dnstap_pb2.Message.CLIENT_RESPONSE)
    checkDnstapBase(testinstance, dnstap, protocol, initiator)

    testinstance.assertTrue(dnstap.message.HasField('query_time_sec'))
    testinstance.assertTrue(dnstap.message.HasField('query_time_nsec'))

    testinstance.assertTrue(dnstap.message.HasField('response_time_sec'))
    testinstance.assertTrue(dnstap.message.HasField('response_time_nsec'))

    # Compare the timestamps as (seconds, nanoseconds) pairs: the nanoseconds
    # only matter when the seconds are equal. Comparing the two components
    # independently would accept a response that is a full second earlier
    # than its query as long as its nanoseconds part happens to be larger.
    query_time = (dnstap.message.query_time_sec, dnstap.message.query_time_nsec)
    response_time = (dnstap.message.response_time_sec, dnstap.message.response_time_nsec)
    testinstance.assertGreater(response_time, query_time)

    testinstance.assertTrue(dnstap.message.HasField('response_message'))
    wire_message = dns.message.from_wire(dnstap.message.response_message)
    testinstance.assertEqual(wire_message, response)
78 | ||
79 | ||
class TestDnstapOverRemoteLogger(DNSDistTest):
    """
    Check the dnstap messages dnsdist sends to a remote logger over TCP.

    A listener thread accepts connections on _remoteLoggerServerPort, reads
    length-prefixed frames and pushes each payload onto _remoteLoggerQueue,
    where the test methods pick them up.
    """
    _remoteLoggerServerPort = 4243
    _remoteLoggerQueue = Queue()
    _remoteLoggerCounter = 0
    _config_params = ['_testServerPort', '_remoteLoggerServerPort']
    _config_template = """
    extrasmn = newSuffixMatchNode()
    extrasmn:add(newDNSName('extra.dnstap.tests.powerdns.com.'))

    luatarget = 'lua.dnstap.tests.powerdns.com.'

    function alterDnstapQuery(dq, tap)
      if extrasmn:check(dq.qname) then
        tap:setExtra("Type,Query")
      end
    end

    function alterDnstapResponse(dq, tap)
      if extrasmn:check(dq.qname) then
        tap:setExtra("Type,Response")
      end
    end

    function luaFunc(dq)
      dq.dh:setQR(true)
      dq.dh:setRCode(DNSRCode.NXDOMAIN)
      return DNSAction.None, ""
    end

    newServer{address="127.0.0.1:%s", useClientSubnet=true}
    rl = newRemoteLogger('127.0.0.1:%s')

    addAction(AllRule(), DnstapLogAction("a.server", rl, alterDnstapQuery)) -- Send dnstap message before lookup

    addAction(luatarget, LuaAction(luaFunc)) -- Send dnstap message before lookup

    addResponseAction(AllRule(), DnstapLogResponseAction("a.server", rl, alterDnstapResponse)) -- Send dnstap message after lookup

    addAction('spoof.dnstap.tests.powerdns.com.', SpoofAction("192.0.2.1"))
    """

    @staticmethod
    def _recvExactly(conn, length):
        """
        Read exactly `length` bytes from `conn`.

        A single recv() on a stream socket may return fewer bytes than
        requested, so keep reading until the whole amount has arrived.

        Returns the bytes read, or None if the peer closed the connection
        before `length` bytes were received.
        """
        chunks = []
        remaining = length
        while remaining > 0:
            chunk = conn.recv(remaining)
            if not chunk:
                return None
            chunks.append(chunk)
            remaining -= len(chunk)
        return b''.join(chunks)

    @classmethod
    def RemoteLoggerListener(cls, port):
        """
        Accept connections on 127.0.0.1:port and queue every frame received.

        Each frame is a 2-byte big-endian length followed by that many bytes
        of payload; payloads are put on cls._remoteLoggerQueue. Runs forever,
        it is meant to be the target of a daemon thread.
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
        try:
            sock.bind(("127.0.0.1", port))
        except socket.error as e:
            print("Error binding in the protbuf listener: %s" % str(e))
            sock.close()
            sys.exit(1)

        try:
            sock.listen(100)
            while True:
                (conn, _) = sock.accept()
                try:
                    while True:
                        header = cls._recvExactly(conn, 2)
                        if not header:
                            break
                        (datalen,) = struct.unpack("!H", header)
                        data = cls._recvExactly(conn, datalen)
                        if not data:
                            break

                        cls._remoteLoggerQueue.put(data, True, timeout=2.0)
                finally:
                    # release the connection even if reading it failed
                    conn.close()
        finally:
            sock.close()

    @classmethod
    def startResponders(cls):
        """
        Start the regular responders, then the remote logger listener thread.
        """
        DNSDistTest.startResponders()

        cls._remoteLoggerListener = threading.Thread(name='RemoteLogger Listener', target=cls.RemoteLoggerListener, args=[cls._remoteLoggerServerPort])
        # daemon thread: the listener loops forever and must not block exit
        cls._remoteLoggerListener.daemon = True
        cls._remoteLoggerListener.start()

    def getFirstDnstap(self):
        """
        Pop the oldest queued payload and return it parsed as a Dnstap message.

        Fails the test if the queue is empty.
        """
        self.assertFalse(self._remoteLoggerQueue.empty())
        data = self._remoteLoggerQueue.get(False)
        self.assertTrue(data)
        dnstap = dnstap_pb2.Dnstap()
        dnstap.ParseFromString(data)
        return dnstap

    def testDnstap(self):
        """
        Dnstap: Send query and responses packed in dnstap to a remotelogger server
        """
        name = 'query.dnstap.tests.powerdns.com.'

        target = 'target.dnstap.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)

        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.CNAME,
                                    target)
        response.answer.append(rrset)

        rrset = dns.rrset.from_text(target,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)

        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)

        # give the dnstap messages time to get here
        time.sleep(1)

        # check the dnstap message corresponding to the UDP query
        dnstap = self.getFirstDnstap()

        checkDnstapQuery(self, dnstap, dnstap_pb2.UDP, query)
        checkDnstapNoExtra(self, dnstap)

        # check the dnstap message corresponding to the UDP response
        dnstap = self.getFirstDnstap()
        checkDnstapResponse(self, dnstap, dnstap_pb2.UDP, response)
        checkDnstapNoExtra(self, dnstap)

        (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)

        # give the dnstap messages time to get here
        time.sleep(1)

        # check the dnstap message corresponding to the TCP query
        dnstap = self.getFirstDnstap()

        checkDnstapQuery(self, dnstap, dnstap_pb2.TCP, query)
        checkDnstapNoExtra(self, dnstap)

        # check the dnstap message corresponding to the TCP response
        dnstap = self.getFirstDnstap()
        checkDnstapResponse(self, dnstap, dnstap_pb2.TCP, response)
        checkDnstapNoExtra(self, dnstap)

    def testDnstapExtra(self):
        """
        DnstapExtra: Send query and responses packed in dnstap to a remotelogger server. Extra data is filled out.
        """
        name = 'extra.dnstap.tests.powerdns.com.'

        target = 'target.dnstap.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)

        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.CNAME,
                                    target)
        response.answer.append(rrset)

        rrset = dns.rrset.from_text(target,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)

        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)

        # give the dnstap messages time to get here
        time.sleep(1)

        # check the dnstap message corresponding to the UDP query
        dnstap = self.getFirstDnstap()
        checkDnstapQuery(self, dnstap, dnstap_pb2.UDP, query)
        checkDnstapExtra(self, dnstap, b"Type,Query")

        # check the dnstap message corresponding to the UDP response
        dnstap = self.getFirstDnstap()
        checkDnstapResponse(self, dnstap, dnstap_pb2.UDP, response)
        checkDnstapExtra(self, dnstap, b"Type,Response")

        (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)

        # give the dnstap messages time to get here
        time.sleep(1)

        # check the dnstap message corresponding to the TCP query
        dnstap = self.getFirstDnstap()
        checkDnstapQuery(self, dnstap, dnstap_pb2.TCP, query)
        checkDnstapExtra(self, dnstap, b"Type,Query")

        # check the dnstap message corresponding to the TCP response
        dnstap = self.getFirstDnstap()
        checkDnstapResponse(self, dnstap, dnstap_pb2.TCP, response)
        checkDnstapExtra(self, dnstap, b"Type,Response")
294 | ||
295 | ||
def fstrm_get_control_frame_type(data):
    """
    Return the control frame type stored in the first four bytes of `data`,
    decoded as a big-endian unsigned 32-bit integer.
    """
    type_field = data[:4]
    return struct.unpack("!L", type_field)[0]
299 | ||
300 | ||
def fstrm_make_control_frame_reply(cft, data):
    """
    Build the reply to send for a received control frame.

    cft:  the type of the control frame that was received.
    data: the payload of that control frame (currently unused).

    Returns the bytes to send back, or None when no reply is needed.
    Raises Exception for control frame types this listener does not handle.
    """
    if cft == FSTRM_CONTROL_READY:
        # Reply with ACCEPT frame and content-type
        contenttype = b'protobuf:dnstap.Dnstap'
        # control frame: type, then one field (field type 1, its length and
        # its value, here the content type)
        frame = struct.pack('!LLL', FSTRM_CONTROL_ACCEPT, 1,
                            len(contenttype)) + contenttype
        # a zero length word announces a control frame, followed by the
        # length of the control frame itself
        buf = struct.pack("!LL", 0, len(frame)) + frame
        return buf
    elif cft == FSTRM_CONTROL_START:
        return None
    elif cft == FSTRM_CONTROL_STOP:
        # Acknowledge the end of the stream with a FINISH frame, which
        # carries nothing but its type, so that the connection handler
        # gets the STOP type back and can leave its read loop.
        frame = struct.pack('!L', FSTRM_CONTROL_FINISH)
        return struct.pack("!LL", 0, len(frame)) + frame
    else:
        # cft is an integer, it cannot be concatenated to the message
        raise Exception('unhandled control frame %d' % cft)
313 | ||
314 | ||
def _fstrm_recv_exactly(conn, length):
    """
    Read exactly `length` bytes from `conn`.

    A single recv() on a stream socket may return fewer bytes than requested,
    so keep reading until the whole amount has arrived.

    Returns the bytes read, or None if the peer closed the connection before
    `length` bytes were received.
    """
    chunks = []
    remaining = length
    while remaining > 0:
        chunk = conn.recv(remaining)
        if not chunk:
            return None
        chunks.append(chunk)
        remaining -= len(chunk)
    return b''.join(chunks)


def fstrm_read_and_dispatch_control_frame(conn):
    """
    Read one control frame from `conn` and send the reply it calls for, if any.

    Expects the zero length word announcing the control frame to have been
    consumed already: reads the 32-bit big-endian control frame length, then
    the control frame itself.

    Returns the type of the control frame.
    Raises Exception if the frame cannot be read completely.
    """
    data = _fstrm_recv_exactly(conn, 4)
    if not data:
        raise Exception('length of control frame payload could not be read')
    (datalen,) = struct.unpack("!L", data)
    data = _fstrm_recv_exactly(conn, datalen)
    if not data:
        raise Exception('control frame payload could not be read')
    cft = fstrm_get_control_frame_type(data)
    reply = fstrm_make_control_frame_reply(cft, data)
    if reply:
        # sendall() so that a partial write cannot truncate the reply
        conn.sendall(reply)
    return cft


def fstrm_handle_bidir_connection(conn, on_data):
    """
    Read frames from `conn` until the stream ends.

    Every frame starts with a 32-bit big-endian length. A length of zero
    announces a control frame, which is read and answered by
    fstrm_read_and_dispatch_control_frame(); any other length is a data frame
    whose payload is passed to `on_data`.

    Returns when the peer closes the connection, when a frame is cut short,
    or once a STOP control frame has been handled.
    """
    while True:
        header = _fstrm_recv_exactly(conn, 4)
        if not header:
            break
        (datalen,) = struct.unpack("!L", header)
        if datalen == 0:
            # control frame length follows
            cft = fstrm_read_and_dispatch_control_frame(conn)
            if cft == FSTRM_CONTROL_STOP:
                break
        else:
            # data frame
            data = _fstrm_recv_exactly(conn, datalen)
            if not data:
                break

            on_data(data)
347 | ||
348 | ||
class TestDnstapOverFrameStreamUnixLogger(DNSDistTest):
    """
    Check the dnstap messages dnsdist sends over Frame Streams to a unix socket.

    A listener thread accepts connections on the _fstrmLoggerAddress socket
    and pushes the payload of every data frame onto _fstrmLoggerQueue, where
    the test method picks it up.
    """
    _fstrmLoggerAddress = '/tmp/fslutest.sock'
    _fstrmLoggerQueue = Queue()
    _fstrmLoggerCounter = 0
    _config_params = ['_testServerPort', '_fstrmLoggerAddress']
    _config_template = """
    newServer{address="127.0.0.1:%s", useClientSubnet=true}
    fslu = newFrameStreamUnixLogger('%s')

    addAction(AllRule(), DnstapLogAction("a.server", fslu))
    """

    @classmethod
    def FrameStreamUnixListener(cls, path):
        """
        Accept Frame Streams connections on the unix socket at `path` and
        queue every data frame received on cls._fstrmLoggerQueue.

        Runs forever, it is meant to be the target of a daemon thread.
        """
        try:
            os.unlink(path)
        except OSError:
            pass  # Assume file not found
        sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        try:
            sock.bind(path)
        except socket.error as e:
            print("Error binding in the framestream listener: %s" % str(e))
            sock.close()
            sys.exit(1)

        def enqueue(data):
            cls._fstrmLoggerQueue.put(data, True, timeout=2.0)

        try:
            sock.listen(100)
            while True:
                (conn, _) = sock.accept()
                try:
                    fstrm_handle_bidir_connection(conn, enqueue)
                finally:
                    # release the connection even if handling it failed
                    conn.close()
        finally:
            sock.close()

    @classmethod
    def startResponders(cls):
        """
        Start the regular responders, then the Frame Streams listener thread.
        """
        DNSDistTest.startResponders()

        cls._fstrmLoggerListener = threading.Thread(name='FrameStreamUnixListener', target=cls.FrameStreamUnixListener, args=[cls._fstrmLoggerAddress])
        # daemon thread: the listener loops forever and must not block exit
        cls._fstrmLoggerListener.daemon = True
        cls._fstrmLoggerListener.start()

    def getFirstDnstap(self):
        """
        Wait up to two seconds for a queued payload and return it parsed as
        a Dnstap message.
        """
        data = self._fstrmLoggerQueue.get(True, timeout=2.0)
        self.assertTrue(data)
        dnstap = dnstap_pb2.Dnstap()
        dnstap.ParseFromString(data)
        return dnstap

    def testDnstapOverFrameStreamUnix(self):
        """
        Dnstap: Send query packed in dnstap to a unix socket fstrmlogger server
        """
        name = 'query.dnstap.tests.powerdns.com.'

        target = 'target.dnstap.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)

        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.CNAME,
                                    target)
        response.answer.append(rrset)

        rrset = dns.rrset.from_text(target,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)

        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)

        # check the dnstap message corresponding to the UDP query
        dnstap = self.getFirstDnstap()

        checkDnstapQuery(self, dnstap, dnstap_pb2.UDP, query)
        checkDnstapNoExtra(self, dnstap)
433 | ||
434 | ||
class TestDnstapOverFrameStreamTcpLogger(DNSDistTest):
    """
    Check the dnstap messages dnsdist sends over Frame Streams to a TCP socket.

    A listener thread accepts connections on _fstrmLoggerPort and pushes the
    payload of every data frame onto _fstrmLoggerQueue, where the test method
    picks it up.
    """
    _fstrmLoggerPort = 4000
    _fstrmLoggerQueue = Queue()
    _fstrmLoggerCounter = 0
    _config_params = ['_testServerPort', '_fstrmLoggerPort']
    _config_template = """
    newServer{address="127.0.0.1:%s", useClientSubnet=true}
    fslu = newFrameStreamTcpLogger('127.0.0.1:%s')

    addAction(AllRule(), DnstapLogAction("a.server", fslu))
    """

    @classmethod
    def FrameStreamUnixListener(cls, port):
        """
        Accept Frame Streams connections on 127.0.0.1:port and queue every
        data frame received on cls._fstrmLoggerQueue.

        Despite its name this listener uses a TCP socket. Runs forever, it is
        meant to be the target of a daemon thread.
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.bind(("127.0.0.1", port))
        except socket.error as e:
            print("Error binding in the framestream listener: %s" % str(e))
            sock.close()
            sys.exit(1)

        def enqueue(data):
            cls._fstrmLoggerQueue.put(data, True, timeout=2.0)

        try:
            sock.listen(100)
            while True:
                (conn, _) = sock.accept()
                try:
                    fstrm_handle_bidir_connection(conn, enqueue)
                finally:
                    # release the connection even if handling it failed
                    conn.close()
        finally:
            sock.close()

    @classmethod
    def startResponders(cls):
        """
        Start the regular responders, then the Frame Streams listener thread.
        """
        DNSDistTest.startResponders()

        cls._fstrmLoggerListener = threading.Thread(name='FrameStreamUnixListener', target=cls.FrameStreamUnixListener, args=[cls._fstrmLoggerPort])
        # daemon thread: the listener loops forever and must not block exit
        cls._fstrmLoggerListener.daemon = True
        cls._fstrmLoggerListener.start()

    def getFirstDnstap(self):
        """
        Wait up to two seconds for a queued payload and return it parsed as
        a Dnstap message.
        """
        data = self._fstrmLoggerQueue.get(True, timeout=2.0)
        self.assertTrue(data)
        dnstap = dnstap_pb2.Dnstap()
        dnstap.ParseFromString(data)
        return dnstap

    def testDnstapOverFrameStreamTcp(self):
        """
        Dnstap: Send query packed in dnstap to a tcp socket fstrmlogger server
        """
        name = 'query.dnstap.tests.powerdns.com.'

        target = 'target.dnstap.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)

        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.CNAME,
                                    target)
        response.answer.append(rrset)

        rrset = dns.rrset.from_text(target,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)

        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)

        # check the dnstap message corresponding to the UDP query
        dnstap = self.getFirstDnstap()

        checkDnstapQuery(self, dnstap, dnstap_pb2.UDP, query)
        checkDnstapNoExtra(self, dnstap)