]> git.ipfire.org Git - thirdparty/pdns.git/blame - regression-tests.dnsdist/test_DNSCrypt.py
Merge pull request #5353 from rgacogne/dnsdist-statnode-labels
[thirdparty/pdns.git] / regression-tests.dnsdist / test_DNSCrypt.py
CommitLineData
b8db58a2 1#!/usr/bin/env python
79500db5 2import base64
b8db58a2 3import time
b1bec9f0
RG
4import dns
5import dns.message
b8db58a2
RG
6from dnsdisttests import DNSDistTest
7import dnscrypt
8
class DNSCryptTest(DNSDistTest):
    """
    Common settings and helpers shared by the DNSCrypt regression tests.

    dnsdist is configured to accept DNSCrypt queries on 127.0.0.1:_dnsDistPortDNSCrypt.
    The provider's keys have been generated with:
    generateDNSCryptProviderKeys("DNSCryptProviderPublic.key", "DNSCryptProviderPrivate.key")
    Be careful to change the _providerFingerprint below if you want to regenerate the keys.
    """

    _dnsDistPort = 5340
    _dnsDistPortDNSCrypt = 8443

    # key used by setKey() in the configuration templates of the subclasses
    _consoleKey = DNSDistTest.generateConsoleKey()
    _consoleKeyB64 = base64.b64encode(_consoleKey)

    _providerFingerprint = 'E1D7:2108:9A59:BF8D:F101:16FA:ED5E:EA6A:9F6C:C78F:7F91:AF6B:027E:62F4:69C3:B1AA'
    _providerName = "2.provider.name"
    _resolverCertificateSerial = 42

    # valid from 60s ago until 2h from now
    # (evaluated once, when the class body is executed)
    _resolverCertificateValidFrom = time.time() - 60
    _resolverCertificateValidUntil = time.time() + 7200

    _dnsdistStartupDelay = 10

    def doDNSCryptQuery(self, client, query, response, tcp):
        """
        Send `query` through the DNSCrypt `client` and verify the round trip.

        `response` is queued on _toResponderQueue before the query is sent,
        then the method checks that the query read back from
        _fromResponderQueue and the decrypted answer match `query` and
        `response` respectively.

        client   -- object exposing query(wire, tcp=...), as dnscrypt.DNSCryptClient does
        query    -- dns.message query to send
        response -- dns.message response expected back
        tcp      -- forwarded as-is to client.query()
        """
        self._toResponderQueue.put(response)
        data = client.query(query.to_wire(), tcp=tcp)
        receivedResponse = dns.message.from_wire(data)

        # only read from the queue if something is there, so that a query
        # which never reached the responder fails the assertion below
        # instead of blocking forever
        receivedQuery = None
        if not self._fromResponderQueue.empty():
            receivedQuery = self._fromResponderQueue.get()

        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        # the ID seen by the responder may differ from ours, align it
        # before comparing the two messages
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)
46
47
class TestDNSCrypt(DNSCryptTest):
    """
    DNSCrypt tests against a dnsdist instance with a console socket,
    a single DNSCrypt bind and a single backend.
    """

    _config_template = """
    setKey("%s")
    controlSocket("127.0.0.1:%s")
    generateDNSCryptCertificate("DNSCryptProviderPrivate.key", "DNSCryptResolver.cert", "DNSCryptResolver.key", %d, %d, %d)
    addDNSCryptBind("127.0.0.1:%d", "%s", "DNSCryptResolver.cert", "DNSCryptResolver.key")
    newServer{address="127.0.0.1:%s"}
    """

    # attribute names substituted, in order, into _config_template
    _config_params = ['_consoleKeyB64', '_consolePort', '_resolverCertificateSerial', '_resolverCertificateValidFrom', '_resolverCertificateValidUntil', '_dnsDistPortDNSCrypt', '_providerName', '_testServerPort']

    def testSimpleA(self):
        """
        DNSCrypt: encrypted A query
        """
        client = dnscrypt.DNSCryptClient(self._providerName, self._providerFingerprint, "127.0.0.1", self._dnsDistPortDNSCrypt)
        name = 'a.dnscrypt.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '192.2.0.1')
        response.answer.append(rrset)

        # once with tcp=False, once with tcp=True
        self.doDNSCryptQuery(client, query, response, False)
        self.doDNSCryptQuery(client, query, response, True)

    def testResponseLargerThanPaddedQuery(self):
        """
        DNSCrypt: response larger than query

        Send a small encrypted query (don't forget to take
        the padding into account) and check that the response
        is truncated.
        """
        client = dnscrypt.DNSCryptClient(self._providerName, self._providerFingerprint, "127.0.0.1", self._dnsDistPortDNSCrypt)
        name = 'smallquerylargeresponse.dnscrypt.tests.powerdns.com.'
        query = dns.message.make_query(name, 'TXT', 'IN', use_edns=True, payload=4096)
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.TXT,
                                    'A'*255)
        response.answer.append(rrset)

        self._toResponderQueue.put(response)
        data = client.query(query.to_wire())
        receivedQuery = None
        if not self._fromResponderQueue.empty():
            receivedQuery = self._fromResponderQueue.get()

        receivedResponse = dns.message.from_wire(data)

        self.assertTrue(receivedQuery)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(receivedResponse.question, response.question)
        # the TC bit itself must be set; masking with ~TC would succeed
        # as soon as any other flag (QR, RD, ...) is present
        self.assertTrue(receivedResponse.flags & dns.flags.TC)
        # a truncated response carries no records at all
        self.assertEqual(len(receivedResponse.answer), 0)
        self.assertEqual(len(receivedResponse.authority), 0)
        self.assertEqual(len(receivedResponse.additional), 0)

    def testCertRotation(self):
        """
        DNSCrypt: certificate rotation
        """
        client = dnscrypt.DNSCryptClient(self._providerName, self._providerFingerprint, "127.0.0.1", self._dnsDistPortDNSCrypt)
        client.refreshResolverCertificates()

        cert = client.getResolverCertificate()
        self.assertTrue(cert)
        self.assertEqual(cert.serial, self._resolverCertificateSerial)

        name = 'rotation.dnscrypt.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '192.2.0.1')
        response.answer.append(rrset)

        self.doDNSCryptQuery(client, query, response, False)
        self.doDNSCryptQuery(client, query, response, True)

        # generate a new certificate
        self.sendConsoleCommand("generateDNSCryptCertificate('DNSCryptProviderPrivate.key', 'DNSCryptResolver.cert.2', 'DNSCryptResolver.key.2', {!s}, {:.0f}, {:.0f})".format(self._resolverCertificateSerial + 1, self._resolverCertificateValidFrom, self._resolverCertificateValidUntil))
        # switch to that new certificate
        self.sendConsoleCommand("getDNSCryptBind(0):loadNewCertificate('DNSCryptResolver.cert.2', 'DNSCryptResolver.key.2')")

        # we should still be able to send queries with the previous certificate
        self.doDNSCryptQuery(client, query, response, False)
        self.doDNSCryptQuery(client, query, response, True)
        cert = client.getResolverCertificate()
        self.assertTrue(cert)
        self.assertEqual(cert.serial, self._resolverCertificateSerial)

        # but refreshing should get us the new one
        client.refreshResolverCertificates()
        cert = client.getResolverCertificate()
        self.assertTrue(cert)
        self.assertEqual(cert.serial, self._resolverCertificateSerial + 1)

        # generate a third certificate, this time in memory
        self.sendConsoleCommand("getDNSCryptBind(0):generateAndLoadInMemoryCertificate('DNSCryptProviderPrivate.key', {!s}, {:.0f}, {:.0f})".format(self._resolverCertificateSerial + 2, self._resolverCertificateValidFrom, self._resolverCertificateValidUntil))

        # we should still be able to send queries with the previous certificate
        self.doDNSCryptQuery(client, query, response, False)
        self.doDNSCryptQuery(client, query, response, True)
        cert = client.getResolverCertificate()
        self.assertTrue(cert)
        self.assertEqual(cert.serial, self._resolverCertificateSerial + 1)

        # but refreshing should get us the new one
        client.refreshResolverCertificates()
        cert = client.getResolverCertificate()
        self.assertTrue(cert)
        self.assertEqual(cert.serial, self._resolverCertificateSerial + 2)
