git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_DNSCrypt.py
Merge pull request #8945 from rgacogne/ddist-x-forwarded-for
[thirdparty/pdns.git] / regression-tests.dnsdist / test_DNSCrypt.py
1 #!/usr/bin/env python
2 import base64
3 import socket
4 import time
5 import dns
6 import dns.message
7 from dnsdisttests import DNSDistTest
8 import dnscrypt
9
class DNSCryptTest(DNSDistTest):
    """
    Shared settings and helpers for the DNSCrypt regression tests.

    dnsdist is configured to accept DNSCrypt queries on 127.0.0.1:_dnsDistPortDNSCrypt.
    The provider's keys have been generated with:
    generateDNSCryptProviderKeys("DNSCryptProviderPublic.key", "DNSCryptProviderPrivate.key")
    Be careful to change the _providerFingerprint below if you want to regenerate the keys.
    """

    # plain DNS port and DNSCrypt port
    _dnsDistPort = 5340
    _dnsDistPortDNSCrypt = 8443

    # console key, base64-encoded so it can be substituted into setKey("%s")
    _consoleKey = DNSDistTest.generateConsoleKey()
    _consoleKeyB64 = base64.b64encode(_consoleKey).decode('ascii')

    _providerFingerprint = 'E1D7:2108:9A59:BF8D:F101:16FA:ED5E:EA6A:9F6C:C78F:7F91:AF6B:027E:62F4:69C3:B1AA'
    _providerName = "2.provider.name"
    _resolverCertificateSerial = 42

    # valid from 60s ago until 2h from now
    _resolverCertificateValidFrom = int(time.time() - 60)
    _resolverCertificateValidUntil = int(time.time() + 7200)

    _dnsdistStartupDelay = 10

    def doDNSCryptQuery(self, client, query, response, tcp):
        """
        Send `query` through the DNSCrypt `client` and check the round trip.

        `response` is put on _toResponderQueue before the query is sent.
        The query taken from _fromResponderQueue must equal `query` (the
        message ID is ignored) and the decoded answer must equal `response`.

        `tcp` is forwarded to client.query() to select the transport.
        """
        self._toResponderQueue.put(response)
        data = client.query(query.to_wire(), tcp=tcp)
        receivedResponse = dns.message.from_wire(data)

        receivedQuery = None
        if not self._fromResponderQueue.empty():
            # explicit (block, timeout) arguments instead of passing the
            # query object as the 'block' flag
            receivedQuery = self._fromResponderQueue.get(True, 2.0)

        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        # align the ID before comparing the two messages
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)
47
48
class TestDNSCrypt(DNSCryptTest):
    """
    DNSCrypt tests against a dnsdist with a console and a single
    DNSCrypt bind whose initial certificate is generated at startup.
    """

    _config_template = """
    setKey("%s")
    controlSocket("127.0.0.1:%s")
    generateDNSCryptCertificate("DNSCryptProviderPrivate.key", "DNSCryptResolver.cert", "DNSCryptResolver.key", %d, %d, %d)
    addDNSCryptBind("127.0.0.1:%d", "%s", "DNSCryptResolver.cert", "DNSCryptResolver.key")
    newServer{address="127.0.0.1:%s"}
    """

    # attribute names substituted, in order, into _config_template
    _config_params = [
        '_consoleKeyB64',
        '_consolePort',
        '_resolverCertificateSerial',
        '_resolverCertificateValidFrom',
        '_resolverCertificateValidUntil',
        '_dnsDistPortDNSCrypt',
        '_providerName',
        '_testServerPort',
    ]

    def _newClient(self):
        """Return a DNSCrypt client pointed at the configured DNSCrypt port."""
        return dnscrypt.DNSCryptClient(self._providerName,
                                       self._providerFingerprint,
                                       "127.0.0.1",
                                       self._dnsDistPortDNSCrypt)

    def _buildAQueryAndResponse(self, name):
        """Return an (A query, matching response with one A record) pair for `name`."""
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '192.2.0.1')
        response.answer.append(rrset)
        return query, response

    def _queryOverUDPThenTCP(self, client, query, response):
        """Run doDNSCryptQuery() with tcp=False, then with tcp=True."""
        self.doDNSCryptQuery(client, query, response, False)
        self.doDNSCryptQuery(client, query, response, True)

    def _assertClientSerial(self, client, expectedSerial):
        """Check that the client's current resolver certificate has `expectedSerial`."""
        cert = client.getResolverCertificate()
        self.assertTrue(cert)
        self.assertEqual(cert.serial, expectedSerial)

    def testSimpleA(self):
        """
        DNSCrypt: encrypted A query
        """
        client = self._newClient()
        query, response = self._buildAQueryAndResponse('a.dnscrypt.tests.powerdns.com.')

        self._queryOverUDPThenTCP(client, query, response)

    def testResponseLargerThanPaddedQuery(self):
        """
        DNSCrypt: response larger than query

        Send a small encrypted query (don't forget to take
        the padding into account) and check that the response
        is truncated.
        """
        client = self._newClient()
        name = 'smallquerylargeresponse.dnscrypt.tests.powerdns.com.'
        query = dns.message.make_query(name, 'TXT', 'IN', use_edns=True, payload=4096)
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.TXT,
                                    'A'*255)
        response.answer.append(rrset)

        self._toResponderQueue.put(response)
        data = client.query(query.to_wire())
        receivedQuery = None
        if not self._fromResponderQueue.empty():
            # explicit (block, timeout) arguments instead of passing the
            # query object as the 'block' flag
            receivedQuery = self._fromResponderQueue.get(True, 2.0)

        receivedResponse = dns.message.from_wire(data)

        self.assertTrue(receivedQuery)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(receivedResponse.question, response.question)
        # test the TC bit itself: masking with ~TC would succeed as soon as
        # any other flag (such as QR) is set, truncated or not
        self.assertTrue(receivedResponse.flags & dns.flags.TC)
        self.assertEqual(len(receivedResponse.answer), 0)
        self.assertEqual(len(receivedResponse.authority), 0)
        self.assertEqual(len(receivedResponse.additional), 0)

    def testCertRotation(self):
        """
        DNSCrypt: certificate rotation
        """
        firstSerial = self._resolverCertificateSerial
        validFrom = self._resolverCertificateValidFrom
        validUntil = self._resolverCertificateValidUntil

        client = self._newClient()
        client.refreshResolverCertificates()
        self._assertClientSerial(client, firstSerial)

        query, response = self._buildAQueryAndResponse('rotation.dnscrypt.tests.powerdns.com.')

        self._queryOverUDPThenTCP(client, query, response)

        # generate a new certificate
        self.sendConsoleCommand("generateDNSCryptCertificate('DNSCryptProviderPrivate.key', 'DNSCryptResolver.cert.2', 'DNSCryptResolver.key.2', {!s}, {:.0f}, {:.0f})".format(firstSerial + 1, validFrom, validUntil))
        # add that new certificate
        self.sendConsoleCommand("getDNSCryptBind(0):loadNewCertificate('DNSCryptResolver.cert.2', 'DNSCryptResolver.key.2')")

        oldSerial = self.sendConsoleCommand("getDNSCryptBind(0):getCertificate(0):getSerial()")
        self.assertEqual(int(oldSerial), firstSerial)
        effectiveSerial = self.sendConsoleCommand("getDNSCryptBind(0):getCertificate(1):getSerial()")
        self.assertEqual(int(effectiveSerial), firstSerial + 1)
        tsStart = self.sendConsoleCommand("getDNSCryptBind(0):getCertificate(1):getTSStart()")
        self.assertEqual(int(tsStart), validFrom)
        tsEnd = self.sendConsoleCommand("getDNSCryptBind(0):getCertificate(1):getTSEnd()")
        self.assertEqual(int(tsEnd), validUntil)

        # we should still be able to send queries with the previous certificate
        self._queryOverUDPThenTCP(client, query, response)
        self._assertClientSerial(client, firstSerial)

        # but refreshing should get us the new one
        client.refreshResolverCertificates()
        self._assertClientSerial(client, firstSerial + 1)
        # we should still get the old ones
        certs = client.getAllResolverCertificates(True)
        self.assertEqual(len(certs), 2)
        self.assertEqual(certs[0].serial, firstSerial)
        self.assertEqual(certs[1].serial, firstSerial + 1)

        # generate a third certificate, this time in memory
        self.sendConsoleCommand("getDNSCryptBind(0):generateAndLoadInMemoryCertificate('DNSCryptProviderPrivate.key', {!s}, {:.0f}, {:.0f})".format(firstSerial + 2, validFrom, validUntil))

        # we should still be able to send queries with the previous certificate
        self._queryOverUDPThenTCP(client, query, response)
        self._assertClientSerial(client, firstSerial + 1)

        # but refreshing should get us the new one
        client.refreshResolverCertificates()
        self._assertClientSerial(client, firstSerial + 2)
        # we should still get the old ones
        certs = client.getAllResolverCertificates(True)
        self.assertEqual(len(certs), 3)
        self.assertEqual(certs[0].serial, firstSerial)
        self.assertEqual(certs[1].serial, firstSerial + 1)
        self.assertEqual(certs[2].serial, firstSerial + 2)

        # generate a fourth certificate, still in memory
        self.sendConsoleCommand("getDNSCryptBind(0):generateAndLoadInMemoryCertificate('DNSCryptProviderPrivate.key', {!s}, {:.0f}, {:.0f})".format(firstSerial + 3, validFrom, validUntil))

        # mark the old ones as inactive
        for offset in range(3):
            self.sendConsoleCommand("getDNSCryptBind(0):markInactive({!s})".format(firstSerial + offset))
        # we should still be able to send queries with the third one
        self._queryOverUDPThenTCP(client, query, response)
        self._assertClientSerial(client, firstSerial + 2)
        # now remove them
        for offset in range(3):
            self.sendConsoleCommand("getDNSCryptBind(0):removeInactiveCertificate({!s})".format(firstSerial + offset))

        # we should not be able to send with the old ones anymore
        try:
            data = client.query(query.to_wire())
        except socket.timeout:
            data = None
        self.assertEqual(data, None)

        # refreshing should get us the fourth one
        client.refreshResolverCertificates()
        self._assertClientSerial(client, firstSerial + 3)
        # and only that one
        certs = client.getAllResolverCertificates(True)
        self.assertEqual(len(certs), 1)
        # and we should be able to query with it
        self._queryOverUDPThenTCP(client, query, response)
        self._assertClientSerial(client, firstSerial + 3)
