10 # Python2/3 compatibility hacks
11 if sys
.version_info
[0] == 2:
12 from Queue
import Queue
15 from queue
import Queue
16 range = range # allow re-export of the builtin name
18 from recursortests
import RecursorTest
20 protobufQueue
= Queue()
21 protobufServerPort
= 4243
23 def ProtobufConnectionHandler(queue
, conn
):
29 (datalen
,) = struct
.unpack("!H", data
)
30 data
= conn
.recv(datalen
)
34 queue
.put(data
, True, timeout
=2.0)
38 def ProtobufListener(port
):
40 sock
= socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
)
41 sock
.setsockopt(socket
.SOL_SOCKET
, socket
.SO_REUSEPORT
, 1)
43 sock
.bind(("127.0.0.1", port
))
44 except socket
.error
as e
:
45 print("Error binding in the protobuf listener: %s" % str(e
))
51 (conn
, _
) = sock
.accept()
52 thread
= threading
.Thread(name
='Connection Handler',
53 target
=ProtobufConnectionHandler
,
54 args
=[protobufQueue
, conn
])
55 thread
.setDaemon(True)
58 except socket
.error
as e
:
59 print('Error in protobuf socket: %s' % str(e
))
64 protobufListener
= threading
.Thread(name
='Protobuf Listener', target
=ProtobufListener
, args
=[protobufServerPort
])
65 protobufListener
.setDaemon(True)
66 protobufListener
.start()
68 class TestRecursorProtobuf(RecursorTest
):
70 global protobufServerPort
71 _lua_config_file
= """
72 protobufServer("127.0.0.1:%d")
73 """ % (protobufServerPort
)
76 def getFirstProtobufMessage(self
, retries
=1, waitTime
=1):
80 while protobufQueue
.empty
:
87 self
.assertFalse(protobufQueue
.empty())
88 data
= protobufQueue
.get(False)
90 msg
= dnsmessage_pb2
.PBDNSMessage()
91 msg
.ParseFromString(data
)
94 def checkNoRemainingMessage(self
):
96 self
.assertTrue(protobufQueue
.empty())
98 def checkProtobufBase(self
, msg
, protocol
, query
, initiator
, normalQueryResponse
=True, expectedECS
=None):
100 self
.assertTrue(msg
.HasField('timeSec'))
101 self
.assertTrue(msg
.HasField('socketFamily'))
102 self
.assertEquals(msg
.socketFamily
, dnsmessage_pb2
.PBDNSMessage
.INET
)
103 self
.assertTrue(msg
.HasField('from'))
104 fromvalue
= getattr(msg
, 'from')
105 self
.assertEquals(socket
.inet_ntop(socket
.AF_INET
, fromvalue
), initiator
)
106 self
.assertTrue(msg
.HasField('socketProtocol'))
107 self
.assertEquals(msg
.socketProtocol
, protocol
)
108 self
.assertTrue(msg
.HasField('messageId'))
109 self
.assertTrue(msg
.HasField('id'))
110 self
.assertEquals(msg
.id, query
.id)
111 self
.assertTrue(msg
.HasField('inBytes'))
112 if normalQueryResponse
:
113 # compare inBytes with length of query/response
114 self
.assertEquals(msg
.inBytes
, len(query
.to_wire()))
115 if expectedECS
is not None:
116 self
.assertTrue(msg
.HasField('originalRequestorSubnet'))
118 self
.assertEquals(len(msg
.originalRequestorSubnet
), 4)
119 self
.assertEquals(socket
.inet_ntop(socket
.AF_INET
, msg
.originalRequestorSubnet
), '127.0.0.1')
121 def checkOutgoingProtobufBase(self
, msg
, protocol
, query
, initiator
):
123 self
.assertTrue(msg
.HasField('timeSec'))
124 self
.assertTrue(msg
.HasField('socketFamily'))
125 self
.assertEquals(msg
.socketFamily
, dnsmessage_pb2
.PBDNSMessage
.INET
)
126 self
.assertTrue(msg
.HasField('socketProtocol'))
127 self
.assertEquals(msg
.socketProtocol
, protocol
)
128 self
.assertTrue(msg
.HasField('messageId'))
129 self
.assertTrue(msg
.HasField('id'))
130 self
.assertNotEquals(msg
.id, query
.id)
131 self
.assertTrue(msg
.HasField('inBytes'))
132 # compare inBytes with length of query/response
133 self
.assertEquals(msg
.inBytes
, len(query
.to_wire()))
135 def checkProtobufQuery(self
, msg
, protocol
, query
, qclass
, qtype
, qname
, initiator
='127.0.0.1'):
136 self
.assertEquals(msg
.type, dnsmessage_pb2
.PBDNSMessage
.DNSQueryType
)
137 self
.checkProtobufBase(msg
, protocol
, query
, initiator
)
138 # dnsdist doesn't fill the responder field for responses
139 # because it doesn't keep the information around.
140 self
.assertTrue(msg
.HasField('to'))
141 self
.assertEquals(socket
.inet_ntop(socket
.AF_INET
, msg
.to
), '127.0.0.1')
142 self
.assertTrue(msg
.HasField('question'))
143 self
.assertTrue(msg
.question
.HasField('qClass'))
144 self
.assertEquals(msg
.question
.qClass
, qclass
)
145 self
.assertTrue(msg
.question
.HasField('qType'))
146 self
.assertEquals(msg
.question
.qClass
, qtype
)
147 self
.assertTrue(msg
.question
.HasField('qName'))
148 self
.assertEquals(msg
.question
.qName
, qname
)
150 def checkProtobufResponse(self
, msg
, protocol
, response
, initiator
='127.0.0.1'):
151 self
.assertEquals(msg
.type, dnsmessage_pb2
.PBDNSMessage
.DNSResponseType
)
152 self
.checkProtobufBase(msg
, protocol
, response
, initiator
)
153 self
.assertTrue(msg
.HasField('response'))
154 self
.assertTrue(msg
.response
.HasField('queryTimeSec'))
156 def checkProtobufResponseRecord(self
, record
, rclass
, rtype
, rname
, rttl
):
157 self
.assertTrue(record
.HasField('class'))
158 self
.assertEquals(getattr(record
, 'class'), rclass
)
159 self
.assertTrue(record
.HasField('type'))
160 self
.assertEquals(record
.type, rtype
)
161 self
.assertTrue(record
.HasField('name'))
162 self
.assertEquals(record
.name
, rname
)
163 self
.assertTrue(record
.HasField('ttl'))
164 self
.assertEquals(record
.ttl
, rttl
)
165 self
.assertTrue(record
.HasField('rdata'))
167 def checkProtobufPolicy(self
, msg
, policyType
, reason
):
168 self
.assertEquals(msg
.type, dnsmessage_pb2
.PBDNSMessage
.DNSResponseType
)
169 self
.assertTrue(msg
.response
.HasField('appliedPolicyType'))
170 self
.assertTrue(msg
.response
.HasField('appliedPolicy'))
171 self
.assertEquals(msg
.response
.appliedPolicy
, reason
)
172 self
.assertEquals(msg
.response
.appliedPolicyType
, policyType
)
174 def checkProtobufTags(self
, msg
, tags
):
175 self
.assertEquals(len(msg
.response
.tags
), len(tags
))
176 for tag
in msg
.response
.tags
:
177 self
.assertTrue(tag
in tags
)
179 def checkProtobufOutgoingQuery(self
, msg
, protocol
, query
, qclass
, qtype
, qname
, initiator
='127.0.0.1'):
180 self
.assertEquals(msg
.type, dnsmessage_pb2
.PBDNSMessage
.DNSOutgoingQueryType
)
181 self
.checkOutgoingProtobufBase(msg
, protocol
, query
, initiator
)
182 self
.assertTrue(msg
.HasField('to'))
183 self
.assertTrue(msg
.HasField('question'))
184 self
.assertTrue(msg
.question
.HasField('qClass'))
185 self
.assertEquals(msg
.question
.qClass
, qclass
)
186 self
.assertTrue(msg
.question
.HasField('qType'))
187 self
.assertEquals(msg
.question
.qClass
, qtype
)
188 self
.assertTrue(msg
.question
.HasField('qName'))
189 self
.assertEquals(msg
.question
.qName
, qname
)
191 def checkProtobufIncomingResponse(self
, msg
, protocol
, response
, initiator
='127.0.0.1'):
192 self
.assertEquals(msg
.type, dnsmessage_pb2
.PBDNSMessage
.DNSIncomingResponseType
)
193 self
.checkOutgoingProtobufBase(msg
, protocol
, response
, initiator
)
194 self
.assertTrue(msg
.HasField('response'))
195 self
.assertTrue(msg
.response
.HasField('queryTimeSec'))
200 global protobufListener
201 global protobufServerPort
202 global ProtobufListener
203 if protobufListener
is None or not protobufListener
.isAlive():
204 protobufListener
= threading
.Thread(name
='Protobuf Listener', target
=ProtobufListener
, args
=[protobufServerPort
])
205 protobufListener
.setDaemon(True)
206 protobufListener
.start()
210 cls
.startResponders()
212 confdir
= os
.path
.join('configs', cls
._confdir
)
213 cls
.createConfigDir(confdir
)
215 cls
.generateRecursorConfig(confdir
)
216 cls
.startRecursor(confdir
, cls
._recursorPort
)
219 # Make sure the queue is empty, in case
220 # a previous test failed
222 while not protobufQueue
.empty():
223 protobufQueue
.get(False)
226 def generateRecursorConfig(cls
, confdir
):
227 authzonepath
= os
.path
.join(confdir
, 'example.zone')
228 with
open(authzonepath
, 'w') as authzone
:
229 authzone
.write("""$ORIGIN example.
231 a 3600 IN A 192.0.2.42
232 tagged 3600 IN A 192.0.2.84
233 query-selected 3600 IN A 192.0.2.84
234 answer-selected 3600 IN A 192.0.2.84
235 """.format(soa
=cls
._SOA
))
236 super(TestRecursorProtobuf
, cls
).generateRecursorConfig(confdir
)
239 def tearDownClass(cls
):
240 cls
.tearDownRecursor()
242 class ProtobufDefaultTest(TestRecursorProtobuf
):
244 This test makes sure that we correctly export queries and response over protobuf.
247 _confdir
= 'ProtobufDefault'
248 _config_template
= """
249 auth-zones=example=configs/%s/example.zone""" % _confdir
253 expected
= dns
.rrset
.from_text(name
, 0, dns
.rdataclass
.IN
, 'A', '192.0.2.42')
254 query
= dns
.message
.make_query(name
, 'A', want_dnssec
=True)
255 query
.flags |
= dns
.flags
.CD
256 res
= self
.sendUDPQuery(query
)
257 self
.assertRRsetInAnswer(res
, expected
)
259 # check the protobuf messages corresponding to the UDP query and answer
260 msg
= self
.getFirstProtobufMessage()
261 self
.checkProtobufQuery(msg
, dnsmessage_pb2
.PBDNSMessage
.UDP
, query
, dns
.rdataclass
.IN
, dns
.rdatatype
.A
, name
)
263 msg
= self
.getFirstProtobufMessage()
264 self
.checkProtobufResponse(msg
, dnsmessage_pb2
.PBDNSMessage
.UDP
, res
)
265 self
.assertEquals(len(msg
.response
.rrs
), 1)
266 rr
= msg
.response
.rrs
[0]
267 # we have max-cache-ttl set to 15
268 self
.checkProtobufResponseRecord(rr
, dns
.rdataclass
.IN
, dns
.rdatatype
.A
, name
, 15)
269 self
.assertEquals(socket
.inet_ntop(socket
.AF_INET
, rr
.rdata
), '192.0.2.42')
270 self
.checkNoRemainingMessage()
272 class OutgoingProtobufDefaultTest(TestRecursorProtobuf
):
274 This test makes sure that we correctly export outgoing queries over protobuf.
275 It must be improved and setup env so we can check for incoming responses, but makes sure for now
276 that the recursor at least connects to the protobuf server.
279 _confdir
= 'OutgoingProtobufDefault'
280 _config_template
= """
281 auth-zones=example=configs/%s/example.zone""" % _confdir
282 _lua_config_file
= """
283 outgoingProtobufServer("127.0.0.1:%d")
284 """ % (protobufServerPort
)
287 name
= 'www.example.org.'
288 expected
= dns
.rrset
.from_text(name
, 0, dns
.rdataclass
.IN
, 'A', '192.0.2.42')
289 query
= dns
.message
.make_query(name
, 'A', want_dnssec
=True)
290 query
.flags |
= dns
.flags
.RD
291 res
= self
.sendUDPQuery(query
)
293 # check the protobuf messages corresponding to the UDP query and answer
294 msg
= self
.getFirstProtobufMessage()
295 self
.checkProtobufOutgoingQuery(msg
, dnsmessage_pb2
.PBDNSMessage
.UDP
, query
, dns
.rdataclass
.IN
, dns
.rdatatype
.A
, name
)
296 # # then the response
297 # msg = self.getFirstProtobufMessage()
298 # self.checkProtobufIncomingResponse(msg, dnsmessage_pb2.PBDNSMessage.UDP, res)
299 self
.checkNoRemainingMessage()
301 class ProtobufMasksTest(TestRecursorProtobuf
):
303 This test makes sure that we correctly export queries and response over protobuf, respecting the configured initiator masking.
306 _confdir
= 'ProtobufMasks'
307 _config_template
= """
308 auth-zones=example=configs/%s/example.zone""" % _confdir
309 global protobufServerPort
311 _protobufMaskV6
= 128
312 _lua_config_file
= """
313 protobufServer("127.0.0.1:%d")
314 setProtobufMasks(%d, %d)
315 """ % (protobufServerPort
, _protobufMaskV4
, _protobufMaskV6
)
319 expected
= dns
.rrset
.from_text(name
, 0, dns
.rdataclass
.IN
, 'A', '192.0.2.42')
320 query
= dns
.message
.make_query(name
, 'A', want_dnssec
=True)
321 query
.flags |
= dns
.flags
.CD
322 res
= self
.sendUDPQuery(query
)
323 self
.assertRRsetInAnswer(res
, expected
)
325 # check the protobuf messages corresponding to the UDP query and answer
326 # but first let the protobuf messages the time to get there
327 msg
= self
.getFirstProtobufMessage()
328 self
.checkProtobufQuery(msg
, dnsmessage_pb2
.PBDNSMessage
.UDP
, query
, dns
.rdataclass
.IN
, dns
.rdatatype
.A
, name
, '112.0.0.0')
330 msg
= self
.getFirstProtobufMessage()
331 self
.checkProtobufResponse(msg
, dnsmessage_pb2
.PBDNSMessage
.UDP
, res
, '112.0.0.0')
332 self
.assertEquals(len(msg
.response
.rrs
), 1)
333 rr
= msg
.response
.rrs
[0]
334 # we have max-cache-ttl set to 15
335 self
.checkProtobufResponseRecord(rr
, dns
.rdataclass
.IN
, dns
.rdatatype
.A
, name
, 15)
336 self
.assertEquals(socket
.inet_ntop(socket
.AF_INET
, rr
.rdata
), '192.0.2.42')
337 self
.checkNoRemainingMessage()
339 class ProtobufQueriesOnlyTest(TestRecursorProtobuf
):
341 This test makes sure that we correctly export queries but not responses over protobuf.
344 _confdir
= 'ProtobufQueriesOnly'
345 _config_template
= """
346 auth-zones=example=configs/%s/example.zone""" % _confdir
347 global protobufServerPort
348 _lua_config_file
= """
349 protobufServer("127.0.0.1:%d", { logQueries=true, logResponses=false } )
350 """ % (protobufServerPort
)
354 expected
= dns
.rrset
.from_text(name
, 0, dns
.rdataclass
.IN
, 'A', '192.0.2.42')
355 query
= dns
.message
.make_query(name
, 'A', want_dnssec
=True)
356 query
.flags |
= dns
.flags
.CD
357 res
= self
.sendUDPQuery(query
)
358 self
.assertRRsetInAnswer(res
, expected
)
360 # check the protobuf message corresponding to the UDP query
361 msg
= self
.getFirstProtobufMessage()
362 self
.checkProtobufQuery(msg
, dnsmessage_pb2
.PBDNSMessage
.UDP
, query
, dns
.rdataclass
.IN
, dns
.rdatatype
.A
, name
)
364 self
.checkNoRemainingMessage()
366 class ProtobufResponsesOnlyTest(TestRecursorProtobuf
):
368 This test makes sure that we correctly export responses but not queries over protobuf.
371 _confdir
= 'ProtobufResponsesOnly'
372 _config_template
= """
373 auth-zones=example=configs/%s/example.zone""" % _confdir
374 global protobufServerPort
375 _lua_config_file
= """
376 protobufServer("127.0.0.1:%d", { logQueries=false, logResponses=true } )
377 """ % (protobufServerPort
)
381 expected
= dns
.rrset
.from_text(name
, 0, dns
.rdataclass
.IN
, 'A', '192.0.2.42')
382 query
= dns
.message
.make_query(name
, 'A', want_dnssec
=True)
383 query
.flags |
= dns
.flags
.CD
384 res
= self
.sendUDPQuery(query
)
385 self
.assertRRsetInAnswer(res
, expected
)
387 # check the protobuf message corresponding to the UDP response
388 msg
= self
.getFirstProtobufMessage()
389 self
.checkProtobufResponse(msg
, dnsmessage_pb2
.PBDNSMessage
.UDP
, res
)
390 self
.assertEquals(len(msg
.response
.rrs
), 1)
391 rr
= msg
.response
.rrs
[0]
392 # we have max-cache-ttl set to 15
393 self
.checkProtobufResponseRecord(rr
, dns
.rdataclass
.IN
, dns
.rdatatype
.A
, name
, 15)
394 self
.assertEquals(socket
.inet_ntop(socket
.AF_INET
, rr
.rdata
), '192.0.2.42')
395 # nothing else in the queue
396 self
.checkNoRemainingMessage()
398 class ProtobufTaggedOnlyTest(TestRecursorProtobuf
):
400 This test makes sure that we correctly export queries and responses but only if they have been tagged.
403 _confdir
= 'ProtobufTaggedOnly'
404 _config_template
= """
405 auth-zones=example=configs/%s/example.zone""" % _confdir
406 global protobufServerPort
407 _lua_config_file
= """
408 protobufServer("127.0.0.1:%d", { logQueries=true, logResponses=true, taggedOnly=true } )
409 """ % (protobufServerPort
)
410 _tags
= ['tag1', 'tag2']
411 _tag_from_gettag
= 'tag-from-gettag'
412 _lua_dns_script_file
= """
413 function gettag(remote, ednssubnet, localip, qname, qtype, ednsoptions, tcp)
414 if qname:equal('tagged.example.') then
419 function preresolve(dq)
420 if dq.qname:equal('tagged.example.') then
421 dq:addPolicyTag('%s')
422 dq:addPolicyTag('%s')
426 """ % (_tag_from_gettag
, _tags
[0], _tags
[1])
430 expected
= dns
.rrset
.from_text(name
, 0, dns
.rdataclass
.IN
, 'A', '192.0.2.42')
431 query
= dns
.message
.make_query(name
, 'A', want_dnssec
=True)
432 query
.flags |
= dns
.flags
.CD
433 res
= self
.sendUDPQuery(query
)
434 self
.assertRRsetInAnswer(res
, expected
)
436 # check the protobuf message corresponding to the UDP response
437 # the first query and answer are not tagged, so there is nothing in the queue
439 self
.checkNoRemainingMessage()
441 def testTagged(self
):
442 name
= 'tagged.example.'
443 expected
= dns
.rrset
.from_text(name
, 0, dns
.rdataclass
.IN
, 'A', '192.0.2.84')
444 query
= dns
.message
.make_query(name
, 'A', want_dnssec
=True)
445 query
.flags |
= dns
.flags
.CD
446 res
= self
.sendUDPQuery(query
)
447 self
.assertRRsetInAnswer(res
, expected
)
449 # check the protobuf messages corresponding to the UDP query and answer
450 msg
= self
.getFirstProtobufMessage()
451 self
.checkProtobufQuery(msg
, dnsmessage_pb2
.PBDNSMessage
.UDP
, query
, dns
.rdataclass
.IN
, dns
.rdatatype
.A
, name
)
452 self
.checkProtobufTags(msg
, [self
._tag
_from
_gettag
])
454 msg
= self
.getFirstProtobufMessage()
455 self
.checkProtobufResponse(msg
, dnsmessage_pb2
.PBDNSMessage
.UDP
, res
)
456 self
.assertEquals(len(msg
.response
.rrs
), 1)
457 rr
= msg
.response
.rrs
[0]
458 # we have max-cache-ttl set to 15
459 self
.checkProtobufResponseRecord(rr
, dns
.rdataclass
.IN
, dns
.rdatatype
.A
, name
, 15)
460 self
.assertEquals(socket
.inet_ntop(socket
.AF_INET
, rr
.rdata
), '192.0.2.84')
461 tags
= [self
._tag
_from
_gettag
] + self
._tags
462 self
.checkProtobufTags(msg
, tags
)
463 self
.checkNoRemainingMessage()
465 class ProtobufSelectedFromLuaTest(TestRecursorProtobuf
):
467 This test makes sure that we correctly export queries and responses but only if they have been selected from Lua.
470 _confdir
= 'ProtobufSelectedFromLua'
471 _config_template
= """
472 auth-zones=example=configs/%s/example.zone""" % _confdir
473 global protobufServerPort
474 _lua_config_file
= """
475 protobufServer("127.0.0.1:%d", { logQueries=false, logResponses=false } )
476 """ % (protobufServerPort
)
477 _lua_dns_script_file
= """
478 local ffi = require("ffi")
481 typedef struct pdns_ffi_param pdns_ffi_param_t;
483 const char* pdns_ffi_param_get_qname(pdns_ffi_param_t* ref);
484 void pdns_ffi_param_set_log_query(pdns_ffi_param_t* ref, bool logQuery);
487 function gettag_ffi(obj)
488 qname = ffi.string(ffi.C.pdns_ffi_param_get_qname(obj))
489 if qname == 'query-selected.example' then
490 ffi.C.pdns_ffi_param_set_log_query(obj, true)
495 function preresolve(dq)
496 if dq.qname:equal('answer-selected.example.') then
497 dq.logResponse = true
505 expected
= dns
.rrset
.from_text(name
, 0, dns
.rdataclass
.IN
, 'A', '192.0.2.42')
506 query
= dns
.message
.make_query(name
, 'A', want_dnssec
=True)
507 query
.flags |
= dns
.flags
.CD
508 res
= self
.sendUDPQuery(query
)
509 self
.assertRRsetInAnswer(res
, expected
)
511 # check the protobuf message corresponding to the UDP response
512 # the first query and answer are not selected, so there is nothing in the queue
513 self
.checkNoRemainingMessage()
515 def testQuerySelected(self
):
516 name
= 'query-selected.example.'
517 expected
= dns
.rrset
.from_text(name
, 0, dns
.rdataclass
.IN
, 'A', '192.0.2.84')
518 query
= dns
.message
.make_query(name
, 'A', want_dnssec
=True)
519 query
.flags |
= dns
.flags
.CD
520 res
= self
.sendUDPQuery(query
)
521 self
.assertRRsetInAnswer(res
, expected
)
523 # check the protobuf messages corresponding to the UDP query
524 msg
= self
.getFirstProtobufMessage()
525 self
.checkProtobufQuery(msg
, dnsmessage_pb2
.PBDNSMessage
.UDP
, query
, dns
.rdataclass
.IN
, dns
.rdatatype
.A
, name
)
526 # there should be no response
527 self
.checkNoRemainingMessage()
529 def testResponseSelected(self
):
530 name
= 'answer-selected.example.'
531 expected
= dns
.rrset
.from_text(name
, 0, dns
.rdataclass
.IN
, 'A', '192.0.2.84')
532 query
= dns
.message
.make_query(name
, 'A', want_dnssec
=True)
533 query
.flags |
= dns
.flags
.CD
534 res
= self
.sendUDPQuery(query
)
535 self
.assertRRsetInAnswer(res
, expected
)
537 # check the protobuf messages corresponding to the UDP response
538 msg
= self
.getFirstProtobufMessage()
539 self
.checkProtobufResponse(msg
, dnsmessage_pb2
.PBDNSMessage
.UDP
, res
)
540 self
.assertEquals(len(msg
.response
.rrs
), 1)
541 rr
= msg
.response
.rrs
[0]
542 # we have max-cache-ttl set to 15
543 self
.checkProtobufResponseRecord(rr
, dns
.rdataclass
.IN
, dns
.rdatatype
.A
, name
, 15)
544 self
.assertEquals(socket
.inet_ntop(socket
.AF_INET
, rr
.rdata
), '192.0.2.84')
545 self
.checkNoRemainingMessage()