]> git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_DNSCrypt.py
Merge pull request #13756 from rgacogne/ddist-xsk-doc-typos
[thirdparty/pdns.git] / regression-tests.dnsdist / test_DNSCrypt.py
1 #!/usr/bin/env python
2 import base64
3 import socket
4 import time
5 import dns
6 import dns.message
7 from dnsdisttests import DNSDistTest, pickAvailablePort
8 import dnscrypt
9
10 class DNSCryptTest(DNSDistTest):
11 """
12 dnsdist is configured to accept DNSCrypt queries on 127.0.0.1:_dnsDistPortDNSCrypt.
13 The provider's keys have been generated with:
14 generateDNSCryptProviderKeys("DNSCryptProviderPublic.key", "DNSCryptProviderPrivate.key")
15 Be careful to change the _providerFingerprint below if you want to regenerate the keys.
16 """
17
18 _dnsDistPortDNSCrypt = pickAvailablePort()
19
20 _consoleKey = DNSDistTest.generateConsoleKey()
21 _consoleKeyB64 = base64.b64encode(_consoleKey).decode('ascii')
22
23 _providerFingerprint = 'E1D7:2108:9A59:BF8D:F101:16FA:ED5E:EA6A:9F6C:C78F:7F91:AF6B:027E:62F4:69C3:B1AA'
24 _providerName = "2.provider.name"
25 _resolverCertificateSerial = 42
26
27 # valid from 60s ago until 2h from now
28 _resolverCertificateValidFrom = int(time.time() - 60)
29 _resolverCertificateValidUntil = int(time.time() + 7200)
30
31 def doDNSCryptQuery(self, client, query, response, tcp):
32 self._toResponderQueue.put(response)
33 data = client.query(query.to_wire(), tcp=tcp)
34 receivedResponse = dns.message.from_wire(data)
35 receivedQuery = None
36 if not self._fromResponderQueue.empty():
37 receivedQuery = self._fromResponderQueue.get(query)
38
39 self.assertTrue(receivedQuery)
40 self.assertTrue(receivedResponse)
41 receivedQuery.id = query.id
42 self.assertEqual(query, receivedQuery)
43 self.assertEqual(response, receivedResponse)
44
45
46 class TestDNSCrypt(DNSCryptTest):
47 _config_template = """
48 setKey("%s")
49 controlSocket("127.0.0.1:%s")
50 generateDNSCryptCertificate("DNSCryptProviderPrivate.key", "DNSCryptResolver.cert", "DNSCryptResolver.key", %d, %d, %d)
51 addDNSCryptBind("127.0.0.1:%d", "%s", "DNSCryptResolver.cert", "DNSCryptResolver.key")
52 newServer{address="127.0.0.1:%s"}
53
54 function checkDNSCryptUDP(dq)
55 if dq:getProtocol() ~= "DNSCrypt UDP" then
56 return DNSAction.Spoof, '1.2.3.4'
57 end
58 return DNSAction.None
59 end
60
61 function checkDNSCryptTCP(dq)
62 if dq:getProtocol() ~= "DNSCrypt TCP" then
63 return DNSAction.Spoof, '1.2.3.4'
64 end
65 return DNSAction.None
66 end
67
68 addAction("udp.protocols.dnscrypt.tests.powerdns.com.", LuaAction(checkDNSCryptUDP))
69 addAction("tcp.protocols.dnscrypt.tests.powerdns.com.", LuaAction(checkDNSCryptTCP))
70 """
71
72 _config_params = ['_consoleKeyB64', '_consolePort', '_resolverCertificateSerial', '_resolverCertificateValidFrom', '_resolverCertificateValidUntil', '_dnsDistPortDNSCrypt', '_providerName', '_testServerPort']
73
74 def testSimpleA(self):
75 """
76 DNSCrypt: encrypted A query
77 """
78 client = dnscrypt.DNSCryptClient(self._providerName, self._providerFingerprint, "127.0.0.1", self._dnsDistPortDNSCrypt)
79 name = 'a.dnscrypt.tests.powerdns.com.'
80 query = dns.message.make_query(name, 'A', 'IN')
81 response = dns.message.make_response(query)
82 rrset = dns.rrset.from_text(name,
83 3600,
84 dns.rdataclass.IN,
85 dns.rdatatype.A,
86 '192.2.0.1')
87 response.answer.append(rrset)
88
89 self.doDNSCryptQuery(client, query, response, False)
90 self.doDNSCryptQuery(client, query, response, True)
91
92 def testResponseLargerThanPaddedQuery(self):
93 """
94 DNSCrypt: response larger than query
95
96 Send a small encrypted query (don't forget to take
97 the padding into account) and check that the response
98 is truncated.
99 """
100 client = dnscrypt.DNSCryptClient(self._providerName, self._providerFingerprint, "127.0.0.1", self._dnsDistPortDNSCrypt)
101 name = 'smallquerylargeresponse.dnscrypt.tests.powerdns.com.'
102 query = dns.message.make_query(name, 'TXT', 'IN', use_edns=True, payload=4096)
103 response = dns.message.make_response(query)
104 rrset = dns.rrset.from_text(name,
105 3600,
106 dns.rdataclass.IN,
107 dns.rdatatype.TXT,
108 'A'*255)
109 response.answer.append(rrset)
110
111 self._toResponderQueue.put(response)
112 data = client.query(query.to_wire())
113 receivedQuery = None
114 if not self._fromResponderQueue.empty():
115 receivedQuery = self._fromResponderQueue.get(query)
116
117 receivedResponse = dns.message.from_wire(data)
118
119 self.assertTrue(receivedQuery)
120 receivedQuery.id = query.id
121 self.assertEqual(query, receivedQuery)
122 self.assertEqual(receivedResponse.question, response.question)
123 self.assertTrue(receivedResponse.flags & ~dns.flags.TC)
124 self.assertTrue(len(receivedResponse.answer) == 0)
125 self.assertTrue(len(receivedResponse.authority) == 0)
126 self.assertTrue(len(receivedResponse.additional) == 0)
127
128 def testCertRotation(self):
129 """
130 DNSCrypt: certificate rotation
131 """
132 client = dnscrypt.DNSCryptClient(self._providerName, self._providerFingerprint, "127.0.0.1", self._dnsDistPortDNSCrypt)
133 client.refreshResolverCertificates()
134
135 cert = client.getResolverCertificate()
136 self.assertTrue(cert)
137 self.assertEqual(cert.serial, self._resolverCertificateSerial)
138
139 name = 'rotation.dnscrypt.tests.powerdns.com.'
140 query = dns.message.make_query(name, 'A', 'IN')
141 response = dns.message.make_response(query)
142 rrset = dns.rrset.from_text(name,
143 3600,
144 dns.rdataclass.IN,
145 dns.rdatatype.A,
146 '192.2.0.1')
147 response.answer.append(rrset)
148
149 self.doDNSCryptQuery(client, query, response, False)
150 self.doDNSCryptQuery(client, query, response, True)
151
152 # generate a new certificate
153 self.sendConsoleCommand("generateDNSCryptCertificate('DNSCryptProviderPrivate.key', 'DNSCryptResolver.cert.2', 'DNSCryptResolver.key.2', {!s}, {:.0f}, {:.0f})".format(self._resolverCertificateSerial + 1, self._resolverCertificateValidFrom, self._resolverCertificateValidUntil))
154 # add that new certificate
155 self.sendConsoleCommand("getDNSCryptBind(0):loadNewCertificate('DNSCryptResolver.cert.2', 'DNSCryptResolver.key.2')")
156
157 oldSerial = self.sendConsoleCommand("getDNSCryptBind(0):getCertificate(0):getSerial()")
158 self.assertEqual(int(oldSerial), self._resolverCertificateSerial)
159 effectiveSerial = self.sendConsoleCommand("getDNSCryptBind(0):getCertificate(1):getSerial()")
160 self.assertEqual(int(effectiveSerial), self._resolverCertificateSerial + 1)
161 tsStart = self.sendConsoleCommand("getDNSCryptBind(0):getCertificate(1):getTSStart()")
162 self.assertEqual(int(tsStart), self._resolverCertificateValidFrom)
163 tsEnd = self.sendConsoleCommand("getDNSCryptBind(0):getCertificate(1):getTSEnd()")
164 self.assertEqual(int(tsEnd), self._resolverCertificateValidUntil)
165
166 # we should still be able to send queries with the previous certificate
167 self.doDNSCryptQuery(client, query, response, False)
168 self.doDNSCryptQuery(client, query, response, True)
169 cert = client.getResolverCertificate()
170 self.assertTrue(cert)
171 self.assertEqual(cert.serial, self._resolverCertificateSerial)
172
173 # but refreshing should get us the new one
174 client.refreshResolverCertificates()
175 cert = client.getResolverCertificate()
176 self.assertTrue(cert)
177 self.assertEqual(cert.serial, self._resolverCertificateSerial + 1)
178 # we should still get the old ones
179 certs = client.getAllResolverCertificates(True)
180 self.assertEqual(len(certs), 2)
181 self.assertEqual(certs[0].serial, self._resolverCertificateSerial)
182 self.assertEqual(certs[1].serial, self._resolverCertificateSerial + 1)
183
184 # generate a third certificate, this time in memory
185 self.sendConsoleCommand("getDNSCryptBind(0):generateAndLoadInMemoryCertificate('DNSCryptProviderPrivate.key', {!s}, {:.0f}, {:.0f})".format(self._resolverCertificateSerial + 2, self._resolverCertificateValidFrom, self._resolverCertificateValidUntil))
186
187 # we should still be able to send queries with the previous certificate
188 self.doDNSCryptQuery(client, query, response, False)
189 self.doDNSCryptQuery(client, query, response, True)
190 cert = client.getResolverCertificate()
191 self.assertTrue(cert)
192 self.assertEqual(cert.serial, self._resolverCertificateSerial + 1)
193
194 # but refreshing should get us the new one
195 client.refreshResolverCertificates()
196 cert = client.getResolverCertificate()
197 self.assertTrue(cert)
198 self.assertEqual(cert.serial, self._resolverCertificateSerial + 2)
199 # we should still get the old ones
200 certs = client.getAllResolverCertificates(True)
201 self.assertEqual(len(certs), 3)
202 self.assertEqual(certs[0].serial, self._resolverCertificateSerial)
203 self.assertEqual(certs[1].serial, self._resolverCertificateSerial + 1)
204 self.assertEqual(certs[2].serial, self._resolverCertificateSerial + 2)
205
206 # generate a fourth certificate, still in memory
207 self.sendConsoleCommand("getDNSCryptBind(0):generateAndLoadInMemoryCertificate('DNSCryptProviderPrivate.key', {!s}, {:.0f}, {:.0f})".format(self._resolverCertificateSerial + 3, self._resolverCertificateValidFrom, self._resolverCertificateValidUntil))
208
209 # mark the old ones as inactive
210 self.sendConsoleCommand("getDNSCryptBind(0):markInactive({!s})".format(self._resolverCertificateSerial))
211 self.sendConsoleCommand("getDNSCryptBind(0):markInactive({!s})".format(self._resolverCertificateSerial + 1))
212 self.sendConsoleCommand("getDNSCryptBind(0):markInactive({!s})".format(self._resolverCertificateSerial + 2))
213 # we should still be able to send queries with the third one
214 self.doDNSCryptQuery(client, query, response, False)
215 self.doDNSCryptQuery(client, query, response, True)
216 cert = client.getResolverCertificate()
217 self.assertTrue(cert)
218 self.assertEqual(cert.serial, self._resolverCertificateSerial + 2)
219 # now remove them
220 self.sendConsoleCommand("getDNSCryptBind(0):removeInactiveCertificate({!s})".format(self._resolverCertificateSerial))
221 self.sendConsoleCommand("getDNSCryptBind(0):removeInactiveCertificate({!s})".format(self._resolverCertificateSerial + 1))
222 self.sendConsoleCommand("getDNSCryptBind(0):removeInactiveCertificate({!s})".format(self._resolverCertificateSerial + 2))
223
224 # we should not be able to send with the old ones anymore
225 try:
226 data = client.query(query.to_wire())
227 except socket.timeout:
228 data = None
229 self.assertEqual(data, None)
230
231 # refreshing should get us the fourth one
232 client.refreshResolverCertificates()
233 cert = client.getResolverCertificate()
234 self.assertTrue(cert)
235 self.assertEqual(cert.serial, self._resolverCertificateSerial + 3)
236 # and only that one
237 certs = client.getAllResolverCertificates(True)
238 self.assertEqual(len(certs), 1)
239 # and we should be able to query with it
240 self.doDNSCryptQuery(client, query, response, False)
241 self.doDNSCryptQuery(client, query, response, True)
242 cert = client.getResolverCertificate()
243 self.assertTrue(cert)
244 self.assertEqual(cert.serial, self._resolverCertificateSerial + 3)
245
246 def testProtocolUDP(self):
247 """
248 DNSCrypt: Test DNSQuestion.Protocol over UDP
249 """
250 client = dnscrypt.DNSCryptClient(self._providerName, self._providerFingerprint, "127.0.0.1", self._dnsDistPortDNSCrypt)
251 name = 'udp.protocols.dnscrypt.tests.powerdns.com.'
252 query = dns.message.make_query(name, 'A', 'IN')
253 response = dns.message.make_response(query)
254
255 self.doDNSCryptQuery(client, query, response, False)
256
257 def testProtocolTCP(self):
258 """
259 DNSCrypt: Test DNSQuestion.Protocol over TCP
260 """
261 client = dnscrypt.DNSCryptClient(self._providerName, self._providerFingerprint, "127.0.0.1", self._dnsDistPortDNSCrypt)
262 name = 'tcp.protocols.dnscrypt.tests.powerdns.com.'
263 query = dns.message.make_query(name, 'A', 'IN')
264 response = dns.message.make_response(query)
265
266 self.doDNSCryptQuery(client, query, response, True)
267
268 class TestDNSCryptWithCache(DNSCryptTest):
269
270 _config_params = ['_resolverCertificateSerial', '_resolverCertificateValidFrom', '_resolverCertificateValidUntil', '_dnsDistPortDNSCrypt', '_providerName', '_testServerPort']
271 _config_template = """
272 generateDNSCryptCertificate("DNSCryptProviderPrivate.key", "DNSCryptResolver.cert", "DNSCryptResolver.key", %d, %d, %d)
273 addDNSCryptBind("127.0.0.1:%d", "%s", "DNSCryptResolver.cert", "DNSCryptResolver.key")
274 pc = newPacketCache(5, {maxTTL=86400, minTTL=1, numberOfShards=1})
275 getPool(""):setCache(pc)
276 newServer{address="127.0.0.1:%s"}
277 """
278
279 def testCachedSimpleA(self):
280 """
281 DNSCrypt: encrypted A query served from cache
282 """
283 misses = 0
284 client = dnscrypt.DNSCryptClient(self._providerName, self._providerFingerprint, "127.0.0.1", self._dnsDistPortDNSCrypt)
285 name = 'cacheda.dnscrypt.tests.powerdns.com.'
286 query = dns.message.make_query(name, 'A', 'IN')
287 response = dns.message.make_response(query)
288 rrset = dns.rrset.from_text(name,
289 3600,
290 dns.rdataclass.IN,
291 dns.rdatatype.A,
292 '192.2.0.1')
293 response.answer.append(rrset)
294
295 # first query to fill the cache
296 self._toResponderQueue.put(response)
297 data = client.query(query.to_wire())
298 receivedResponse = dns.message.from_wire(data)
299 receivedQuery = None
300 if not self._fromResponderQueue.empty():
301 receivedQuery = self._fromResponderQueue.get(query)
302
303 self.assertTrue(receivedQuery)
304 self.assertTrue(receivedResponse)
305 receivedQuery.id = query.id
306 self.assertEqual(query, receivedQuery)
307 self.assertEqual(response, receivedResponse)
308 misses += 1
309
310 # second query should get a cached response
311 data = client.query(query.to_wire())
312 receivedResponse = dns.message.from_wire(data)
313 receivedQuery = None
314 if not self._fromResponderQueue.empty():
315 receivedQuery = self._fromResponderQueue.get(query)
316
317 self.assertEqual(receivedQuery, None)
318 self.assertTrue(receivedResponse)
319 self.assertEqual(response, receivedResponse)
320 total = 0
321 for key in self._responsesCounter:
322 total += self._responsesCounter[key]
323 self.assertEqual(total, misses)
324
325 class TestDNSCryptAutomaticRotation(DNSCryptTest):
326 _config_template = """
327 setKey("%s")
328 controlSocket("127.0.0.1:%s")
329 generateDNSCryptCertificate("DNSCryptProviderPrivate.key", "DNSCryptResolver.cert", "DNSCryptResolver.key", %d, %d, %d)
330 addDNSCryptBind("127.0.0.1:%d", "%s", "DNSCryptResolver.cert", "DNSCryptResolver.key")
331 newServer{address="127.0.0.1:%s"}
332
333 local last = 0
334 serial = %d
335 function reloadCallback()
336 local now = os.time()
337 if ((now - last) > 2) then
338 serial = serial + 1
339 getDNSCryptBind(0):generateAndLoadInMemoryCertificate('DNSCryptProviderPrivate.key', serial, now - 60, now + 120)
340 last = now
341 end
342 end
343 addMaintenanceCallback(reloadCallback)
344 """
345
346 _config_params = ['_consoleKeyB64', '_consolePort', '_resolverCertificateSerial', '_resolverCertificateValidFrom', '_resolverCertificateValidUntil', '_dnsDistPortDNSCrypt', '_providerName', '_testServerPort', '_resolverCertificateSerial']
347
348 def testCertRotation(self):
349 """
350 DNSCrypt: automatic certificate rotation
351 """
352 client = dnscrypt.DNSCryptClient(self._providerName, self._providerFingerprint, "127.0.0.1", self._dnsDistPortDNSCrypt)
353
354 client.refreshResolverCertificates()
355 cert = client.getResolverCertificate()
356 self.assertTrue(cert)
357 firstSerial = cert.serial
358 self.assertGreaterEqual(cert.serial, self._resolverCertificateSerial)
359
360 time.sleep(3)
361
362 client.refreshResolverCertificates()
363 cert = client.getResolverCertificate()
364 self.assertTrue(cert)
365 secondSerial = cert.serial
366 self.assertGreater(cert.serial, firstSerial)
367
368 name = 'automatic-rotation.dnscrypt.tests.powerdns.com.'
369 query = dns.message.make_query(name, 'A', 'IN')
370 response = dns.message.make_response(query)
371 rrset = dns.rrset.from_text(name,
372 3600,
373 dns.rdataclass.IN,
374 dns.rdatatype.A,
375 '192.2.0.1')
376 response.answer.append(rrset)
377
378 self.doDNSCryptQuery(client, query, response, False)
379 self.doDNSCryptQuery(client, query, response, True)