231
class TestDNSCryptWithCache(DNSCryptTest):
    """
    DNSCrypt tests against a dnsdist whose default pool has a packet cache.
    """

    # attribute names substituted, in order, into _config_template
    _config_params = [
        '_resolverCertificateSerial',
        '_resolverCertificateValidFrom',
        '_resolverCertificateValidUntil',
        '_dnsDistPortDNSCrypt',
        '_providerName',
        '_testServerPort',
    ]
    _config_template = """
    generateDNSCryptCertificate("DNSCryptProviderPrivate.key", "DNSCryptResolver.cert", "DNSCryptResolver.key", %d, %d, %d)
    addDNSCryptBind("127.0.0.1:%d", "%s", "DNSCryptResolver.cert", "DNSCryptResolver.key")
    pc = newPacketCache(5, {maxTTL=86400, minTTL=1})
    getPool(""):setCache(pc)
    newServer{address="127.0.0.1:%s"}
    """

    def testCachedSimpleA(self):
        """
        DNSCrypt: encrypted A query served from cache
        """
        misses = 0
        client = dnscrypt.DNSCryptClient(self._providerName,
                                         self._providerFingerprint,
                                         "127.0.0.1",
                                         self._dnsDistPortDNSCrypt)
        name = 'cacheda.dnscrypt.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '192.2.0.1')
        response.answer.append(rrset)

        # first query to fill the cache
        self._toResponderQueue.put(response)
        data = client.query(query.to_wire())
        receivedResponse = dns.message.from_wire(data)
        receivedQuery = None
        if not self._fromResponderQueue.empty():
            # explicit (block, timeout) arguments instead of passing the
            # query object as the 'block' flag
            receivedQuery = self._fromResponderQueue.get(True, 2.0)

        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)
        misses += 1

        # second query should get a cached response, so nothing is queued
        # for the responder this time
        data = client.query(query.to_wire())
        receivedResponse = dns.message.from_wire(data)
        receivedQuery = None
        if not self._fromResponderQueue.empty():
            receivedQuery = self._fromResponderQueue.get(True, 2.0)

        self.assertEqual(receivedQuery, None)
        self.assertTrue(receivedResponse)
        self.assertEqual(response, receivedResponse)

        # the sum of all entries in _responsesCounter must match the number of misses
        total = sum(self._responsesCounter[key] for key in self._responsesCounter)
        self.assertEqual(total, misses)