170
class TestDNSCryptWithCache(DNSCryptTest):
    """
    DNSCrypt tests against a dnsdist instance whose default pool
    has a packet cache attached.
    """

    # attribute names substituted, in order, into _config_template
    _config_params = ['_resolverCertificateSerial', '_resolverCertificateValidFrom', '_resolverCertificateValidUntil', '_dnsDistPortDNSCrypt', '_providerName', '_testServerPort']
    _config_template = """
    generateDNSCryptCertificate("DNSCryptProviderPrivate.key", "DNSCryptResolver.cert", "DNSCryptResolver.key", %d, %d, %d)
    addDNSCryptBind("127.0.0.1:%d", "%s", "DNSCryptResolver.cert", "DNSCryptResolver.key")
    pc = newPacketCache(5, 86400, 1)
    getPool(""):setCache(pc)
    newServer{address="127.0.0.1:%s"}
    """

    def testCachedSimpleA(self):
        """
        DNSCrypt: encrypted A query served from cache
        """
        misses = 0
        client = dnscrypt.DNSCryptClient(self._providerName, self._providerFingerprint, "127.0.0.1", self._dnsDistPortDNSCrypt)
        name = 'cacheda.dnscrypt.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '192.2.0.1')
        response.answer.append(rrset)

        # first query to fill the cache
        self._toResponderQueue.put(response)
        data = client.query(query.to_wire())
        receivedResponse = dns.message.from_wire(data)
        receivedQuery = None
        if not self._fromResponderQueue.empty():
            receivedQuery = self._fromResponderQueue.get()

        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)
        misses += 1

        # second query should get a cached response
        data = client.query(query.to_wire())
        receivedResponse = dns.message.from_wire(data)
        receivedQuery = None
        if not self._fromResponderQueue.empty():
            receivedQuery = self._fromResponderQueue.get()

        # nothing must have been read from the responder queue this time
        self.assertIsNone(receivedQuery)
        self.assertTrue(receivedResponse)
        self.assertEqual(response, receivedResponse)

        # the sum of the per-key response counters must equal the number
        # of cache misses
        total = sum(self._responsesCounter.values())
        self.assertEqual(total, misses)
