# Source: git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_Dnstap.py
8 from dnsdisttests
import DNSDistTest
, Queue
# Frame Streams (fstrm) control frame type codes. The fstrm_* helpers in this
# file compare the decoded control frame type against these values.
FSTRM_CONTROL_ACCEPT = 0x01
FSTRM_CONTROL_START = 0x02
FSTRM_CONTROL_STOP = 0x03
FSTRM_CONTROL_READY = 0x04
FSTRM_CONTROL_FINISH = 0x05
def checkDnstapBase(testinstance, dnstap, protocol, initiator):
    """Assert the fields shared by every dnstap message checked in this file.

    Args:
        testinstance: test case object providing the ``assert*`` methods and
            the ``_dnsDistPort`` attribute.
        dnstap: decoded dnstap message to inspect.
        protocol: expected value of ``dnstap.message.socket_protocol``.
        initiator: expected IPv4 address, in text form, for both
            ``query_address`` and ``response_address``.
    """
    # Envelope: identity, version and type.
    testinstance.assertTrue(dnstap)
    testinstance.assertTrue(dnstap.HasField('identity'))
    testinstance.assertEqual(dnstap.identity, b'a.server')
    testinstance.assertTrue(dnstap.HasField('version'))
    testinstance.assertIn(b'dnsdist ', dnstap.version)
    testinstance.assertTrue(dnstap.HasField('type'))
    testinstance.assertEqual(dnstap.type, dnstap.MESSAGE)

    # Socket description of the embedded message.
    testinstance.assertTrue(dnstap.HasField('message'))
    message = dnstap.message
    testinstance.assertTrue(message.HasField('socket_protocol'))
    testinstance.assertEqual(message.socket_protocol, protocol)
    testinstance.assertTrue(message.HasField('socket_family'))
    testinstance.assertEqual(message.socket_family, dnstap_pb2.INET)

    # Addresses are stored in packed binary form; convert them to text
    # before comparing with the expected initiator address.
    testinstance.assertTrue(message.HasField('query_address'))
    testinstance.assertEqual(
        socket.inet_ntop(socket.AF_INET, message.query_address), initiator)
    testinstance.assertTrue(message.HasField('response_address'))
    testinstance.assertEqual(
        socket.inet_ntop(socket.AF_INET, message.response_address), initiator)
    testinstance.assertTrue(message.HasField('response_port'))
    testinstance.assertEqual(message.response_port, testinstance._dnsDistPort)
def checkDnstapQuery(testinstance, dnstap, protocol, query, initiator='127.0.0.1'):
    """Assert that ``dnstap`` is a CLIENT_QUERY message carrying ``query``.

    Args:
        testinstance: test case object providing the ``assert*`` methods.
        dnstap: decoded dnstap message to inspect.
        protocol: expected socket protocol, forwarded to checkDnstapBase().
        query: DNS message expected to equal the parsed ``query_message``.
        initiator: expected address in text form, forwarded to
            checkDnstapBase().
    """
    testinstance.assertEqual(dnstap.message.type, dnstap_pb2.Message.CLIENT_QUERY)
    checkDnstapBase(testinstance, dnstap, protocol, initiator)

    testinstance.assertTrue(dnstap.message.HasField('query_time_sec'))
    testinstance.assertTrue(dnstap.message.HasField('query_time_nsec'))

    # The payload is the DNS query in wire format; parse it before comparing.
    testinstance.assertTrue(dnstap.message.HasField('query_message'))
    wire_message = dns.message.from_wire(dnstap.message.query_message)
    testinstance.assertEqual(wire_message, query)
def checkDnstapExtra(testinstance, dnstap, expected):
    """Assert that ``dnstap`` has its ``extra`` field set to ``expected``.

    Args:
        testinstance: test case object providing ``assertTrue`` and
            ``assertEqual``.
        dnstap: decoded dnstap message to inspect.
        expected: value the ``extra`` field must compare equal to.
    """
    testinstance.assertTrue(dnstap.HasField('extra'))
    testinstance.assertEqual(dnstap.extra, expected)
