]> git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_Dnstap.py
Merge pull request #8787 from rgacogne/ddist-tls-key-log-file
[thirdparty/pdns.git] / regression-tests.dnsdist / test_Dnstap.py
1 #!/usr/bin/env python
2 import threading
3 import os
4 import socket
5 import struct
6 import sys
7 import time
8 from dnsdisttests import DNSDistTest, Queue
9
10 import dns
11 import dnstap_pb2
12
13 FSTRM_CONTROL_ACCEPT = 0x01
14 FSTRM_CONTROL_START = 0x02
15 FSTRM_CONTROL_STOP = 0x03
16 FSTRM_CONTROL_READY = 0x04
17 FSTRM_CONTROL_FINISH = 0x05
18
19
20 def checkDnstapBase(testinstance, dnstap, protocol, initiator):
21 testinstance.assertTrue(dnstap)
22 testinstance.assertTrue(dnstap.HasField('identity'))
23 testinstance.assertEqual(dnstap.identity, b'a.server')
24 testinstance.assertTrue(dnstap.HasField('version'))
25 testinstance.assertIn(b'dnsdist ', dnstap.version)
26 testinstance.assertTrue(dnstap.HasField('type'))
27 testinstance.assertEqual(dnstap.type, dnstap.MESSAGE)
28 testinstance.assertTrue(dnstap.HasField('message'))
29 testinstance.assertTrue(dnstap.message.HasField('socket_protocol'))
30 testinstance.assertEqual(dnstap.message.socket_protocol, protocol)
31 testinstance.assertTrue(dnstap.message.HasField('socket_family'))
32 testinstance.assertEquals(dnstap.message.socket_family, dnstap_pb2.INET)
33 testinstance.assertTrue(dnstap.message.HasField('query_address'))
34 testinstance.assertEquals(socket.inet_ntop(socket.AF_INET, dnstap.message.query_address), initiator)
35 testinstance.assertTrue(dnstap.message.HasField('response_address'))
36 testinstance.assertEquals(socket.inet_ntop(socket.AF_INET, dnstap.message.response_address), initiator)
37 testinstance.assertTrue(dnstap.message.HasField('response_port'))
38 testinstance.assertEquals(dnstap.message.response_port, testinstance._dnsDistPort)
39
40
41 def checkDnstapQuery(testinstance, dnstap, protocol, query, initiator='127.0.0.1'):
42 testinstance.assertEquals(dnstap.message.type, dnstap_pb2.Message.CLIENT_QUERY)
43 checkDnstapBase(testinstance, dnstap, protocol, initiator)
44
45 testinstance.assertTrue(dnstap.message.HasField('query_time_sec'))
46 testinstance.assertTrue(dnstap.message.HasField('query_time_nsec'))
47
48 testinstance.assertTrue(dnstap.message.HasField('query_message'))
49 wire_message = dns.message.from_wire(dnstap.message.query_message)
50 testinstance.assertEqual(wire_message, query)
51
52
53 def checkDnstapExtra(testinstance, dnstap, expected):
54 testinstance.assertTrue(dnstap.HasField('extra'))
55 testinstance.assertEqual(dnstap.extra, expected)
56
57
58 def checkDnstapNoExtra(testinstance, dnstap):
59 testinstance.assertFalse(dnstap.HasField('extra'))
60
61
62 def checkDnstapResponse(testinstance, dnstap, protocol, response, initiator='127.0.0.1'):
63 testinstance.assertEquals(dnstap.message.type, dnstap_pb2.Message.CLIENT_RESPONSE)
64 checkDnstapBase(testinstance, dnstap, protocol, initiator)
65
66 testinstance.assertTrue(dnstap.message.HasField('query_time_sec'))
67 testinstance.assertTrue(dnstap.message.HasField('query_time_nsec'))
68
69 testinstance.assertTrue(dnstap.message.HasField('response_time_sec'))
70 testinstance.assertTrue(dnstap.message.HasField('response_time_nsec'))
71
72 testinstance.assertTrue(dnstap.message.response_time_sec > dnstap.message.query_time_sec or \
73 dnstap.message.response_time_nsec > dnstap.message.query_time_nsec)
74
75 testinstance.assertTrue(dnstap.message.HasField('response_message'))
76 wire_message = dns.message.from_wire(dnstap.message.response_message)
77 testinstance.assertEqual(wire_message, response)
78
79
80 class TestDnstapOverRemoteLogger(DNSDistTest):
81 _remoteLoggerServerPort = 4243
82 _remoteLoggerQueue = Queue()
83 _remoteLoggerCounter = 0
84 _config_params = ['_testServerPort', '_remoteLoggerServerPort']
85 _config_template = """
86 extrasmn = newSuffixMatchNode()
87 extrasmn:add(newDNSName('extra.dnstap.tests.powerdns.com.'))
88
89 luatarget = 'lua.dnstap.tests.powerdns.com.'
90
91 function alterDnstapQuery(dq, tap)
92 if extrasmn:check(dq.qname) then
93 tap:setExtra("Type,Query")
94 end
95 end
96
97 function alterDnstapResponse(dq, tap)
98 if extrasmn:check(dq.qname) then
99 tap:setExtra("Type,Response")
100 end
101 end
102
103 function luaFunc(dq)
104 dq.dh:setQR(true)
105 dq.dh:setRCode(DNSRCode.NXDOMAIN)
106 return DNSAction.None, ""
107 end
108
109 newServer{address="127.0.0.1:%s", useClientSubnet=true}
110 rl = newRemoteLogger('127.0.0.1:%s')
111
112 addAction(AllRule(), DnstapLogAction("a.server", rl, alterDnstapQuery)) -- Send dnstap message before lookup
113
114 addAction(luatarget, LuaAction(luaFunc)) -- Send dnstap message before lookup
115
116 addResponseAction(AllRule(), DnstapLogResponseAction("a.server", rl, alterDnstapResponse)) -- Send dnstap message after lookup
117
118 addAction('spoof.dnstap.tests.powerdns.com.', SpoofAction("192.0.2.1"))
119 """
120
121 @classmethod
122 def RemoteLoggerListener(cls, port):
123 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
124 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
125 try:
126 sock.bind(("127.0.0.1", port))
127 except socket.error as e:
128 print("Error binding in the protbuf listener: %s" % str(e))
129 sys.exit(1)
130
131 sock.listen(100)
132 while True:
133 (conn, _) = sock.accept()
134 data = None
135 while True:
136 data = conn.recv(2)
137 if not data:
138 break
139 (datalen,) = struct.unpack("!H", data)
140 data = conn.recv(datalen)
141 if not data:
142 break
143
144 cls._remoteLoggerQueue.put(data, True, timeout=2.0)
145
146 conn.close()
147 sock.close()
148
149 @classmethod
150 def startResponders(cls):
151 DNSDistTest.startResponders()
152
153 cls._remoteLoggerListener = threading.Thread(name='RemoteLogger Listener', target=cls.RemoteLoggerListener, args=[cls._remoteLoggerServerPort])
154 cls._remoteLoggerListener.setDaemon(True)
155 cls._remoteLoggerListener.start()
156
157 def getFirstDnstap(self):
158 self.assertFalse(self._remoteLoggerQueue.empty())
159 data = self._remoteLoggerQueue.get(False)
160 self.assertTrue(data)
161 dnstap = dnstap_pb2.Dnstap()
162 dnstap.ParseFromString(data)
163 return dnstap
164
165 def testDnstap(self):
166 """
167 Dnstap: Send query and responses packed in dnstap to a remotelogger server
168 """
169 name = 'query.dnstap.tests.powerdns.com.'
170
171 target = 'target.dnstap.tests.powerdns.com.'
172 query = dns.message.make_query(name, 'A', 'IN')
173 response = dns.message.make_response(query)
174
175 rrset = dns.rrset.from_text(name,
176 3600,
177 dns.rdataclass.IN,
178 dns.rdatatype.CNAME,
179 target)
180 response.answer.append(rrset)
181
182 rrset = dns.rrset.from_text(target,
183 3600,
184 dns.rdataclass.IN,
185 dns.rdatatype.A,
186 '127.0.0.1')
187 response.answer.append(rrset)
188
189 (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
190 self.assertTrue(receivedQuery)
191 self.assertTrue(receivedResponse)
192 receivedQuery.id = query.id
193 self.assertEquals(query, receivedQuery)
194 self.assertEquals(response, receivedResponse)
195
196 # give the dnstap messages time to get here
197 time.sleep(1)
198
199 # check the dnstap message corresponding to the UDP query
200 dnstap = self.getFirstDnstap()
201
202 checkDnstapQuery(self, dnstap, dnstap_pb2.UDP, query)
203 checkDnstapNoExtra(self, dnstap)
204
205 # check the dnstap message corresponding to the UDP response
206 dnstap = self.getFirstDnstap()
207 checkDnstapResponse(self, dnstap, dnstap_pb2.UDP, response)
208 checkDnstapNoExtra(self, dnstap)
209
210 (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
211 self.assertTrue(receivedQuery)
212 self.assertTrue(receivedResponse)
213 receivedQuery.id = query.id
214 self.assertEquals(query, receivedQuery)
215 self.assertEquals(response, receivedResponse)
216
217 # give the dnstap messages time to get here
218 time.sleep(1)
219
220 # check the dnstap message corresponding to the TCP query
221 dnstap = self.getFirstDnstap()
222
223 checkDnstapQuery(self, dnstap, dnstap_pb2.TCP, query)
224 checkDnstapNoExtra(self, dnstap)
225
226 # check the dnstap message corresponding to the TCP response
227 dnstap = self.getFirstDnstap()
228 checkDnstapResponse(self, dnstap, dnstap_pb2.TCP, response)
229 checkDnstapNoExtra(self, dnstap)
230
231 def testDnstapExtra(self):
232 """
233 DnstapExtra: Send query and responses packed in dnstap to a remotelogger server. Extra data is filled out.
234 """
235 name = 'extra.dnstap.tests.powerdns.com.'
236
237 target = 'target.dnstap.tests.powerdns.com.'
238 query = dns.message.make_query(name, 'A', 'IN')
239 response = dns.message.make_response(query)
240
241 rrset = dns.rrset.from_text(name,
242 3600,
243 dns.rdataclass.IN,
244 dns.rdatatype.CNAME,
245 target)
246 response.answer.append(rrset)
247
248 rrset = dns.rrset.from_text(target,
249 3600,
250 dns.rdataclass.IN,
251 dns.rdatatype.A,
252 '127.0.0.1')
253 response.answer.append(rrset)
254
255 (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
256 self.assertTrue(receivedQuery)
257 self.assertTrue(receivedResponse)
258 receivedQuery.id = query.id
259 self.assertEquals(query, receivedQuery)
260 self.assertEquals(response, receivedResponse)
261
262 # give the dnstap messages time to get here
263 time.sleep(1)
264
265 # check the dnstap message corresponding to the UDP query
266 dnstap = self.getFirstDnstap()
267 checkDnstapQuery(self, dnstap, dnstap_pb2.UDP, query)
268 checkDnstapExtra(self, dnstap, b"Type,Query")
269
270 # check the dnstap message corresponding to the UDP response
271 dnstap = self.getFirstDnstap()
272 checkDnstapResponse(self, dnstap, dnstap_pb2.UDP, response)
273 checkDnstapExtra(self, dnstap, b"Type,Response")
274
275 (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
276 self.assertTrue(receivedQuery)
277 self.assertTrue(receivedResponse)
278 receivedQuery.id = query.id
279 self.assertEquals(query, receivedQuery)
280 self.assertEquals(response, receivedResponse)
281
282 # give the dnstap messages time to get here
283 time.sleep(1)
284
285 # check the dnstap message corresponding to the TCP query
286 dnstap = self.getFirstDnstap()
287 checkDnstapQuery(self, dnstap, dnstap_pb2.TCP, query)
288 checkDnstapExtra(self, dnstap, b"Type,Query")
289
290 # check the dnstap message corresponding to the TCP response
291 dnstap = self.getFirstDnstap()
292 checkDnstapResponse(self, dnstap, dnstap_pb2.TCP, response)
293 checkDnstapExtra(self, dnstap, b"Type,Response")
294
295
296 def fstrm_get_control_frame_type(data):
297 (t,) = struct.unpack("!L", data[0:4])
298 return t
299
300
301 def fstrm_make_control_frame_reply(cft, data):
302 if cft == FSTRM_CONTROL_READY:
303 # Reply with ACCEPT frame and content-type
304 contenttype = b'protobuf:dnstap.Dnstap'
305 frame = struct.pack('!LLL', FSTRM_CONTROL_ACCEPT, 1,
306 len(contenttype)) + contenttype
307 buf = struct.pack("!LL", 0, len(frame)) + frame
308 return buf
309 elif cft == FSTRM_CONTROL_START:
310 return None
311 else:
312 raise Exception('unhandled control frame ' + cft)
313
314
315 def fstrm_read_and_dispatch_control_frame(conn):
316 data = conn.recv(4)
317 if not data:
318 raise Exception('length of control frame payload could not be read')
319 (datalen,) = struct.unpack("!L", data)
320 data = conn.recv(datalen)
321 cft = fstrm_get_control_frame_type(data)
322 reply = fstrm_make_control_frame_reply(cft, data)
323 if reply:
324 conn.send(reply)
325 return cft
326
327
328 def fstrm_handle_bidir_connection(conn, on_data):
329 data = None
330 while True:
331 data = conn.recv(4)
332 if not data:
333 break
334 (datalen,) = struct.unpack("!L", data)
335 if datalen == 0:
336 # control frame length follows
337 cft = fstrm_read_and_dispatch_control_frame(conn)
338 if cft == FSTRM_CONTROL_STOP:
339 break
340 else:
341 # data frame
342 data = conn.recv(datalen)
343 if not data:
344 break
345
346 on_data(data)
347
348
349 class TestDnstapOverFrameStreamUnixLogger(DNSDistTest):
350 _fstrmLoggerAddress = '/tmp/fslutest.sock'
351 _fstrmLoggerQueue = Queue()
352 _fstrmLoggerCounter = 0
353 _config_params = ['_testServerPort', '_fstrmLoggerAddress']
354 _config_template = """
355 newServer{address="127.0.0.1:%s", useClientSubnet=true}
356 fslu = newFrameStreamUnixLogger('%s')
357
358 addAction(AllRule(), DnstapLogAction("a.server", fslu))
359 """
360
361 @classmethod
362 def FrameStreamUnixListener(cls, path):
363 try:
364 os.unlink(path)
365 except OSError:
366 pass # Assume file not found
367 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
368 try:
369 sock.bind(path)
370 except socket.error as e:
371 print("Error binding in the framestream listener: %s" % str(e))
372 sys.exit(1)
373
374 sock.listen(100)
375 while True:
376 (conn, _) = sock.accept()
377 fstrm_handle_bidir_connection(conn, lambda data: \
378 cls._fstrmLoggerQueue.put(data, True, timeout=2.0))
379 conn.close()
380 sock.close()
381
382 @classmethod
383 def startResponders(cls):
384 DNSDistTest.startResponders()
385
386 cls._fstrmLoggerListener = threading.Thread(name='FrameStreamUnixListener', target=cls.FrameStreamUnixListener, args=[cls._fstrmLoggerAddress])
387 cls._fstrmLoggerListener.setDaemon(True)
388 cls._fstrmLoggerListener.start()
389
390 def getFirstDnstap(self):
391 data = self._fstrmLoggerQueue.get(True, timeout=2.0)
392 self.assertTrue(data)
393 dnstap = dnstap_pb2.Dnstap()
394 dnstap.ParseFromString(data)
395 return dnstap
396
397 def testDnstapOverFrameStreamUnix(self):
398 """
399 Dnstap: Send query packed in dnstap to a unix socket fstrmlogger server
400 """
401 name = 'query.dnstap.tests.powerdns.com.'
402
403 target = 'target.dnstap.tests.powerdns.com.'
404 query = dns.message.make_query(name, 'A', 'IN')
405 response = dns.message.make_response(query)
406
407 rrset = dns.rrset.from_text(name,
408 3600,
409 dns.rdataclass.IN,
410 dns.rdatatype.CNAME,
411 target)
412 response.answer.append(rrset)
413
414 rrset = dns.rrset.from_text(target,
415 3600,
416 dns.rdataclass.IN,
417 dns.rdatatype.A,
418 '127.0.0.1')
419 response.answer.append(rrset)
420
421 (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
422 self.assertTrue(receivedQuery)
423 self.assertTrue(receivedResponse)
424 receivedQuery.id = query.id
425 self.assertEquals(query, receivedQuery)
426 self.assertEquals(response, receivedResponse)
427
428 # check the dnstap message corresponding to the UDP query
429 dnstap = self.getFirstDnstap()
430
431 checkDnstapQuery(self, dnstap, dnstap_pb2.UDP, query)
432 checkDnstapNoExtra(self, dnstap)
433
434
435 class TestDnstapOverFrameStreamTcpLogger(DNSDistTest):
436 _fstrmLoggerPort = 4000
437 _fstrmLoggerQueue = Queue()
438 _fstrmLoggerCounter = 0
439 _config_params = ['_testServerPort', '_fstrmLoggerPort']
440 _config_template = """
441 newServer{address="127.0.0.1:%s", useClientSubnet=true}
442 fslu = newFrameStreamTcpLogger('127.0.0.1:%s')
443
444 addAction(AllRule(), DnstapLogAction("a.server", fslu))
445 """
446
447 @classmethod
448 def FrameStreamUnixListener(cls, port):
449 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
450 try:
451 sock.bind(("127.0.0.1", port))
452 except socket.error as e:
453 print("Error binding in the framestream listener: %s" % str(e))
454 sys.exit(1)
455
456 sock.listen(100)
457 while True:
458 (conn, _) = sock.accept()
459 fstrm_handle_bidir_connection(conn, lambda data: \
460 cls._fstrmLoggerQueue.put(data, True, timeout=2.0))
461 conn.close()
462 sock.close()
463
464 @classmethod
465 def startResponders(cls):
466 DNSDistTest.startResponders()
467
468 cls._fstrmLoggerListener = threading.Thread(name='FrameStreamUnixListener', target=cls.FrameStreamUnixListener, args=[cls._fstrmLoggerPort])
469 cls._fstrmLoggerListener.setDaemon(True)
470 cls._fstrmLoggerListener.start()
471
472 def getFirstDnstap(self):
473 data = self._fstrmLoggerQueue.get(True, timeout=2.0)
474 self.assertTrue(data)
475 dnstap = dnstap_pb2.Dnstap()
476 dnstap.ParseFromString(data)
477 return dnstap
478
479 def testDnstapOverFrameStreamTcp(self):
480 """
481 Dnstap: Send query packed in dnstap to a tcp socket fstrmlogger server
482 """
483 name = 'query.dnstap.tests.powerdns.com.'
484
485 target = 'target.dnstap.tests.powerdns.com.'
486 query = dns.message.make_query(name, 'A', 'IN')
487 response = dns.message.make_response(query)
488
489 rrset = dns.rrset.from_text(name,
490 3600,
491 dns.rdataclass.IN,
492 dns.rdatatype.CNAME,
493 target)
494 response.answer.append(rrset)
495
496 rrset = dns.rrset.from_text(target,
497 3600,
498 dns.rdataclass.IN,
499 dns.rdatatype.A,
500 '127.0.0.1')
501 response.answer.append(rrset)
502
503 (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
504 self.assertTrue(receivedQuery)
505 self.assertTrue(receivedResponse)
506 receivedQuery.id = query.id
507 self.assertEquals(query, receivedQuery)
508 self.assertEquals(response, receivedResponse)
509
510 # check the dnstap message corresponding to the UDP query
511 dnstap = self.getFirstDnstap()
512
513 checkDnstapQuery(self, dnstap, dnstap_pb2.UDP, query)
514 checkDnstapNoExtra(self, dnstap)