]> git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_Dnstap.py
Merge pull request #6100 from pieterlexis/ipv4-ipv6-equiv
[thirdparty/pdns.git] / regression-tests.dnsdist / test_Dnstap.py
1 #!/usr/bin/env python
2 import Queue
3 import threading
4 import os
5 import socket
6 import struct
7 import sys
8 import time
9 from dnsdisttests import DNSDistTest
10
11 import dns
12 import dnstap_pb2
13
14 FSTRM_CONTROL_ACCEPT = 0x01
15 FSTRM_CONTROL_START = 0x02
16 FSTRM_CONTROL_STOP = 0x03
17 FSTRM_CONTROL_READY = 0x04
18 FSTRM_CONTROL_FINISH = 0x05
19
20
21 def checkDnstapBase(testinstance, dnstap, protocol, initiator):
22 testinstance.assertTrue(dnstap)
23 testinstance.assertTrue(dnstap.HasField('identity'))
24 testinstance.assertEqual(dnstap.identity, 'a.server')
25 testinstance.assertTrue(dnstap.HasField('version'))
26 testinstance.assertIn('dnsdist ', dnstap.version)
27 testinstance.assertTrue(dnstap.HasField('type'))
28 testinstance.assertEqual(dnstap.type, dnstap.MESSAGE)
29 testinstance.assertTrue(dnstap.HasField('message'))
30 testinstance.assertTrue(dnstap.message.HasField('socket_protocol'))
31 testinstance.assertEqual(dnstap.message.socket_protocol, protocol)
32 testinstance.assertTrue(dnstap.message.HasField('socket_family'))
33 testinstance.assertEquals(dnstap.message.socket_family, dnstap_pb2.INET)
34 testinstance.assertTrue(dnstap.message.HasField('query_address'))
35 testinstance.assertEquals(socket.inet_ntop(socket.AF_INET, dnstap.message.query_address), initiator)
36 testinstance.assertTrue(dnstap.message.HasField('response_address'))
37 testinstance.assertEquals(socket.inet_ntop(socket.AF_INET, dnstap.message.response_address), initiator)
38 testinstance.assertTrue(dnstap.message.HasField('response_port'))
39 testinstance.assertEquals(dnstap.message.response_port, testinstance._dnsDistPort)
40
41
42 def checkDnstapQuery(testinstance, dnstap, protocol, query, initiator='127.0.0.1'):
43 testinstance.assertEquals(dnstap.message.type, dnstap_pb2.Message.CLIENT_QUERY)
44 checkDnstapBase(testinstance, dnstap, protocol, initiator)
45
46 testinstance.assertTrue(dnstap.message.HasField('query_time_sec'))
47 testinstance.assertTrue(dnstap.message.HasField('query_time_nsec'))
48
49 testinstance.assertTrue(dnstap.message.HasField('query_message'))
50 wire_message = dns.message.from_wire(dnstap.message.query_message)
51 testinstance.assertEqual(wire_message, query)
52
53
54 def checkDnstapExtra(testinstance, dnstap, expected):
55 testinstance.assertTrue(dnstap.HasField('extra'))
56 testinstance.assertEqual(dnstap.extra, expected)
57
58
59 def checkDnstapNoExtra(testinstance, dnstap):
60 testinstance.assertFalse(dnstap.HasField('extra'))
61
62
63 def checkDnstapResponse(testinstance, dnstap, protocol, response, initiator='127.0.0.1'):
64 testinstance.assertEquals(dnstap.message.type, dnstap_pb2.Message.CLIENT_RESPONSE)
65 checkDnstapBase(testinstance, dnstap, protocol, initiator)
66
67 testinstance.assertTrue(dnstap.message.HasField('query_time_sec'))
68 testinstance.assertTrue(dnstap.message.HasField('query_time_nsec'))
69
70 testinstance.assertTrue(dnstap.message.HasField('response_time_sec'))
71 testinstance.assertTrue(dnstap.message.HasField('response_time_nsec'))
72
73 testinstance.assertTrue(dnstap.message.response_time_sec > dnstap.message.query_time_sec or \
74 dnstap.message.response_time_nsec > dnstap.message.query_time_nsec)
75
76 testinstance.assertTrue(dnstap.message.HasField('response_message'))
77 wire_message = dns.message.from_wire(dnstap.message.response_message)
78 testinstance.assertEqual(wire_message, response)
79
80
81 class TestDnstapOverRemoteLogger(DNSDistTest):
82 _remoteLoggerServerPort = 4243
83 _remoteLoggerQueue = Queue.Queue()
84 _remoteLoggerCounter = 0
85 _config_params = ['_testServerPort', '_remoteLoggerServerPort']
86 _config_template = """
87 extrasmn = newSuffixMatchNode()
88 extrasmn:add(newDNSName('extra.dnstap.tests.powerdns.com.'))
89
90 luatarget = 'lua.dnstap.tests.powerdns.com.'
91
92 function alterDnstapQuery(dq, tap)
93 if extrasmn:check(dq.qname) then
94 tap:setExtra("Type,Query")
95 end
96 end
97
98 function alterDnstapResponse(dq, tap)
99 if extrasmn:check(dq.qname) then
100 tap:setExtra("Type,Response")
101 end
102 end
103
104 function luaFunc(dq)
105 dq.dh:setQR(true)
106 dq.dh:setRCode(dnsdist.NXDOMAIN)
107 return DNSAction.None, ""
108 end
109
110 newServer{address="127.0.0.1:%s", useClientSubnet=true}
111 rl = newRemoteLogger('127.0.0.1:%s')
112
113 addAction(AllRule(), DnstapLogAction("a.server", rl, alterDnstapQuery)) -- Send dnstap message before lookup
114
115 addAction(luatarget, LuaAction(luaFunc)) -- Send dnstap message before lookup
116
117 addResponseAction(AllRule(), DnstapLogResponseAction("a.server", rl, alterDnstapResponse)) -- Send dnstap message after lookup
118
119 addAction('spoof.dnstap.tests.powerdns.com.', SpoofAction("192.0.2.1"))
120 """
121
122 @classmethod
123 def RemoteLoggerListener(cls, port):
124 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
125 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
126 try:
127 sock.bind(("127.0.0.1", port))
128 except socket.error as e:
129 print("Error binding in the protbuf listener: %s" % str(e))
130 sys.exit(1)
131
132 sock.listen(100)
133 while True:
134 (conn, _) = sock.accept()
135 data = None
136 while True:
137 data = conn.recv(2)
138 if not data:
139 break
140 (datalen,) = struct.unpack("!H", data)
141 data = conn.recv(datalen)
142 if not data:
143 break
144
145 cls._remoteLoggerQueue.put(data, True, timeout=2.0)
146
147 conn.close()
148 sock.close()
149
150 @classmethod
151 def startResponders(cls):
152 DNSDistTest.startResponders()
153
154 cls._remoteLoggerListener = threading.Thread(name='RemoteLogger Listener', target=cls.RemoteLoggerListener, args=[cls._remoteLoggerServerPort])
155 cls._remoteLoggerListener.setDaemon(True)
156 cls._remoteLoggerListener.start()
157
158 def getFirstDnstap(self):
159 self.assertFalse(self._remoteLoggerQueue.empty())
160 data = self._remoteLoggerQueue.get(False)
161 self.assertTrue(data)
162 dnstap = dnstap_pb2.Dnstap()
163 dnstap.ParseFromString(data)
164 return dnstap
165
166 def testDnstap(self):
167 """
168 Dnstap: Send query and responses packed in dnstap to a remotelogger server
169 """
170 name = 'query.dnstap.tests.powerdns.com.'
171
172 target = 'target.dnstap.tests.powerdns.com.'
173 query = dns.message.make_query(name, 'A', 'IN')
174 response = dns.message.make_response(query)
175
176 rrset = dns.rrset.from_text(name,
177 3600,
178 dns.rdataclass.IN,
179 dns.rdatatype.CNAME,
180 target)
181 response.answer.append(rrset)
182
183 rrset = dns.rrset.from_text(target,
184 3600,
185 dns.rdataclass.IN,
186 dns.rdatatype.A,
187 '127.0.0.1')
188 response.answer.append(rrset)
189
190 (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
191 self.assertTrue(receivedQuery)
192 self.assertTrue(receivedResponse)
193 receivedQuery.id = query.id
194 self.assertEquals(query, receivedQuery)
195 self.assertEquals(response, receivedResponse)
196
197 # give the dnstap messages time to get here
198 time.sleep(1)
199
200 # check the dnstap message corresponding to the UDP query
201 dnstap = self.getFirstDnstap()
202
203 checkDnstapQuery(self, dnstap, dnstap_pb2.UDP, query)
204 checkDnstapNoExtra(self, dnstap)
205
206 # check the dnstap message corresponding to the UDP response
207 dnstap = self.getFirstDnstap()
208 checkDnstapResponse(self, dnstap, dnstap_pb2.UDP, response)
209 checkDnstapNoExtra(self, dnstap)
210
211 (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
212 self.assertTrue(receivedQuery)
213 self.assertTrue(receivedResponse)
214 receivedQuery.id = query.id
215 self.assertEquals(query, receivedQuery)
216 self.assertEquals(response, receivedResponse)
217
218 # give the dnstap messages time to get here
219 time.sleep(1)
220
221 # check the dnstap message corresponding to the TCP query
222 dnstap = self.getFirstDnstap()
223
224 checkDnstapQuery(self, dnstap, dnstap_pb2.TCP, query)
225 checkDnstapNoExtra(self, dnstap)
226
227 # check the dnstap message corresponding to the TCP response
228 dnstap = self.getFirstDnstap()
229 checkDnstapResponse(self, dnstap, dnstap_pb2.TCP, response)
230 checkDnstapNoExtra(self, dnstap)
231
232 def testDnstapExtra(self):
233 """
234 DnstapExtra: Send query and responses packed in dnstap to a remotelogger server. Extra data is filled out.
235 """
236 name = 'extra.dnstap.tests.powerdns.com.'
237
238 target = 'target.dnstap.tests.powerdns.com.'
239 query = dns.message.make_query(name, 'A', 'IN')
240 response = dns.message.make_response(query)
241
242 rrset = dns.rrset.from_text(name,
243 3600,
244 dns.rdataclass.IN,
245 dns.rdatatype.CNAME,
246 target)
247 response.answer.append(rrset)
248
249 rrset = dns.rrset.from_text(target,
250 3600,
251 dns.rdataclass.IN,
252 dns.rdatatype.A,
253 '127.0.0.1')
254 response.answer.append(rrset)
255
256 (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
257 self.assertTrue(receivedQuery)
258 self.assertTrue(receivedResponse)
259 receivedQuery.id = query.id
260 self.assertEquals(query, receivedQuery)
261 self.assertEquals(response, receivedResponse)
262
263 # give the dnstap messages time to get here
264 time.sleep(1)
265
266 # check the dnstap message corresponding to the UDP query
267 dnstap = self.getFirstDnstap()
268 checkDnstapQuery(self, dnstap, dnstap_pb2.UDP, query)
269 checkDnstapExtra(self, dnstap, "Type,Query")
270
271 # check the dnstap message corresponding to the UDP response
272 dnstap = self.getFirstDnstap()
273 checkDnstapResponse(self, dnstap, dnstap_pb2.UDP, response)
274 checkDnstapExtra(self, dnstap, "Type,Response")
275
276 (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
277 self.assertTrue(receivedQuery)
278 self.assertTrue(receivedResponse)
279 receivedQuery.id = query.id
280 self.assertEquals(query, receivedQuery)
281 self.assertEquals(response, receivedResponse)
282
283 # give the dnstap messages time to get here
284 time.sleep(1)
285
286 # check the dnstap message corresponding to the TCP query
287 dnstap = self.getFirstDnstap()
288 checkDnstapQuery(self, dnstap, dnstap_pb2.TCP, query)
289 checkDnstapExtra(self, dnstap, "Type,Query")
290
291 # check the dnstap message corresponding to the TCP response
292 dnstap = self.getFirstDnstap()
293 checkDnstapResponse(self, dnstap, dnstap_pb2.TCP, response)
294 checkDnstapExtra(self, dnstap, "Type,Response")
295
296
297 def fstrm_get_control_frame_type(data):
298 (t,) = struct.unpack("!L", data[0:4])
299 return t
300
301
302 def fstrm_make_control_frame_reply(cft, data):
303 if cft == FSTRM_CONTROL_READY:
304 # Reply with ACCEPT frame and content-type
305 contenttype = 'protobuf:dnstap.Dnstap'
306 frame = struct.pack('!LLL', FSTRM_CONTROL_ACCEPT, 1,
307 len(contenttype)) + contenttype
308 buf = struct.pack("!LL", 0, len(frame)) + frame
309 return buf
310 elif cft == FSTRM_CONTROL_START:
311 return None
312 else:
313 raise Exception('unhandled control frame ' + cft)
314
315
316 def fstrm_read_and_dispatch_control_frame(conn):
317 data = conn.recv(4)
318 if not data:
319 raise Exception('length of control frame payload could not be read')
320 (datalen,) = struct.unpack("!L", data)
321 data = conn.recv(datalen)
322 cft = fstrm_get_control_frame_type(data)
323 reply = fstrm_make_control_frame_reply(cft, data)
324 if reply:
325 conn.send(reply)
326 return cft
327
328
329 def fstrm_handle_bidir_connection(conn, on_data):
330 data = None
331 while True:
332 data = conn.recv(4)
333 if not data:
334 break
335 (datalen,) = struct.unpack("!L", data)
336 if datalen == 0:
337 # control frame length follows
338 cft = fstrm_read_and_dispatch_control_frame(conn)
339 if cft == FSTRM_CONTROL_STOP:
340 break
341 else:
342 # data frame
343 data = conn.recv(datalen)
344 if not data:
345 break
346
347 on_data(data)
348
349
350 class TestDnstapOverFrameStreamUnixLogger(DNSDistTest):
351 _fstrmLoggerAddress = '/tmp/fslutest.sock'
352 _fstrmLoggerQueue = Queue.Queue()
353 _fstrmLoggerCounter = 0
354 _config_params = ['_testServerPort', '_fstrmLoggerAddress']
355 _config_template = """
356 newServer{address="127.0.0.1:%s", useClientSubnet=true}
357 fslu = newFrameStreamUnixLogger('%s')
358
359 addAction(AllRule(), DnstapLogAction("a.server", fslu))
360 """
361
362 @classmethod
363 def FrameStreamUnixListener(cls, path):
364 try:
365 os.unlink(path)
366 except OSError:
367 pass # Assume file not found
368 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
369 try:
370 sock.bind(path)
371 except socket.error as e:
372 print("Error binding in the framestream listener: %s" % str(e))
373 sys.exit(1)
374
375 sock.listen(100)
376 while True:
377 (conn, _) = sock.accept()
378 fstrm_handle_bidir_connection(conn, lambda data: \
379 cls._fstrmLoggerQueue.put(data, True, timeout=2.0))
380 conn.close()
381 sock.close()
382
383 @classmethod
384 def startResponders(cls):
385 DNSDistTest.startResponders()
386
387 cls._fstrmLoggerListener = threading.Thread(name='FrameStreamUnixListener', target=cls.FrameStreamUnixListener, args=[cls._fstrmLoggerAddress])
388 cls._fstrmLoggerListener.setDaemon(True)
389 cls._fstrmLoggerListener.start()
390
391 def getFirstDnstap(self):
392 data = self._fstrmLoggerQueue.get(True, timeout=2.0)
393 self.assertTrue(data)
394 dnstap = dnstap_pb2.Dnstap()
395 dnstap.ParseFromString(data)
396 return dnstap
397
398 def testDnstapOverFrameStreamUnix(self):
399 """
400 Dnstap: Send query packed in dnstap to a unix socket fstrmlogger server
401 """
402 name = 'query.dnstap.tests.powerdns.com.'
403
404 target = 'target.dnstap.tests.powerdns.com.'
405 query = dns.message.make_query(name, 'A', 'IN')
406 response = dns.message.make_response(query)
407
408 rrset = dns.rrset.from_text(name,
409 3600,
410 dns.rdataclass.IN,
411 dns.rdatatype.CNAME,
412 target)
413 response.answer.append(rrset)
414
415 rrset = dns.rrset.from_text(target,
416 3600,
417 dns.rdataclass.IN,
418 dns.rdatatype.A,
419 '127.0.0.1')
420 response.answer.append(rrset)
421
422 (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
423 self.assertTrue(receivedQuery)
424 self.assertTrue(receivedResponse)
425 receivedQuery.id = query.id
426 self.assertEquals(query, receivedQuery)
427 self.assertEquals(response, receivedResponse)
428
429 # check the dnstap message corresponding to the UDP query
430 dnstap = self.getFirstDnstap()
431
432 checkDnstapQuery(self, dnstap, dnstap_pb2.UDP, query)
433 checkDnstapNoExtra(self, dnstap)
434
435
436 class TestDnstapOverFrameStreamTcpLogger(DNSDistTest):
437 _fstrmLoggerPort = 4000
438 _fstrmLoggerQueue = Queue.Queue()
439 _fstrmLoggerCounter = 0
440 _config_params = ['_testServerPort', '_fstrmLoggerPort']
441 _config_template = """
442 newServer{address="127.0.0.1:%s", useClientSubnet=true}
443 fslu = newFrameStreamTcpLogger('127.0.0.1:%s')
444
445 addAction(AllRule(), DnstapLogAction("a.server", fslu))
446 """
447
448 @classmethod
449 def FrameStreamUnixListener(cls, port):
450 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
451 try:
452 sock.bind(("127.0.0.1", port))
453 except socket.error as e:
454 print("Error binding in the framestream listener: %s" % str(e))
455 sys.exit(1)
456
457 sock.listen(100)
458 while True:
459 (conn, _) = sock.accept()
460 fstrm_handle_bidir_connection(conn, lambda data: \
461 cls._fstrmLoggerQueue.put(data, True, timeout=2.0))
462 conn.close()
463 sock.close()
464
465 @classmethod
466 def startResponders(cls):
467 DNSDistTest.startResponders()
468
469 cls._fstrmLoggerListener = threading.Thread(name='FrameStreamUnixListener', target=cls.FrameStreamUnixListener, args=[cls._fstrmLoggerPort])
470 cls._fstrmLoggerListener.setDaemon(True)
471 cls._fstrmLoggerListener.start()
472
473 def getFirstDnstap(self):
474 data = self._fstrmLoggerQueue.get(True, timeout=2.0)
475 self.assertTrue(data)
476 dnstap = dnstap_pb2.Dnstap()
477 dnstap.ParseFromString(data)
478 return dnstap
479
480 def testDnstapOverFrameStreamTcp(self):
481 """
482 Dnstap: Send query packed in dnstap to a tcp socket fstrmlogger server
483 """
484 name = 'query.dnstap.tests.powerdns.com.'
485
486 target = 'target.dnstap.tests.powerdns.com.'
487 query = dns.message.make_query(name, 'A', 'IN')
488 response = dns.message.make_response(query)
489
490 rrset = dns.rrset.from_text(name,
491 3600,
492 dns.rdataclass.IN,
493 dns.rdatatype.CNAME,
494 target)
495 response.answer.append(rrset)
496
497 rrset = dns.rrset.from_text(target,
498 3600,
499 dns.rdataclass.IN,
500 dns.rdatatype.A,
501 '127.0.0.1')
502 response.answer.append(rrset)
503
504 (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
505 self.assertTrue(receivedQuery)
506 self.assertTrue(receivedResponse)
507 receivedQuery.id = query.id
508 self.assertEquals(query, receivedQuery)
509 self.assertEquals(response, receivedResponse)
510
511 # check the dnstap message corresponding to the UDP query
512 dnstap = self.getFirstDnstap()
513
514 checkDnstapQuery(self, dnstap, dnstap_pb2.UDP, query)
515 checkDnstapNoExtra(self, dnstap)