def checkDnstapNoExtra(testinstance, dnstap):
    """Assert that ``dnstap`` does not have its ``extra`` field set.

    Args:
        testinstance: test case object providing ``assertFalse``.
        dnstap: decoded dnstap message to inspect.
    """
    testinstance.assertFalse(dnstap.HasField('extra'))
def checkDnstapResponse(testinstance, dnstap, protocol, response, initiator='127.0.0.1'):
    """Assert that ``dnstap`` is a CLIENT_RESPONSE message carrying ``response``.

    Args:
        testinstance: test case object providing the ``assert*`` methods.
        dnstap: decoded dnstap message to inspect.
        protocol: expected socket protocol, forwarded to checkDnstapBase().
        response: DNS message expected to equal the parsed
            ``response_message``.
        initiator: expected address in text form, forwarded to
            checkDnstapBase().
    """
    testinstance.assertEqual(dnstap.message.type, dnstap_pb2.Message.CLIENT_RESPONSE)
    checkDnstapBase(testinstance, dnstap, protocol, initiator)

    message = dnstap.message
    testinstance.assertTrue(message.HasField('query_time_sec'))
    testinstance.assertTrue(message.HasField('query_time_nsec'))

    testinstance.assertTrue(message.HasField('response_time_sec'))
    testinstance.assertTrue(message.HasField('response_time_nsec'))

    # The response must be stamped strictly after the query. Compare the
    # (seconds, nanoseconds) pairs lexicographically: checking the two parts
    # independently would accept a response whose seconds are earlier than
    # the query's as long as its nanosecond part happened to be larger.
    testinstance.assertTrue(
        (message.response_time_sec, message.response_time_nsec) >
        (message.query_time_sec, message.query_time_nsec))

    # The payload is the DNS response in wire format; parse it before comparing.
    testinstance.assertTrue(message.HasField('response_message'))
    wire_message = dns.message.from_wire(message.response_message)
    testinstance.assertEqual(wire_message, response)
