]> git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_DNSCrypt.py
Merge pull request #10565 from Habbie/rec-non-apex-dnskey
[thirdparty/pdns.git] / regression-tests.dnsdist / test_DNSCrypt.py
1 #!/usr/bin/env python
2 import base64
3 import socket
4 import time
5 import dns
6 import dns.message
7 from dnsdisttests import DNSDistTest
8 import dnscrypt
9
class DNSCryptTest(DNSDistTest):
    """
    Common settings and helpers shared by the DNSCrypt regression tests.

    dnsdist is configured to accept DNSCrypt queries on 127.0.0.1:_dnsDistPortDNSCrypt.
    The provider's keys have been generated with:
    generateDNSCryptProviderKeys("DNSCryptProviderPublic.key", "DNSCryptProviderPrivate.key")
    Be careful to change the _providerFingerprint below if you want to regenerate the keys.
    """

    _dnsDistPort = 5340
    _dnsDistPortDNSCrypt = 8443

    _consoleKey = DNSDistTest.generateConsoleKey()
    _consoleKeyB64 = base64.b64encode(_consoleKey).decode('ascii')

    _providerFingerprint = 'E1D7:2108:9A59:BF8D:F101:16FA:ED5E:EA6A:9F6C:C78F:7F91:AF6B:027E:62F4:69C3:B1AA'
    _providerName = "2.provider.name"
    _resolverCertificateSerial = 42

    # valid from 60s ago until 2h from now
    _resolverCertificateValidFrom = int(time.time() - 60)
    _resolverCertificateValidUntil = int(time.time() + 7200)

    _dnsdistStartupDelay = 10

    def doDNSCryptQuery(self, client, query, response, tcp):
        """
        Send `query` through the DNSCrypt `client` and check the round trip.

        `response` is queued on `_toResponderQueue` before the query is sent.
        The method then asserts that a query was made available on
        `_fromResponderQueue`, that it matches `query` (ignoring the
        message ID) and that the decoded answer equals `response`.

        `tcp` is forwarded as-is to `client.query()`.
        """
        self._toResponderQueue.put(response)
        data = client.query(query.to_wire(), tcp=tcp)
        receivedResponse = dns.message.from_wire(data)

        receivedQuery = None
        if not self._fromResponderQueue.empty():
            # the queue has just been checked to be non-empty, so a
            # non-blocking fetch is enough (the previous code passed the
            # query itself as the `block` argument of get())
            receivedQuery = self._fromResponderQueue.get_nowait()

        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        # the ID of the query seen on the responder side is not expected
        # to match the one we sent, so align it before comparing
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)
47
48
class TestDNSCrypt(DNSCryptTest):
    """
    DNSCrypt tests against a dnsdist instance with a console enabled:
    plain encrypted queries, truncation of oversized UDP answers,
    manual certificate rotation and protocol reporting to Lua.
    """

    _config_template = """
    setKey("%s")
    controlSocket("127.0.0.1:%s")
    generateDNSCryptCertificate("DNSCryptProviderPrivate.key", "DNSCryptResolver.cert", "DNSCryptResolver.key", %d, %d, %d)
    addDNSCryptBind("127.0.0.1:%d", "%s", "DNSCryptResolver.cert", "DNSCryptResolver.key")
    newServer{address="127.0.0.1:%s"}

    function checkDNSCryptUDP(dq)
      if dq:getProtocol() ~= "DNSCrypt UDP" then
        return DNSAction.Spoof, '1.2.3.4'
      end
      return DNSAction.None
    end

    function checkDNSCryptTCP(dq)
      if dq:getProtocol() ~= "DNSCrypt TCP" then
        return DNSAction.Spoof, '1.2.3.4'
      end
      return DNSAction.None
    end

    addAction("udp.protocols.dnscrypt.tests.powerdns.com.", LuaAction(checkDNSCryptUDP))
    addAction("tcp.protocols.dnscrypt.tests.powerdns.com.", LuaAction(checkDNSCryptTCP))
    """

    _config_params = ['_consoleKeyB64', '_consolePort', '_resolverCertificateSerial', '_resolverCertificateValidFrom', '_resolverCertificateValidUntil', '_dnsDistPortDNSCrypt', '_providerName', '_testServerPort']

    def _newClient(self):
        """
        Build a DNSCrypt client for the listener configured above.

        The port comes from `_dnsDistPortDNSCrypt`, the same attribute that
        feeds addDNSCryptBind() in the configuration template, so the client
        and the listener cannot drift apart.
        """
        return dnscrypt.DNSCryptClient(self._providerName, self._providerFingerprint, "127.0.0.1", self._dnsDistPortDNSCrypt)

    @staticmethod
    def _makeAQueryAndResponse(name):
        """Return an A/IN query for `name` and a response carrying 192.2.0.1."""
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '192.2.0.1')
        response.answer.append(rrset)
        return query, response

    def _queryOverUDPThenTCP(self, client, query, response):
        """Run doDNSCryptQuery() first with tcp=False, then with tcp=True."""
        self.doDNSCryptQuery(client, query, response, False)
        self.doDNSCryptQuery(client, query, response, True)

    def _assertClientCertificateSerial(self, client, expectedSerial):
        """Check that the certificate currently held by `client` has `expectedSerial`."""
        cert = client.getResolverCertificate()
        self.assertTrue(cert)
        self.assertEqual(cert.serial, expectedSerial)

    def _assertAllCertificateSerials(self, client, expectedSerials):
        """Check the serials, in order, of getAllResolverCertificates(True)."""
        certs = client.getAllResolverCertificates(True)
        self.assertEqual(len(certs), len(expectedSerials))
        for cert, expectedSerial in zip(certs, expectedSerials):
            self.assertEqual(cert.serial, expectedSerial)

    def testSimpleA(self):
        """
        DNSCrypt: encrypted A query
        """
        client = self._newClient()
        query, response = self._makeAQueryAndResponse('a.dnscrypt.tests.powerdns.com.')

        self._queryOverUDPThenTCP(client, query, response)

    def testResponseLargerThanPaddedQuery(self):
        """
        DNSCrypt: response larger than query

        Send a small encrypted query (don't forget to take
        the padding into account) and check that the response
        is truncated.
        """
        client = self._newClient()
        name = 'smallquerylargeresponse.dnscrypt.tests.powerdns.com.'
        query = dns.message.make_query(name, 'TXT', 'IN', use_edns=True, payload=4096)
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.TXT,
                                    'A'*255)
        response.answer.append(rrset)

        self._toResponderQueue.put(response)
        data = client.query(query.to_wire())
        receivedQuery = None
        if not self._fromResponderQueue.empty():
            # non-empty was just checked, no need to block
            receivedQuery = self._fromResponderQueue.get_nowait()

        receivedResponse = dns.message.from_wire(data)

        self.assertTrue(receivedQuery)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(receivedResponse.question, response.question)
        # the TC bit itself has to be set: the former `flags & ~TC` check
        # passed as soon as any other flag (QR, for instance) was present
        self.assertTrue(receivedResponse.flags & dns.flags.TC)
        self.assertEqual(len(receivedResponse.answer), 0)
        self.assertEqual(len(receivedResponse.authority), 0)
        self.assertEqual(len(receivedResponse.additional), 0)

    def testCertRotation(self):
        """
        DNSCrypt: certificate rotation
        """
        firstSerial = self._resolverCertificateSerial
        secondSerial = firstSerial + 1
        thirdSerial = firstSerial + 2
        fourthSerial = firstSerial + 3

        client = self._newClient()
        client.refreshResolverCertificates()
        self._assertClientCertificateSerial(client, firstSerial)

        query, response = self._makeAQueryAndResponse('rotation.dnscrypt.tests.powerdns.com.')
        self._queryOverUDPThenTCP(client, query, response)

        # generate a new certificate
        self.sendConsoleCommand("generateDNSCryptCertificate('DNSCryptProviderPrivate.key', 'DNSCryptResolver.cert.2', 'DNSCryptResolver.key.2', {!s}, {:.0f}, {:.0f})".format(secondSerial, self._resolverCertificateValidFrom, self._resolverCertificateValidUntil))
        # add that new certificate
        self.sendConsoleCommand("getDNSCryptBind(0):loadNewCertificate('DNSCryptResolver.cert.2', 'DNSCryptResolver.key.2')")

        # the bind should now report the old certificate followed by the new one
        oldSerial = self.sendConsoleCommand("getDNSCryptBind(0):getCertificate(0):getSerial()")
        self.assertEqual(int(oldSerial), firstSerial)
        effectiveSerial = self.sendConsoleCommand("getDNSCryptBind(0):getCertificate(1):getSerial()")
        self.assertEqual(int(effectiveSerial), secondSerial)
        tsStart = self.sendConsoleCommand("getDNSCryptBind(0):getCertificate(1):getTSStart()")
        self.assertEqual(int(tsStart), self._resolverCertificateValidFrom)
        tsEnd = self.sendConsoleCommand("getDNSCryptBind(0):getCertificate(1):getTSEnd()")
        self.assertEqual(int(tsEnd), self._resolverCertificateValidUntil)

        # we should still be able to send queries with the previous certificate
        self._queryOverUDPThenTCP(client, query, response)
        self._assertClientCertificateSerial(client, firstSerial)

        # but refreshing should get us the new one
        client.refreshResolverCertificates()
        self._assertClientCertificateSerial(client, secondSerial)
        # we should still get the old ones
        self._assertAllCertificateSerials(client, [firstSerial, secondSerial])

        # generate a third certificate, this time in memory
        self.sendConsoleCommand("getDNSCryptBind(0):generateAndLoadInMemoryCertificate('DNSCryptProviderPrivate.key', {!s}, {:.0f}, {:.0f})".format(thirdSerial, self._resolverCertificateValidFrom, self._resolverCertificateValidUntil))

        # we should still be able to send queries with the previous certificate
        self._queryOverUDPThenTCP(client, query, response)
        self._assertClientCertificateSerial(client, secondSerial)

        # but refreshing should get us the new one
        client.refreshResolverCertificates()
        self._assertClientCertificateSerial(client, thirdSerial)
        # we should still get the old ones
        self._assertAllCertificateSerials(client, [firstSerial, secondSerial, thirdSerial])

        # generate a fourth certificate, still in memory
        self.sendConsoleCommand("getDNSCryptBind(0):generateAndLoadInMemoryCertificate('DNSCryptProviderPrivate.key', {!s}, {:.0f}, {:.0f})".format(fourthSerial, self._resolverCertificateValidFrom, self._resolverCertificateValidUntil))

        # mark the old ones as inactive
        for serial in (firstSerial, secondSerial, thirdSerial):
            self.sendConsoleCommand("getDNSCryptBind(0):markInactive({!s})".format(serial))
        # we should still be able to send queries with the third one
        self._queryOverUDPThenTCP(client, query, response)
        self._assertClientCertificateSerial(client, thirdSerial)
        # now remove them
        for serial in (firstSerial, secondSerial, thirdSerial):
            self.sendConsoleCommand("getDNSCryptBind(0):removeInactiveCertificate({!s})".format(serial))

        # we should not be able to send with the old ones anymore
        try:
            data = client.query(query.to_wire())
        except socket.timeout:
            data = None
        self.assertEqual(data, None)

        # refreshing should get us the fourth one
        client.refreshResolverCertificates()
        self._assertClientCertificateSerial(client, fourthSerial)
        # and only that one
        certs = client.getAllResolverCertificates(True)
        self.assertEqual(len(certs), 1)
        # and we should be able to query with it
        self._queryOverUDPThenTCP(client, query, response)
        self._assertClientCertificateSerial(client, fourthSerial)

    def testProtocolUDP(self):
        """
        DNSCrypt: Test DNSQuestion.Protocol over UDP
        """
        client = self._newClient()
        name = 'udp.protocols.dnscrypt.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)

        self.doDNSCryptQuery(client, query, response, False)

    def testProtocolTCP(self):
        """
        DNSCrypt: Test DNSQuestion.Protocol over TCP
        """
        client = self._newClient()
        name = 'tcp.protocols.dnscrypt.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)

        self.doDNSCryptQuery(client, query, response, True)