288
class TestDNSCryptAutomaticRotation(DNSCryptTest):
    """
    DNSCrypt tests against a dnsdist whose Lua maintenance() hook loads a
    new in-memory certificate whenever more than 2 seconds have elapsed
    since the previous one.
    """

    _config_template = """
    setKey("%s")
    controlSocket("127.0.0.1:%s")
    generateDNSCryptCertificate("DNSCryptProviderPrivate.key", "DNSCryptResolver.cert", "DNSCryptResolver.key", %d, %d, %d)
    addDNSCryptBind("127.0.0.1:%d", "%s", "DNSCryptResolver.cert", "DNSCryptResolver.key")
    newServer{address="127.0.0.1:%s"}

    local last = 0
    serial = %d
    function maintenance()
      local now = os.time()
      if ((now - last) > 2) then
        serial = serial + 1
        getDNSCryptBind(0):generateAndLoadInMemoryCertificate('DNSCryptProviderPrivate.key', serial, now - 60, now + 120)
        last = now
      end
    end
    """

    # attribute names substituted, in order, into _config_template; the
    # serial appears twice because the Lua 'serial' variable is seeded from it
    _config_params = [
        '_consoleKeyB64',
        '_consolePort',
        '_resolverCertificateSerial',
        '_resolverCertificateValidFrom',
        '_resolverCertificateValidUntil',
        '_dnsDistPortDNSCrypt',
        '_providerName',
        '_testServerPort',
        '_resolverCertificateSerial',
    ]

    def testCertRotation(self):
        """
        DNSCrypt: automatic certificate rotation
        """
        client = dnscrypt.DNSCryptClient(self._providerName,
                                         self._providerFingerprint,
                                         "127.0.0.1",
                                         self._dnsDistPortDNSCrypt)

        client.refreshResolverCertificates()
        cert = client.getResolverCertificate()
        self.assertTrue(cert)
        firstSerial = cert.serial
        # a rotation may already have happened, hence "greater or equal"
        self.assertGreaterEqual(firstSerial, self._resolverCertificateSerial)

        # wait longer than the 2 second rotation threshold in maintenance()
        time.sleep(3)

        client.refreshResolverCertificates()
        cert = client.getResolverCertificate()
        self.assertTrue(cert)
        self.assertGreater(cert.serial, firstSerial)

        name = 'automatic-rotation.dnscrypt.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '192.2.0.1')
        response.answer.append(rrset)

        self.doDNSCryptQuery(client, query, response, False)
        self.doDNSCryptQuery(client, query, response, True)