]>
git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_Dnstap.py
8c2fd12dc7dbb18f14b33d4ea7233587bd4a9e87
9 from dnsdisttests
import DNSDistTest
14 FSTRM_CONTROL_ACCEPT
= 0x01
15 FSTRM_CONTROL_START
= 0x02
16 FSTRM_CONTROL_STOP
= 0x03
17 FSTRM_CONTROL_READY
= 0x04
18 FSTRM_CONTROL_FINISH
= 0x05
21 def checkDnstapBase(testinstance
, dnstap
, protocol
, initiator
):
22 testinstance
.assertTrue(dnstap
)
23 testinstance
.assertTrue(dnstap
.HasField('identity'))
24 testinstance
.assertEqual(dnstap
.identity
, 'a.server')
25 testinstance
.assertTrue(dnstap
.HasField('version'))
26 testinstance
.assertIn('dnsdist ', dnstap
.version
)
27 testinstance
.assertTrue(dnstap
.HasField('type'))
28 testinstance
.assertEqual(dnstap
.type, dnstap
.MESSAGE
)
29 testinstance
.assertTrue(dnstap
.HasField('message'))
30 testinstance
.assertTrue(dnstap
.message
.HasField('socket_protocol'))
31 testinstance
.assertEqual(dnstap
.message
.socket_protocol
, protocol
)
32 testinstance
.assertTrue(dnstap
.message
.HasField('socket_family'))
33 testinstance
.assertEquals(dnstap
.message
.socket_family
, dnstap_pb2
.INET
)
34 testinstance
.assertTrue(dnstap
.message
.HasField('query_address'))
35 testinstance
.assertEquals(socket
.inet_ntop(socket
.AF_INET
, dnstap
.message
.query_address
), initiator
)
36 testinstance
.assertTrue(dnstap
.message
.HasField('response_address'))
37 testinstance
.assertEquals(socket
.inet_ntop(socket
.AF_INET
, dnstap
.message
.response_address
), initiator
)
38 testinstance
.assertTrue(dnstap
.message
.HasField('response_port'))
39 testinstance
.assertEquals(dnstap
.message
.response_port
, testinstance
._dnsDistPort
)
42 def checkDnstapQuery(testinstance
, dnstap
, protocol
, query
, initiator
='127.0.0.1'):
43 testinstance
.assertEquals(dnstap
.message
.type, dnstap_pb2
.Message
.CLIENT_QUERY
)
44 checkDnstapBase(testinstance
, dnstap
, protocol
, initiator
)
46 testinstance
.assertTrue(dnstap
.message
.HasField('query_time_sec'))
47 testinstance
.assertTrue(dnstap
.message
.HasField('query_time_nsec'))
49 testinstance
.assertTrue(dnstap
.message
.HasField('query_message'))
50 wire_message
= dns
.message
.from_wire(dnstap
.message
.query_message
)
51 testinstance
.assertEqual(wire_message
, query
)
54 def checkDnstapExtra(testinstance
, dnstap
, expected
):
55 testinstance
.assertTrue(dnstap
.HasField('extra'))
56 testinstance
.assertEqual(dnstap
.extra
, expected
)
59 def checkDnstapNoExtra(testinstance
, dnstap
):
60 testinstance
.assertFalse(dnstap
.HasField('extra'))
63 def checkDnstapResponse(testinstance
, dnstap
, protocol
, response
, initiator
='127.0.0.1'):
64 testinstance
.assertEquals(dnstap
.message
.type, dnstap_pb2
.Message
.CLIENT_RESPONSE
)
65 checkDnstapBase(testinstance
, dnstap
, protocol
, initiator
)
67 testinstance
.assertTrue(dnstap
.message
.HasField('query_time_sec'))
68 testinstance
.assertTrue(dnstap
.message
.HasField('query_time_nsec'))
70 testinstance
.assertTrue(dnstap
.message
.HasField('response_time_sec'))
71 testinstance
.assertTrue(dnstap
.message
.HasField('response_time_nsec'))
73 testinstance
.assertTrue(dnstap
.message
.response_time_sec
> dnstap
.message
.query_time_sec
or \
74 dnstap
.message
.response_time_nsec
> dnstap
.message
.query_time_nsec
)
76 testinstance
.assertTrue(dnstap
.message
.HasField('response_message'))
77 wire_message
= dns
.message
.from_wire(dnstap
.message
.response_message
)
78 testinstance
.assertEqual(wire_message
, response
)
81 class TestDnstapOverRemoteLogger(DNSDistTest
):
82 _remoteLoggerServerPort
= 4243
83 _remoteLoggerQueue
= Queue
.Queue()
84 _remoteLoggerCounter
= 0
85 _config_params
= ['_testServerPort', '_remoteLoggerServerPort']
86 _config_template
= """
87 extrasmn = newSuffixMatchNode()
88 extrasmn:add(newDNSName('extra.dnstap.tests.powerdns.com.'))
90 luatarget = 'lua.dnstap.tests.powerdns.com.'
92 function alterDnstapQuery(dq, tap)
93 if extrasmn:check(dq.qname) then
94 tap:setExtra("Type,Query")
98 function alterDnstapResponse(dq, tap)
99 if extrasmn:check(dq.qname) then
100 tap:setExtra("Type,Response")
106 dq.dh:setRCode(dnsdist.NXDOMAIN)
107 return DNSAction.None, ""
110 newServer{address="127.0.0.1:%s", useClientSubnet=true}
111 rl = newRemoteLogger('127.0.0.1:%s')
113 addAction(AllRule(), DnstapLogAction("a.server", rl, alterDnstapQuery)) -- Send dnstap message before lookup
115 addAction(luatarget, LuaAction(luaFunc)) -- Send dnstap message before lookup
117 addResponseAction(AllRule(), DnstapLogResponseAction("a.server", rl, alterDnstapResponse)) -- Send dnstap message after lookup
119 addAction('spoof.dnstap.tests.powerdns.com.', SpoofAction("192.0.2.1"))
123 def RemoteLoggerListener(cls
, port
):
124 sock
= socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
)
125 sock
.setsockopt(socket
.SOL_SOCKET
, socket
.SO_REUSEPORT
, 1)
127 sock
.bind(("127.0.0.1", port
))
128 except socket
.error
as e
:
129 print("Error binding in the protbuf listener: %s" % str(e
))
134 (conn
, _
) = sock
.accept()
140 (datalen
,) = struct
.unpack("!H", data
)
141 data
= conn
.recv(datalen
)
145 cls
._remoteLoggerQueue
.put(data
, True, timeout
=2.0)
151 def startResponders(cls
):
152 DNSDistTest
.startResponders()
154 cls
._remoteLoggerListener
= threading
.Thread(name
='RemoteLogger Listener', target
=cls
.RemoteLoggerListener
, args
=[cls
._remoteLoggerServerPort
])
155 cls
._remoteLoggerListener
.setDaemon(True)
156 cls
._remoteLoggerListener
.start()
158 def getFirstDnstap(self
):
159 self
.assertFalse(self
._remoteLoggerQueue
.empty())
160 data
= self
._remoteLoggerQueue
.get(False)
161 self
.assertTrue(data
)
162 dnstap
= dnstap_pb2
.Dnstap()
163 dnstap
.ParseFromString(data
)
166 def testDnstap(self
):
168 Dnstap: Send query and responses packed in dnstap to a remotelogger server
170 name
= 'query.dnstap.tests.powerdns.com.'
172 target
= 'target.dnstap.tests.powerdns.com.'
173 query
= dns
.message
.make_query(name
, 'A', 'IN')
174 response
= dns
.message
.make_response(query
)
176 rrset
= dns
.rrset
.from_text(name
,
181 response
.answer
.append(rrset
)
183 rrset
= dns
.rrset
.from_text(target
,
188 response
.answer
.append(rrset
)
190 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
191 self
.assertTrue(receivedQuery
)
192 self
.assertTrue(receivedResponse
)
193 receivedQuery
.id = query
.id
194 self
.assertEquals(query
, receivedQuery
)
195 self
.assertEquals(response
, receivedResponse
)
197 # give the dnstap messages time to get here
200 # check the dnstap message corresponding to the UDP query
201 dnstap
= self
.getFirstDnstap()
203 checkDnstapQuery(self
, dnstap
, dnstap_pb2
.UDP
, query
)
204 checkDnstapNoExtra(self
, dnstap
)
206 # check the dnstap message corresponding to the UDP response
207 dnstap
= self
.getFirstDnstap()
208 checkDnstapResponse(self
, dnstap
, dnstap_pb2
.UDP
, response
)
209 checkDnstapNoExtra(self
, dnstap
)
211 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
212 self
.assertTrue(receivedQuery
)
213 self
.assertTrue(receivedResponse
)
214 receivedQuery
.id = query
.id
215 self
.assertEquals(query
, receivedQuery
)
216 self
.assertEquals(response
, receivedResponse
)
218 # give the dnstap messages time to get here
221 # check the dnstap message corresponding to the TCP query
222 dnstap
= self
.getFirstDnstap()
224 checkDnstapQuery(self
, dnstap
, dnstap_pb2
.TCP
, query
)
225 checkDnstapNoExtra(self
, dnstap
)
227 # check the dnstap message corresponding to the TCP response
228 dnstap
= self
.getFirstDnstap()
229 checkDnstapResponse(self
, dnstap
, dnstap_pb2
.TCP
, response
)
230 checkDnstapNoExtra(self
, dnstap
)
232 def testDnstapExtra(self
):
234 DnstapExtra: Send query and responses packed in dnstap to a remotelogger server. Extra data is filled out.
236 name
= 'extra.dnstap.tests.powerdns.com.'
238 target
= 'target.dnstap.tests.powerdns.com.'
239 query
= dns
.message
.make_query(name
, 'A', 'IN')
240 response
= dns
.message
.make_response(query
)
242 rrset
= dns
.rrset
.from_text(name
,
247 response
.answer
.append(rrset
)
249 rrset
= dns
.rrset
.from_text(target
,
254 response
.answer
.append(rrset
)
256 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
257 self
.assertTrue(receivedQuery
)
258 self
.assertTrue(receivedResponse
)
259 receivedQuery
.id = query
.id
260 self
.assertEquals(query
, receivedQuery
)
261 self
.assertEquals(response
, receivedResponse
)
263 # give the dnstap messages time to get here
266 # check the dnstap message corresponding to the UDP query
267 dnstap
= self
.getFirstDnstap()
268 checkDnstapQuery(self
, dnstap
, dnstap_pb2
.UDP
, query
)
269 checkDnstapExtra(self
, dnstap
, "Type,Query")
271 # check the dnstap message corresponding to the UDP response
272 dnstap
= self
.getFirstDnstap()
273 checkDnstapResponse(self
, dnstap
, dnstap_pb2
.UDP
, response
)
274 checkDnstapExtra(self
, dnstap
, "Type,Response")
276 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
277 self
.assertTrue(receivedQuery
)
278 self
.assertTrue(receivedResponse
)
279 receivedQuery
.id = query
.id
280 self
.assertEquals(query
, receivedQuery
)
281 self
.assertEquals(response
, receivedResponse
)
283 # give the dnstap messages time to get here
286 # check the dnstap message corresponding to the TCP query
287 dnstap
= self
.getFirstDnstap()
288 checkDnstapQuery(self
, dnstap
, dnstap_pb2
.TCP
, query
)
289 checkDnstapExtra(self
, dnstap
, "Type,Query")
291 # check the dnstap message corresponding to the TCP response
292 dnstap
= self
.getFirstDnstap()
293 checkDnstapResponse(self
, dnstap
, dnstap_pb2
.TCP
, response
)
294 checkDnstapExtra(self
, dnstap
, "Type,Response")
297 def fstrm_get_control_frame_type(data
):
298 (t
,) = struct
.unpack("!L", data
[0:4])
302 def fstrm_make_control_frame_reply(cft
, data
):
303 if cft
== FSTRM_CONTROL_READY
:
304 # Reply with ACCEPT frame and content-type
305 contenttype
= 'protobuf:dnstap.Dnstap'
306 frame
= struct
.pack('!LLL', FSTRM_CONTROL_ACCEPT
, 1,
307 len(contenttype
)) + contenttype
308 buf
= struct
.pack("!LL", 0, len(frame
)) + frame
310 elif cft
== FSTRM_CONTROL_START
:
313 raise Exception('unhandled control frame ' + cft
)
316 def fstrm_read_and_dispatch_control_frame(conn
):
319 raise Exception('length of control frame payload could not be read')
320 (datalen
,) = struct
.unpack("!L", data
)
321 data
= conn
.recv(datalen
)
322 cft
= fstrm_get_control_frame_type(data
)
323 reply
= fstrm_make_control_frame_reply(cft
, data
)
329 def fstrm_handle_bidir_connection(conn
, on_data
):
335 (datalen
,) = struct
.unpack("!L", data
)
337 # control frame length follows
338 cft
= fstrm_read_and_dispatch_control_frame(conn
)
339 if cft
== FSTRM_CONTROL_STOP
:
343 data
= conn
.recv(datalen
)
350 class TestDnstapOverFrameStreamUnixLogger(DNSDistTest
):
351 _fstrmLoggerAddress
= '/tmp/fslutest.sock'
352 _fstrmLoggerQueue
= Queue
.Queue()
353 _fstrmLoggerCounter
= 0
354 _config_params
= ['_testServerPort', '_fstrmLoggerAddress']
355 _config_template
= """
356 newServer{address="127.0.0.1:%s", useClientSubnet=true}
357 fslu = newFrameStreamUnixLogger('%s')
359 addAction(AllRule(), DnstapLogAction("a.server", fslu))
363 def FrameStreamUnixListener(cls
, path
):
367 pass # Assume file not found
368 sock
= socket
.socket(socket
.AF_UNIX
, socket
.SOCK_STREAM
)
371 except socket
.error
as e
:
372 print("Error binding in the framestream listener: %s" % str(e
))
377 (conn
, _
) = sock
.accept()
378 fstrm_handle_bidir_connection(conn
, lambda data
: \
379 cls
._fstrmLoggerQueue
.put(data
, True, timeout
=2.0))
384 def startResponders(cls
):
385 DNSDistTest
.startResponders()
387 cls
._fstrmLoggerListener
= threading
.Thread(name
='FrameStreamUnixListener', target
=cls
.FrameStreamUnixListener
, args
=[cls
._fstrmLoggerAddress
])
388 cls
._fstrmLoggerListener
.setDaemon(True)
389 cls
._fstrmLoggerListener
.start()
391 def getFirstDnstap(self
):
392 data
= self
._fstrmLoggerQueue
.get(True, timeout
=2.0)
393 self
.assertTrue(data
)
394 dnstap
= dnstap_pb2
.Dnstap()
395 dnstap
.ParseFromString(data
)
398 def testDnstapOverFrameStreamUnix(self
):
400 Dnstap: Send query packed in dnstap to a unix socket fstrmlogger server
402 name
= 'query.dnstap.tests.powerdns.com.'
404 target
= 'target.dnstap.tests.powerdns.com.'
405 query
= dns
.message
.make_query(name
, 'A', 'IN')
406 response
= dns
.message
.make_response(query
)
408 rrset
= dns
.rrset
.from_text(name
,
413 response
.answer
.append(rrset
)
415 rrset
= dns
.rrset
.from_text(target
,
420 response
.answer
.append(rrset
)
422 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
423 self
.assertTrue(receivedQuery
)
424 self
.assertTrue(receivedResponse
)
425 receivedQuery
.id = query
.id
426 self
.assertEquals(query
, receivedQuery
)
427 self
.assertEquals(response
, receivedResponse
)
429 # check the dnstap message corresponding to the UDP query
430 dnstap
= self
.getFirstDnstap()
432 checkDnstapQuery(self
, dnstap
, dnstap_pb2
.UDP
, query
)
433 checkDnstapNoExtra(self
, dnstap
)
436 class TestDnstapOverFrameStreamTcpLogger(DNSDistTest
):
437 _fstrmLoggerPort
= 4000
438 _fstrmLoggerQueue
= Queue
.Queue()
439 _fstrmLoggerCounter
= 0
440 _config_params
= ['_testServerPort', '_fstrmLoggerPort']
441 _config_template
= """
442 newServer{address="127.0.0.1:%s", useClientSubnet=true}
443 fslu = newFrameStreamTcpLogger('127.0.0.1:%s')
445 addAction(AllRule(), DnstapLogAction("a.server", fslu))
449 def FrameStreamUnixListener(cls
, port
):
450 sock
= socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
)
452 sock
.bind(("127.0.0.1", port
))
453 except socket
.error
as e
:
454 print("Error binding in the framestream listener: %s" % str(e
))
459 (conn
, _
) = sock
.accept()
460 fstrm_handle_bidir_connection(conn
, lambda data
: \
461 cls
._fstrmLoggerQueue
.put(data
, True, timeout
=2.0))
466 def startResponders(cls
):
467 DNSDistTest
.startResponders()
469 cls
._fstrmLoggerListener
= threading
.Thread(name
='FrameStreamUnixListener', target
=cls
.FrameStreamUnixListener
, args
=[cls
._fstrmLoggerPort
])
470 cls
._fstrmLoggerListener
.setDaemon(True)
471 cls
._fstrmLoggerListener
.start()
473 def getFirstDnstap(self
):
474 data
= self
._fstrmLoggerQueue
.get(True, timeout
=2.0)
475 self
.assertTrue(data
)
476 dnstap
= dnstap_pb2
.Dnstap()
477 dnstap
.ParseFromString(data
)
480 def testDnstapOverFrameStreamTcp(self
):
482 Dnstap: Send query packed in dnstap to a tcp socket fstrmlogger server
484 name
= 'query.dnstap.tests.powerdns.com.'
486 target
= 'target.dnstap.tests.powerdns.com.'
487 query
= dns
.message
.make_query(name
, 'A', 'IN')
488 response
= dns
.message
.make_response(query
)
490 rrset
= dns
.rrset
.from_text(name
,
495 response
.answer
.append(rrset
)
497 rrset
= dns
.rrset
.from_text(target
,
502 response
.answer
.append(rrset
)
504 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
505 self
.assertTrue(receivedQuery
)
506 self
.assertTrue(receivedResponse
)
507 receivedQuery
.id = query
.id
508 self
.assertEquals(query
, receivedQuery
)
509 self
.assertEquals(response
, receivedResponse
)
511 # check the dnstap message corresponding to the UDP query
512 dnstap
= self
.getFirstDnstap()
514 checkDnstapQuery(self
, dnstap
, dnstap_pb2
.UDP
, query
)
515 checkDnstapNoExtra(self
, dnstap
)