270
class TestDNSCryptWithCache(DNSCryptTest):
    """
    DNSCrypt test against a dnsdist instance whose default pool has a
    packet cache attached.
    """

    _config_params = ['_resolverCertificateSerial', '_resolverCertificateValidFrom', '_resolverCertificateValidUntil', '_dnsDistPortDNSCrypt', '_providerName', '_testServerPort']
    _config_template = """
    generateDNSCryptCertificate("DNSCryptProviderPrivate.key", "DNSCryptResolver.cert", "DNSCryptResolver.key", %d, %d, %d)
    addDNSCryptBind("127.0.0.1:%d", "%s", "DNSCryptResolver.cert", "DNSCryptResolver.key")
    pc = newPacketCache(5, {maxTTL=86400, minTTL=1, numberOfShards=1})
    getPool(""):setCache(pc)
    newServer{address="127.0.0.1:%s"}
    """

    def testCachedSimpleA(self):
        """
        DNSCrypt: encrypted A query served from cache
        """
        misses = 0
        # use the same port attribute that feeds addDNSCryptBind() above
        # rather than repeating the literal value
        client = dnscrypt.DNSCryptClient(self._providerName, self._providerFingerprint, "127.0.0.1", self._dnsDistPortDNSCrypt)
        name = 'cacheda.dnscrypt.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '192.2.0.1')
        response.answer.append(rrset)

        # first query to fill the cache
        self._toResponderQueue.put(response)
        data = client.query(query.to_wire())
        receivedResponse = dns.message.from_wire(data)
        receivedQuery = None
        if not self._fromResponderQueue.empty():
            # non-empty was just checked, no need to block
            receivedQuery = self._fromResponderQueue.get_nowait()

        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)
        misses += 1

        # second query should get a cached response
        data = client.query(query.to_wire())
        receivedResponse = dns.message.from_wire(data)
        receivedQuery = None
        if not self._fromResponderQueue.empty():
            receivedQuery = self._fromResponderQueue.get_nowait()

        # nothing should have been seen on the responder side this time
        self.assertEqual(receivedQuery, None)
        self.assertTrue(receivedResponse)
        self.assertEqual(response, receivedResponse)

        # the summed per-key response counters must equal the number of
        # cache misses
        total = sum(self._responsesCounter[key] for key in self._responsesCounter)
        self.assertEqual(total, misses)