79500db5
RG
227
class TestDNSCryptAutomaticRotation(DNSCryptTest):
    """
    DNSCrypt tests against a dnsdist instance whose Lua maintenance()
    hook generates and loads a new in-memory certificate, with an
    incremented serial, whenever more than 2 seconds have elapsed since
    the previous one.
    """

    _config_template = """
    setKey("%s")
    controlSocket("127.0.0.1:%s")
    generateDNSCryptCertificate("DNSCryptProviderPrivate.key", "DNSCryptResolver.cert", "DNSCryptResolver.key", %d, %d, %d)
    addDNSCryptBind("127.0.0.1:%d", "%s", "DNSCryptResolver.cert", "DNSCryptResolver.key")
    newServer{address="127.0.0.1:%s"}

    local last = 0
    serial = %d
    function maintenance()
      local now = os.time()
      if ((now - last) > 2) then
        serial = serial + 1
        getDNSCryptBind(0):generateAndLoadInMemoryCertificate('DNSCryptProviderPrivate.key', serial, now - 60, now + 120)
        last = now
      end
    end
    """

    # attribute names substituted, in order, into _config_template;
    # the trailing serial initialises the Lua 'serial' variable
    _config_params = ['_consoleKeyB64', '_consolePort', '_resolverCertificateSerial', '_resolverCertificateValidFrom', '_resolverCertificateValidUntil', '_dnsDistPortDNSCrypt', '_providerName', '_testServerPort', '_resolverCertificateSerial']

    def testCertRotation(self):
        """
        DNSCrypt: automatic certificate rotation
        """
        client = dnscrypt.DNSCryptClient(self._providerName, self._providerFingerprint, "127.0.0.1", self._dnsDistPortDNSCrypt)

        client.refreshResolverCertificates()
        cert = client.getResolverCertificate()
        self.assertTrue(cert)
        firstSerial = cert.serial
        # the serial may already be past the initial one if the hook has
        # fired before we got here
        self.assertGreaterEqual(firstSerial, self._resolverCertificateSerial)

        # wait longer than the 2s threshold used by the maintenance() hook
        time.sleep(3)

        client.refreshResolverCertificates()
        cert = client.getResolverCertificate()
        self.assertTrue(cert)
        self.assertGreater(cert.serial, firstSerial)

        name = 'automatic-rotation.dnscrypt.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '192.2.0.1')
        response.answer.append(rrset)

        self.doDNSCryptQuery(client, query, response, False)
        self.doDNSCryptQuery(client, query, response, True)