7 from dnsdisttests
import DNSDistTest
, pickAvailablePort
10 class DNSCryptTest(DNSDistTest
):
12 dnsdist is configured to accept DNSCrypt queries on 127.0.0.1:_dnsDistPortDNSCrypt.
13 The provider's keys have been generated with:
14 generateDNSCryptProviderKeys("DNSCryptProviderPublic.key", "DNSCryptProviderPrivate.key")
15 Be careful to change the _providerFingerprint below if you want to regenerate the keys.
18 _dnsDistPortDNSCrypt
= pickAvailablePort()
20 _consoleKey
= DNSDistTest
.generateConsoleKey()
21 _consoleKeyB64
= base64
.b64encode(_consoleKey
).decode('ascii')
23 _providerFingerprint
= 'E1D7:2108:9A59:BF8D:F101:16FA:ED5E:EA6A:9F6C:C78F:7F91:AF6B:027E:62F4:69C3:B1AA'
24 _providerName
= "2.provider.name"
25 _resolverCertificateSerial
= 42
27 # valid from 60s ago until 2h from now
28 _resolverCertificateValidFrom
= int(time
.time() - 60)
29 _resolverCertificateValidUntil
= int(time
.time() + 7200)
31 def doDNSCryptQuery(self
, client
, query
, response
, tcp
):
32 self
._toResponderQueue
.put(response
)
33 data
= client
.query(query
.to_wire(), tcp
=tcp
)
34 receivedResponse
= dns
.message
.from_wire(data
)
36 if not self
._fromResponderQueue
.empty():
37 receivedQuery
= self
._fromResponderQueue
.get(query
)
39 self
.assertTrue(receivedQuery
)
40 self
.assertTrue(receivedResponse
)
41 receivedQuery
.id = query
.id
42 self
.assertEqual(query
, receivedQuery
)
43 self
.assertEqual(response
, receivedResponse
)
46 class TestDNSCrypt(DNSCryptTest
):
47 _config_template
= """
49 controlSocket("127.0.0.1:%s")
50 generateDNSCryptCertificate("DNSCryptProviderPrivate.key", "DNSCryptResolver.cert", "DNSCryptResolver.key", %d, %d, %d)
51 addDNSCryptBind("127.0.0.1:%d", "%s", "DNSCryptResolver.cert", "DNSCryptResolver.key")
52 newServer{address="127.0.0.1:%s"}
54 function checkDNSCryptUDP(dq)
55 if dq:getProtocol() ~= "DNSCrypt UDP" then
56 return DNSAction.Spoof, '1.2.3.4'
61 function checkDNSCryptTCP(dq)
62 if dq:getProtocol() ~= "DNSCrypt TCP" then
63 return DNSAction.Spoof, '1.2.3.4'
68 addAction("udp.protocols.dnscrypt.tests.powerdns.com.", LuaAction(checkDNSCryptUDP))
69 addAction("tcp.protocols.dnscrypt.tests.powerdns.com.", LuaAction(checkDNSCryptTCP))
72 _config_params
= ['_consoleKeyB64', '_consolePort', '_resolverCertificateSerial', '_resolverCertificateValidFrom', '_resolverCertificateValidUntil', '_dnsDistPortDNSCrypt', '_providerName', '_testServerPort']
74 def testSimpleA(self
):
76 DNSCrypt: encrypted A query
78 client
= dnscrypt
.DNSCryptClient(self
._providerName
, self
._providerFingerprint
, "127.0.0.1", self
._dnsDistPortDNSCrypt
)
79 name
= 'a.dnscrypt.tests.powerdns.com.'
80 query
= dns
.message
.make_query(name
, 'A', 'IN')
81 response
= dns
.message
.make_response(query
)
82 rrset
= dns
.rrset
.from_text(name
,
87 response
.answer
.append(rrset
)
89 self
.doDNSCryptQuery(client
, query
, response
, False)
90 self
.doDNSCryptQuery(client
, query
, response
, True)
92 def testResponseLargerThanPaddedQuery(self
):
94 DNSCrypt: response larger than query
96 Send a small encrypted query (don't forget to take
97 the padding into account) and check that the response
100 client
= dnscrypt
.DNSCryptClient(self
._providerName
, self
._providerFingerprint
, "127.0.0.1", self
._dnsDistPortDNSCrypt
)
101 name
= 'smallquerylargeresponse.dnscrypt.tests.powerdns.com.'
102 query
= dns
.message
.make_query(name
, 'TXT', 'IN', use_edns
=True, payload
=4096)
103 response
= dns
.message
.make_response(query
)
104 rrset
= dns
.rrset
.from_text(name
,
109 response
.answer
.append(rrset
)
111 self
._toResponderQueue
.put(response
)
112 data
= client
.query(query
.to_wire())
114 if not self
._fromResponderQueue
.empty():
115 receivedQuery
= self
._fromResponderQueue
.get(query
)
117 receivedResponse
= dns
.message
.from_wire(data
)
119 self
.assertTrue(receivedQuery
)
120 receivedQuery
.id = query
.id
121 self
.assertEqual(query
, receivedQuery
)
122 self
.assertEqual(receivedResponse
.question
, response
.question
)
123 self
.assertTrue(receivedResponse
.flags
& ~dns
.flags
.TC
)
124 self
.assertTrue(len(receivedResponse
.answer
) == 0)
125 self
.assertTrue(len(receivedResponse
.authority
) == 0)
126 self
.assertTrue(len(receivedResponse
.additional
) == 0)
128 def testCertRotation(self
):
130 DNSCrypt: certificate rotation
132 client
= dnscrypt
.DNSCryptClient(self
._providerName
, self
._providerFingerprint
, "127.0.0.1", self
._dnsDistPortDNSCrypt
)
133 client
.refreshResolverCertificates()
135 cert
= client
.getResolverCertificate()
136 self
.assertTrue(cert
)
137 self
.assertEqual(cert
.serial
, self
._resolverCertificateSerial
)
139 name
= 'rotation.dnscrypt.tests.powerdns.com.'
140 query
= dns
.message
.make_query(name
, 'A', 'IN')
141 response
= dns
.message
.make_response(query
)
142 rrset
= dns
.rrset
.from_text(name
,
147 response
.answer
.append(rrset
)
149 self
.doDNSCryptQuery(client
, query
, response
, False)
150 self
.doDNSCryptQuery(client
, query
, response
, True)
152 # generate a new certificate
153 self
.sendConsoleCommand("generateDNSCryptCertificate('DNSCryptProviderPrivate.key', 'DNSCryptResolver.cert.2', 'DNSCryptResolver.key.2', {!s}, {:.0f}, {:.0f})".format(self
._resolverCertificateSerial
+ 1, self
._resolverCertificateValidFrom
, self
._resolverCertificateValidUntil
))
154 # add that new certificate
155 self
.sendConsoleCommand("getDNSCryptBind(0):loadNewCertificate('DNSCryptResolver.cert.2', 'DNSCryptResolver.key.2')")
157 oldSerial
= self
.sendConsoleCommand("getDNSCryptBind(0):getCertificate(0):getSerial()")
158 self
.assertEqual(int(oldSerial
), self
._resolverCertificateSerial
)
159 effectiveSerial
= self
.sendConsoleCommand("getDNSCryptBind(0):getCertificate(1):getSerial()")
160 self
.assertEqual(int(effectiveSerial
), self
._resolverCertificateSerial
+ 1)
161 tsStart
= self
.sendConsoleCommand("getDNSCryptBind(0):getCertificate(1):getTSStart()")
162 self
.assertEqual(int(tsStart
), self
._resolverCertificateValidFrom
)
163 tsEnd
= self
.sendConsoleCommand("getDNSCryptBind(0):getCertificate(1):getTSEnd()")
164 self
.assertEqual(int(tsEnd
), self
._resolverCertificateValidUntil
)
166 # we should still be able to send queries with the previous certificate
167 self
.doDNSCryptQuery(client
, query
, response
, False)
168 self
.doDNSCryptQuery(client
, query
, response
, True)
169 cert
= client
.getResolverCertificate()
170 self
.assertTrue(cert
)
171 self
.assertEqual(cert
.serial
, self
._resolverCertificateSerial
)
173 # but refreshing should get us the new one
174 client
.refreshResolverCertificates()
175 cert
= client
.getResolverCertificate()
176 self
.assertTrue(cert
)
177 self
.assertEqual(cert
.serial
, self
._resolverCertificateSerial
+ 1)
178 # we should still get the old ones
179 certs
= client
.getAllResolverCertificates(True)
180 self
.assertEqual(len(certs
), 2)
181 self
.assertEqual(certs
[0].serial
, self
._resolverCertificateSerial
)
182 self
.assertEqual(certs
[1].serial
, self
._resolverCertificateSerial
+ 1)
184 # generate a third certificate, this time in memory
185 self
.sendConsoleCommand("getDNSCryptBind(0):generateAndLoadInMemoryCertificate('DNSCryptProviderPrivate.key', {!s}, {:.0f}, {:.0f})".format(self
._resolverCertificateSerial
+ 2, self
._resolverCertificateValidFrom
, self
._resolverCertificateValidUntil
))
187 # we should still be able to send queries with the previous certificate
188 self
.doDNSCryptQuery(client
, query
, response
, False)
189 self
.doDNSCryptQuery(client
, query
, response
, True)
190 cert
= client
.getResolverCertificate()
191 self
.assertTrue(cert
)
192 self
.assertEqual(cert
.serial
, self
._resolverCertificateSerial
+ 1)
194 # but refreshing should get us the new one
195 client
.refreshResolverCertificates()
196 cert
= client
.getResolverCertificate()
197 self
.assertTrue(cert
)
198 self
.assertEqual(cert
.serial
, self
._resolverCertificateSerial
+ 2)
199 # we should still get the old ones
200 certs
= client
.getAllResolverCertificates(True)
201 self
.assertEqual(len(certs
), 3)
202 self
.assertEqual(certs
[0].serial
, self
._resolverCertificateSerial
)
203 self
.assertEqual(certs
[1].serial
, self
._resolverCertificateSerial
+ 1)
204 self
.assertEqual(certs
[2].serial
, self
._resolverCertificateSerial
+ 2)
206 # generate a fourth certificate, still in memory
207 self
.sendConsoleCommand("getDNSCryptBind(0):generateAndLoadInMemoryCertificate('DNSCryptProviderPrivate.key', {!s}, {:.0f}, {:.0f})".format(self
._resolverCertificateSerial
+ 3, self
._resolverCertificateValidFrom
, self
._resolverCertificateValidUntil
))
209 # mark the old ones as inactive
210 self
.sendConsoleCommand("getDNSCryptBind(0):markInactive({!s})".format(self
._resolverCertificateSerial
))
211 self
.sendConsoleCommand("getDNSCryptBind(0):markInactive({!s})".format(self
._resolverCertificateSerial
+ 1))
212 self
.sendConsoleCommand("getDNSCryptBind(0):markInactive({!s})".format(self
._resolverCertificateSerial
+ 2))
213 # we should still be able to send queries with the third one
214 self
.doDNSCryptQuery(client
, query
, response
, False)
215 self
.doDNSCryptQuery(client
, query
, response
, True)
216 cert
= client
.getResolverCertificate()
217 self
.assertTrue(cert
)
218 self
.assertEqual(cert
.serial
, self
._resolverCertificateSerial
+ 2)
220 self
.sendConsoleCommand("getDNSCryptBind(0):removeInactiveCertificate({!s})".format(self
._resolverCertificateSerial
))
221 self
.sendConsoleCommand("getDNSCryptBind(0):removeInactiveCertificate({!s})".format(self
._resolverCertificateSerial
+ 1))
222 self
.sendConsoleCommand("getDNSCryptBind(0):removeInactiveCertificate({!s})".format(self
._resolverCertificateSerial
+ 2))
224 # we should not be able to send with the old ones anymore
226 data
= client
.query(query
.to_wire())
227 except socket
.timeout
:
229 self
.assertEqual(data
, None)
231 # refreshing should get us the fourth one
232 client
.refreshResolverCertificates()
233 cert
= client
.getResolverCertificate()
234 self
.assertTrue(cert
)
235 self
.assertEqual(cert
.serial
, self
._resolverCertificateSerial
+ 3)
237 certs
= client
.getAllResolverCertificates(True)
238 self
.assertEqual(len(certs
), 1)
239 # and we should be able to query with it
240 self
.doDNSCryptQuery(client
, query
, response
, False)
241 self
.doDNSCryptQuery(client
, query
, response
, True)
242 cert
= client
.getResolverCertificate()
243 self
.assertTrue(cert
)
244 self
.assertEqual(cert
.serial
, self
._resolverCertificateSerial
+ 3)
246 def testProtocolUDP(self
):
248 DNSCrypt: Test DNSQuestion.Protocol over UDP
250 client
= dnscrypt
.DNSCryptClient(self
._providerName
, self
._providerFingerprint
, "127.0.0.1", self
._dnsDistPortDNSCrypt
)
251 name
= 'udp.protocols.dnscrypt.tests.powerdns.com.'
252 query
= dns
.message
.make_query(name
, 'A', 'IN')
253 response
= dns
.message
.make_response(query
)
255 self
.doDNSCryptQuery(client
, query
, response
, False)
257 def testProtocolTCP(self
):
259 DNSCrypt: Test DNSQuestion.Protocol over TCP
261 client
= dnscrypt
.DNSCryptClient(self
._providerName
, self
._providerFingerprint
, "127.0.0.1", self
._dnsDistPortDNSCrypt
)
262 name
= 'tcp.protocols.dnscrypt.tests.powerdns.com.'
263 query
= dns
.message
.make_query(name
, 'A', 'IN')
264 response
= dns
.message
.make_response(query
)
266 self
.doDNSCryptQuery(client
, query
, response
, True)
268 class TestDNSCryptWithCache(DNSCryptTest
):
270 _config_params
= ['_resolverCertificateSerial', '_resolverCertificateValidFrom', '_resolverCertificateValidUntil', '_dnsDistPortDNSCrypt', '_providerName', '_testServerPort']
271 _config_template
= """
272 generateDNSCryptCertificate("DNSCryptProviderPrivate.key", "DNSCryptResolver.cert", "DNSCryptResolver.key", %d, %d, %d)
273 addDNSCryptBind("127.0.0.1:%d", "%s", "DNSCryptResolver.cert", "DNSCryptResolver.key")
274 pc = newPacketCache(5, {maxTTL=86400, minTTL=1, numberOfShards=1})
275 getPool(""):setCache(pc)
276 newServer{address="127.0.0.1:%s"}
279 def testCachedSimpleA(self
):
281 DNSCrypt: encrypted A query served from cache
284 client
= dnscrypt
.DNSCryptClient(self
._providerName
, self
._providerFingerprint
, "127.0.0.1", self
._dnsDistPortDNSCrypt
)
285 name
= 'cacheda.dnscrypt.tests.powerdns.com.'
286 query
= dns
.message
.make_query(name
, 'A', 'IN')
287 response
= dns
.message
.make_response(query
)
288 rrset
= dns
.rrset
.from_text(name
,
293 response
.answer
.append(rrset
)
295 # first query to fill the cache
296 self
._toResponderQueue
.put(response
)
297 data
= client
.query(query
.to_wire())
298 receivedResponse
= dns
.message
.from_wire(data
)
300 if not self
._fromResponderQueue
.empty():
301 receivedQuery
= self
._fromResponderQueue
.get(query
)
303 self
.assertTrue(receivedQuery
)
304 self
.assertTrue(receivedResponse
)
305 receivedQuery
.id = query
.id
306 self
.assertEqual(query
, receivedQuery
)
307 self
.assertEqual(response
, receivedResponse
)
310 # second query should get a cached response
311 data
= client
.query(query
.to_wire())
312 receivedResponse
= dns
.message
.from_wire(data
)
314 if not self
._fromResponderQueue
.empty():
315 receivedQuery
= self
._fromResponderQueue
.get(query
)
317 self
.assertEqual(receivedQuery
, None)
318 self
.assertTrue(receivedResponse
)
319 self
.assertEqual(response
, receivedResponse
)
321 for key
in self
._responsesCounter
:
322 total
+= self
._responsesCounter
[key
]
323 self
.assertEqual(total
, misses
)
325 class TestDNSCryptAutomaticRotation(DNSCryptTest
):
326 _config_template
= """
328 controlSocket("127.0.0.1:%s")
329 generateDNSCryptCertificate("DNSCryptProviderPrivate.key", "DNSCryptResolver.cert", "DNSCryptResolver.key", %d, %d, %d)
330 addDNSCryptBind("127.0.0.1:%d", "%s", "DNSCryptResolver.cert", "DNSCryptResolver.key")
331 newServer{address="127.0.0.1:%s"}
335 function maintenance()
336 local now = os.time()
337 if ((now - last) > 2) then
339 getDNSCryptBind(0):generateAndLoadInMemoryCertificate('DNSCryptProviderPrivate.key', serial, now - 60, now + 120)
345 _config_params
= ['_consoleKeyB64', '_consolePort', '_resolverCertificateSerial', '_resolverCertificateValidFrom', '_resolverCertificateValidUntil', '_dnsDistPortDNSCrypt', '_providerName', '_testServerPort', '_resolverCertificateSerial']
347 def testCertRotation(self
):
349 DNSCrypt: automatic certificate rotation
351 client
= dnscrypt
.DNSCryptClient(self
._providerName
, self
._providerFingerprint
, "127.0.0.1", self
._dnsDistPortDNSCrypt
)
353 client
.refreshResolverCertificates()
354 cert
= client
.getResolverCertificate()
355 self
.assertTrue(cert
)
356 firstSerial
= cert
.serial
357 self
.assertGreaterEqual(cert
.serial
, self
._resolverCertificateSerial
)
361 client
.refreshResolverCertificates()
362 cert
= client
.getResolverCertificate()
363 self
.assertTrue(cert
)
364 secondSerial
= cert
.serial
365 self
.assertGreater(cert
.serial
, firstSerial
)
367 name
= 'automatic-rotation.dnscrypt.tests.powerdns.com.'
368 query
= dns
.message
.make_query(name
, 'A', 'IN')
369 response
= dns
.message
.make_response(query
)
370 rrset
= dns
.rrset
.from_text(name
,
375 response
.answer
.append(rrset
)
377 self
.doDNSCryptQuery(client
, query
, response
, False)
378 self
.doDNSCryptQuery(client
, query
, response
, True)