327
class TestDNSCryptAutomaticRotation(DNSCryptTest):
    """
    DNSCrypt test where the configuration's Lua maintenance() hook
    generates and loads a new in-memory certificate, with an incremented
    serial, whenever more than two seconds have passed since the last one.
    """

    _config_template = """
    setKey("%s")
    controlSocket("127.0.0.1:%s")
    generateDNSCryptCertificate("DNSCryptProviderPrivate.key", "DNSCryptResolver.cert", "DNSCryptResolver.key", %d, %d, %d)
    addDNSCryptBind("127.0.0.1:%d", "%s", "DNSCryptResolver.cert", "DNSCryptResolver.key")
    newServer{address="127.0.0.1:%s"}

    local last = 0
    serial = %d
    function maintenance()
      local now = os.time()
      if ((now - last) > 2) then
        serial = serial + 1
        getDNSCryptBind(0):generateAndLoadInMemoryCertificate('DNSCryptProviderPrivate.key', serial, now - 60, now + 120)
        last = now
      end
    end
    """

    # '_resolverCertificateSerial' appears twice on purpose: once for the
    # initial certificate and once to seed the Lua `serial` variable
    _config_params = ['_consoleKeyB64', '_consolePort', '_resolverCertificateSerial', '_resolverCertificateValidFrom', '_resolverCertificateValidUntil', '_dnsDistPortDNSCrypt', '_providerName', '_testServerPort', '_resolverCertificateSerial']

    def testCertRotation(self):
        """
        DNSCrypt: automatic certificate rotation
        """
        # use the same port attribute that feeds addDNSCryptBind() above
        # rather than repeating the literal value
        client = dnscrypt.DNSCryptClient(self._providerName, self._providerFingerprint, "127.0.0.1", self._dnsDistPortDNSCrypt)

        client.refreshResolverCertificates()
        cert = client.getResolverCertificate()
        self.assertTrue(cert)
        firstSerial = cert.serial
        # rotation may already have happened before this first refresh,
        # hence greater-or-equal
        self.assertGreaterEqual(firstSerial, self._resolverCertificateSerial)

        # leave enough time for the maintenance() hook to rotate
        time.sleep(3)

        client.refreshResolverCertificates()
        cert = client.getResolverCertificate()
        self.assertTrue(cert)
        self.assertGreater(cert.serial, firstSerial)

        name = 'automatic-rotation.dnscrypt.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '192.2.0.1')
        response.answer.append(rrset)

        # queries must work with the freshly rotated certificate, over
        # UDP and then TCP
        self.doDNSCryptQuery(client, query, response, False)
        self.doDNSCryptQuery(client, query, response, True)