80 class TestDnstapOverRemoteLogger(DNSDistTest
):
81 _remoteLoggerServerPort
= 4243
82 _remoteLoggerQueue
= Queue()
83 _remoteLoggerCounter
= 0
84 _config_params
= ['_testServerPort', '_remoteLoggerServerPort']
85 _config_template
= """
86 extrasmn = newSuffixMatchNode()
87 extrasmn:add(newDNSName('extra.dnstap.tests.powerdns.com.'))
89 luatarget = 'lua.dnstap.tests.powerdns.com.'
91 function alterDnstapQuery(dq, tap)
92 if extrasmn:check(dq.qname) then
93 tap:setExtra("Type,Query")
97 function alterDnstapResponse(dq, tap)
98 if extrasmn:check(dq.qname) then
99 tap:setExtra("Type,Response")
105 dq.dh:setRCode(dnsdist.NXDOMAIN)
106 return DNSAction.None, ""
109 newServer{address="127.0.0.1:%s", useClientSubnet=true}
110 rl = newRemoteLogger('127.0.0.1:%s')
112 addAction(AllRule(), DnstapLogAction("a.server", rl, alterDnstapQuery)) -- Send dnstap message before lookup
114 addAction(luatarget, LuaAction(luaFunc)) -- Send dnstap message before lookup
116 addResponseAction(AllRule(), DnstapLogResponseAction("a.server", rl, alterDnstapResponse)) -- Send dnstap message after lookup
118 addAction('spoof.dnstap.tests.powerdns.com.', SpoofAction("192.0.2.1"))
122 def RemoteLoggerListener(cls
, port
):
123 sock
= socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
)
124 sock
.setsockopt(socket
.SOL_SOCKET
, socket
.SO_REUSEPORT
, 1)
126 sock
.bind(("127.0.0.1", port
))
127 except socket
.error
as e
:
128 print("Error binding in the protbuf listener: %s" % str(e
))
133 (conn
, _
) = sock
.accept()
139 (datalen
,) = struct
.unpack("!H", data
)
140 data
= conn
.recv(datalen
)
144 cls
._remoteLoggerQueue
.put(data
, True, timeout
=2.0)
150 def startResponders(cls
):
151 DNSDistTest
.startResponders()
153 cls
._remoteLoggerListener
= threading
.Thread(name
='RemoteLogger Listener', target
=cls
.RemoteLoggerListener
, args
=[cls
._remoteLoggerServerPort
])
154 cls
._remoteLoggerListener
.setDaemon(True)
155 cls
._remoteLoggerListener
.start()
157 def getFirstDnstap(self
):
158 self
.assertFalse(self
._remoteLoggerQueue
.empty())
159 data
= self
._remoteLoggerQueue
.get(False)
160 self
.assertTrue(data
)
161 dnstap
= dnstap_pb2
.Dnstap()
162 dnstap
.ParseFromString(data
)
165 def testDnstap(self
):
167 Dnstap: Send query and responses packed in dnstap to a remotelogger server
169 name
= 'query.dnstap.tests.powerdns.com.'
171 target
= 'target.dnstap.tests.powerdns.com.'
172 query
= dns
.message
.make_query(name
, 'A', 'IN')
173 response
= dns
.message
.make_response(query
)
175 rrset
= dns
.rrset
.from_text(name
,
180 response
.answer
.append(rrset
)
182 rrset
= dns
.rrset
.from_text(target
,
187 response
.answer
.append(rrset
)
189 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
190 self
.assertTrue(receivedQuery
)
191 self
.assertTrue(receivedResponse
)
192 receivedQuery
.id = query
.id
193 self
.assertEquals(query
, receivedQuery
)
194 self
.assertEquals(response
, receivedResponse
)
196 # give the dnstap messages time to get here
199 # check the dnstap message corresponding to the UDP query
200 dnstap
= self
.getFirstDnstap()
202 checkDnstapQuery(self
, dnstap
, dnstap_pb2
.UDP
, query
)
203 checkDnstapNoExtra(self
, dnstap
)
205 # check the dnstap message corresponding to the UDP response
206 dnstap
= self
.getFirstDnstap()
207 checkDnstapResponse(self
, dnstap
, dnstap_pb2
.UDP
, response
)
208 checkDnstapNoExtra(self
, dnstap
)
210 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
211 self
.assertTrue(receivedQuery
)
212 self
.assertTrue(receivedResponse
)
213 receivedQuery
.id = query
.id
214 self
.assertEquals(query
, receivedQuery
)
215 self
.assertEquals(response
, receivedResponse
)
217 # give the dnstap messages time to get here
220 # check the dnstap message corresponding to the TCP query
221 dnstap
= self
.getFirstDnstap()
223 checkDnstapQuery(self
, dnstap
, dnstap_pb2
.TCP
, query
)
224 checkDnstapNoExtra(self
, dnstap
)
226 # check the dnstap message corresponding to the TCP response
227 dnstap
= self
.getFirstDnstap()
228 checkDnstapResponse(self
, dnstap
, dnstap_pb2
.TCP
, response
)
229 checkDnstapNoExtra(self
, dnstap
)
231 def testDnstapExtra(self
):
233 DnstapExtra: Send query and responses packed in dnstap to a remotelogger server. Extra data is filled out.
235 name
= 'extra.dnstap.tests.powerdns.com.'
237 target
= 'target.dnstap.tests.powerdns.com.'
238 query
= dns
.message
.make_query(name
, 'A', 'IN')
239 response
= dns
.message
.make_response(query
)
241 rrset
= dns
.rrset
.from_text(name
,
246 response
.answer
.append(rrset
)
248 rrset
= dns
.rrset
.from_text(target
,
253 response
.answer
.append(rrset
)
255 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
256 self
.assertTrue(receivedQuery
)
257 self
.assertTrue(receivedResponse
)
258 receivedQuery
.id = query
.id
259 self
.assertEquals(query
, receivedQuery
)
260 self
.assertEquals(response
, receivedResponse
)
262 # give the dnstap messages time to get here
265 # check the dnstap message corresponding to the UDP query
266 dnstap
= self
.getFirstDnstap()
267 checkDnstapQuery(self
, dnstap
, dnstap_pb2
.UDP
, query
)
268 checkDnstapExtra(self
, dnstap
, b
"Type,Query")
270 # check the dnstap message corresponding to the UDP response
271 dnstap
= self
.getFirstDnstap()
272 checkDnstapResponse(self
, dnstap
, dnstap_pb2
.UDP
, response
)
273 checkDnstapExtra(self
, dnstap
, b
"Type,Response")
275 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
276 self
.assertTrue(receivedQuery
)
277 self
.assertTrue(receivedResponse
)
278 receivedQuery
.id = query
.id
279 self
.assertEquals(query
, receivedQuery
)
280 self
.assertEquals(response
, receivedResponse
)
282 # give the dnstap messages time to get here
285 # check the dnstap message corresponding to the TCP query
286 dnstap
= self
.getFirstDnstap()
287 checkDnstapQuery(self
, dnstap
, dnstap_pb2
.TCP
, query
)
288 checkDnstapExtra(self
, dnstap
, b
"Type,Query")
290 # check the dnstap message corresponding to the TCP response
291 dnstap
= self
.getFirstDnstap()
292 checkDnstapResponse(self
, dnstap
, dnstap_pb2
.TCP
, response
)
293 checkDnstapExtra(self
, dnstap
, b
"Type,Response")
296 def fstrm_get_control_frame_type(data
):
297 (t
,) = struct
.unpack("!L", data
[0:4])
301 def fstrm_make_control_frame_reply(cft
, data
):
302 if cft
== FSTRM_CONTROL_READY
:
303 # Reply with ACCEPT frame and content-type
304 contenttype
= b
'protobuf:dnstap.Dnstap'
305 frame
= struct
.pack('!LLL', FSTRM_CONTROL_ACCEPT
, 1,
306 len(contenttype
)) + contenttype
307 buf
= struct
.pack("!LL", 0, len(frame
)) + frame
309 elif cft
== FSTRM_CONTROL_START
:
312 raise Exception('unhandled control frame ' + cft
)
315 def fstrm_read_and_dispatch_control_frame(conn
):
318 raise Exception('length of control frame payload could not be read')
319 (datalen
,) = struct
.unpack("!L", data
)
320 data
= conn
.recv(datalen
)
321 cft
= fstrm_get_control_frame_type(data
)
322 reply
= fstrm_make_control_frame_reply(cft
, data
)
328 def fstrm_handle_bidir_connection(conn
, on_data
):
334 (datalen
,) = struct
.unpack("!L", data
)
336 # control frame length follows
337 cft
= fstrm_read_and_dispatch_control_frame(conn
)
338 if cft
== FSTRM_CONTROL_STOP
:
342 data
= conn
.recv(datalen
)
349 class TestDnstapOverFrameStreamUnixLogger(DNSDistTest
):
350 _fstrmLoggerAddress
= '/tmp/fslutest.sock'
351 _fstrmLoggerQueue
= Queue()
352 _fstrmLoggerCounter
= 0
353 _config_params
= ['_testServerPort', '_fstrmLoggerAddress']
354 _config_template
= """
355 newServer{address="127.0.0.1:%s", useClientSubnet=true}
356 fslu = newFrameStreamUnixLogger('%s')
358 addAction(AllRule(), DnstapLogAction("a.server", fslu))
362 def FrameStreamUnixListener(cls
, path
):
366 pass # Assume file not found
367 sock
= socket
.socket(socket
.AF_UNIX
, socket
.SOCK_STREAM
)
370 except socket
.error
as e
:
371 print("Error binding in the framestream listener: %s" % str(e
))
376 (conn
, _
) = sock
.accept()
377 fstrm_handle_bidir_connection(conn
, lambda data
: \
378 cls
._fstrmLoggerQueue
.put(data
, True, timeout
=2.0))
383 def startResponders(cls
):
384 DNSDistTest
.startResponders()
386 cls
._fstrmLoggerListener
= threading
.Thread(name
='FrameStreamUnixListener', target
=cls
.FrameStreamUnixListener
, args
=[cls
._fstrmLoggerAddress
])
387 cls
._fstrmLoggerListener
.setDaemon(True)
388 cls
._fstrmLoggerListener
.start()
390 def getFirstDnstap(self
):
391 data
= self
._fstrmLoggerQueue
.get(True, timeout
=2.0)
392 self
.assertTrue(data
)
393 dnstap
= dnstap_pb2
.Dnstap()
394 dnstap
.ParseFromString(data
)
397 def testDnstapOverFrameStreamUnix(self
):
399 Dnstap: Send query packed in dnstap to a unix socket fstrmlogger server
401 name
= 'query.dnstap.tests.powerdns.com.'
403 target
= 'target.dnstap.tests.powerdns.com.'
404 query
= dns
.message
.make_query(name
, 'A', 'IN')
405 response
= dns
.message
.make_response(query
)
407 rrset
= dns
.rrset
.from_text(name
,
412 response
.answer
.append(rrset
)
414 rrset
= dns
.rrset
.from_text(target
,
419 response
.answer
.append(rrset
)
421 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
422 self
.assertTrue(receivedQuery
)
423 self
.assertTrue(receivedResponse
)
424 receivedQuery
.id = query
.id
425 self
.assertEquals(query
, receivedQuery
)
426 self
.assertEquals(response
, receivedResponse
)
428 # check the dnstap message corresponding to the UDP query
429 dnstap
= self
.getFirstDnstap()
431 checkDnstapQuery(self
, dnstap
, dnstap_pb2
.UDP
, query
)
432 checkDnstapNoExtra(self
, dnstap
)
435 class TestDnstapOverFrameStreamTcpLogger(DNSDistTest
):
436 _fstrmLoggerPort
= 4000
437 _fstrmLoggerQueue
= Queue()
438 _fstrmLoggerCounter
= 0
439 _config_params
= ['_testServerPort', '_fstrmLoggerPort']
440 _config_template
= """
441 newServer{address="127.0.0.1:%s", useClientSubnet=true}
442 fslu = newFrameStreamTcpLogger('127.0.0.1:%s')
444 addAction(AllRule(), DnstapLogAction("a.server", fslu))
448 def FrameStreamUnixListener(cls
, port
):
449 sock
= socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
)
451 sock
.bind(("127.0.0.1", port
))
452 except socket
.error
as e
:
453 print("Error binding in the framestream listener: %s" % str(e
))
458 (conn
, _
) = sock
.accept()
459 fstrm_handle_bidir_connection(conn
, lambda data
: \
460 cls
._fstrmLoggerQueue
.put(data
, True, timeout
=2.0))
465 def startResponders(cls
):
466 DNSDistTest
.startResponders()
468 cls
._fstrmLoggerListener
= threading
.Thread(name
='FrameStreamUnixListener', target
=cls
.FrameStreamUnixListener
, args
=[cls
._fstrmLoggerPort
])
469 cls
._fstrmLoggerListener
.setDaemon(True)
470 cls
._fstrmLoggerListener
.start()
472 def getFirstDnstap(self
):
473 data
= self
._fstrmLoggerQueue
.get(True, timeout
=2.0)
474 self
.assertTrue(data
)
475 dnstap
= dnstap_pb2
.Dnstap()
476 dnstap
.ParseFromString(data
)
479 def testDnstapOverFrameStreamTcp(self
):
481 Dnstap: Send query packed in dnstap to a tcp socket fstrmlogger server
483 name
= 'query.dnstap.tests.powerdns.com.'
485 target
= 'target.dnstap.tests.powerdns.com.'
486 query
= dns
.message
.make_query(name
, 'A', 'IN')
487 response
= dns
.message
.make_response(query
)
489 rrset
= dns
.rrset
.from_text(name
,
494 response
.answer
.append(rrset
)
496 rrset
= dns
.rrset
.from_text(target
,
501 response
.answer
.append(rrset
)
503 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
504 self
.assertTrue(receivedQuery
)
505 self
.assertTrue(receivedResponse
)
506 receivedQuery
.id = query
.id
507 self
.assertEquals(query
, receivedQuery
)
508 self
.assertEquals(response
, receivedResponse
)
510 # check the dnstap message corresponding to the UDP query
511 dnstap
= self
.getFirstDnstap()
513 checkDnstapQuery(self
, dnstap
, dnstap_pb2
.UDP
, query
)
514 checkDnstapNoExtra(self
, dnstap
)