]> git.ipfire.org Git - thirdparty/pdns.git/blame - regression-tests.dnsdist/test_DNSCrypt.py
Merge pull request #6289 from zeha/dnsdist-stats
[thirdparty/pdns.git] / regression-tests.dnsdist / test_DNSCrypt.py
CommitLineData
b8db58a2 1#!/usr/bin/env python
79500db5 2import base64
b8db58a2 3import time
b1bec9f0
RG
4import dns
5import dns.message
b8db58a2
RG
6from dnsdisttests import DNSDistTest
7import dnscrypt
8
class DNSCryptTest(DNSDistTest):
    """
    dnsdist is configured to accept DNSCrypt queries on 127.0.0.1:_dnsDistPortDNSCrypt.
    The provider's keys have been generated with:
    generateDNSCryptProviderKeys("DNSCryptProviderPublic.key", "DNSCryptProviderPrivate.key")
    Be careful to change the _providerFingerprint below if you want to regenerate the keys.

    This base class only holds the shared settings and the doDNSCryptQuery()
    helper; the concrete test classes provide the dnsdist configuration.
    """

    _dnsDistPort = 5340
    _dnsDistPortDNSCrypt = 8443

    # Console key, plus its base64 text form used in the configuration templates.
    _consoleKey = DNSDistTest.generateConsoleKey()
    _consoleKeyB64 = base64.b64encode(_consoleKey).decode('ascii')

    _providerFingerprint = 'E1D7:2108:9A59:BF8D:F101:16FA:ED5E:EA6A:9F6C:C78F:7F91:AF6B:027E:62F4:69C3:B1AA'
    _providerName = "2.provider.name"
    _resolverCertificateSerial = 42

    # valid from 60s ago until 2h from now
    # (computed once, when this class body is evaluated)
    _resolverCertificateValidFrom = int(time.time() - 60)
    _resolverCertificateValidUntil = int(time.time() + 7200)

    # NOTE(review): presumably a delay in seconds consumed by DNSDistTest - confirm.
    _dnsdistStartupDelay = 10

    def doDNSCryptQuery(self, client, query, response, tcp):
        """
        Send `query` through the DNSCrypt `client` and check the round trip.

        `response` is put on _toResponderQueue before the query is sent, the
        wire-format query is passed to client.query() with `tcp` forwarded as
        the `tcp` keyword, and the reply is parsed with dns.message.from_wire().

        The test fails unless a query was collected from _fromResponderQueue,
        that query equals `query` (ids aligned first) and the parsed reply
        equals `response`.
        """
        self._toResponderQueue.put(response)
        data = client.query(query.to_wire(), tcp=tcp)
        receivedResponse = dns.message.from_wire(data)
        receivedQuery = None
        if not self._fromResponderQueue.empty():
            # NOTE(review): `query` is passed positionally to get(), as before;
            # with a standard queue.Queue it lands in `block` - confirm intent.
            receivedQuery = self._fromResponderQueue.get(query)

        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        # The id of the collected query may differ from ours (presumably
        # rewritten on the way), so align it before comparing.
        receivedQuery.id = query.id
        # assertEqual: the assertEquals alias was removed in Python 3.12.
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)
46
47
class TestDNSCrypt(DNSCryptTest):
    """
    DNSCrypt tests with a console, one DNSCrypt bind and a single backend
    server declared in the dnsdist configuration below.
    """

    _config_template = """
    setKey("%s")
    controlSocket("127.0.0.1:%s")
    generateDNSCryptCertificate("DNSCryptProviderPrivate.key", "DNSCryptResolver.cert", "DNSCryptResolver.key", %d, %d, %d)
    addDNSCryptBind("127.0.0.1:%d", "%s", "DNSCryptResolver.cert", "DNSCryptResolver.key")
    newServer{address="127.0.0.1:%s"}
    """

    # One attribute name per placeholder of _config_template, in order.
    # The substitution itself happens outside this file (presumably in DNSDistTest).
    _config_params = ['_consoleKeyB64', '_consolePort', '_resolverCertificateSerial', '_resolverCertificateValidFrom', '_resolverCertificateValidUntil', '_dnsDistPortDNSCrypt', '_providerName', '_testServerPort']

    def testSimpleA(self):
        """
        DNSCrypt: encrypted A query
        """
        # Use the class attribute rather than a literal so the client always
        # targets the port written into the configuration.
        client = dnscrypt.DNSCryptClient(self._providerName, self._providerFingerprint, "127.0.0.1", self._dnsDistPortDNSCrypt)
        name = 'a.dnscrypt.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '192.2.0.1')
        response.answer.append(rrset)

        # Same exchange twice: first with tcp=False, then with tcp=True.
        self.doDNSCryptQuery(client, query, response, False)
        self.doDNSCryptQuery(client, query, response, True)

    def testResponseLargerThanPaddedQuery(self):
        """
        DNSCrypt: response larger than query

        Send a small encrypted query (don't forget to take
        the padding into account) and check that the response
        is truncated.
        """
        client = dnscrypt.DNSCryptClient(self._providerName, self._providerFingerprint, "127.0.0.1", self._dnsDistPortDNSCrypt)
        name = 'smallquerylargeresponse.dnscrypt.tests.powerdns.com.'
        query = dns.message.make_query(name, 'TXT', 'IN', use_edns=True, payload=4096)
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.TXT,
                                    'A'*255)
        response.answer.append(rrset)

        self._toResponderQueue.put(response)
        data = client.query(query.to_wire())
        receivedQuery = None
        if not self._fromResponderQueue.empty():
            # NOTE(review): `query` is passed positionally to get(), as before;
            # with a standard queue.Queue it lands in `block` - confirm intent.
            receivedQuery = self._fromResponderQueue.get(query)

        receivedResponse = dns.message.from_wire(data)

        self.assertTrue(receivedQuery)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(receivedResponse.question, response.question)
        # The TC bit itself must be set. The previous `flags & ~dns.flags.TC`
        # was true for any response with some other flag set, so it never
        # verified truncation.
        self.assertTrue(receivedResponse.flags & dns.flags.TC)
        # The reply must not carry any record section.
        self.assertEqual(len(receivedResponse.answer), 0)
        self.assertEqual(len(receivedResponse.authority), 0)
        self.assertEqual(len(receivedResponse.additional), 0)

    def testCertRotation(self):
        """
        DNSCrypt: certificate rotation
        """
        client = dnscrypt.DNSCryptClient(self._providerName, self._providerFingerprint, "127.0.0.1", self._dnsDistPortDNSCrypt)
        client.refreshResolverCertificates()

        cert = client.getResolverCertificate()
        self.assertTrue(cert)
        self.assertEqual(cert.serial, self._resolverCertificateSerial)

        name = 'rotation.dnscrypt.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '192.2.0.1')
        response.answer.append(rrset)

        self.doDNSCryptQuery(client, query, response, False)
        self.doDNSCryptQuery(client, query, response, True)

        # generate a new certificate
        self.sendConsoleCommand("generateDNSCryptCertificate('DNSCryptProviderPrivate.key', 'DNSCryptResolver.cert.2', 'DNSCryptResolver.key.2', {!s}, {:.0f}, {:.0f})".format(self._resolverCertificateSerial + 1, self._resolverCertificateValidFrom, self._resolverCertificateValidUntil))
        # switch to that new certificate
        self.sendConsoleCommand("getDNSCryptBind(0):loadNewCertificate('DNSCryptResolver.cert.2', 'DNSCryptResolver.key.2')")

        # the bind should now report the first certificate as the old one and
        # the freshly loaded one, with the requested validity, as current
        oldSerial = self.sendConsoleCommand("getDNSCryptBind(0):getOldCertificate():getSerial()")
        self.assertEqual(int(oldSerial), self._resolverCertificateSerial)
        effectiveSerial = self.sendConsoleCommand("getDNSCryptBind(0):getCurrentCertificate():getSerial()")
        self.assertEqual(int(effectiveSerial), self._resolverCertificateSerial + 1)
        tsStart = self.sendConsoleCommand("getDNSCryptBind(0):getCurrentCertificate():getTSStart()")
        self.assertEqual(int(tsStart), self._resolverCertificateValidFrom)
        tsEnd = self.sendConsoleCommand("getDNSCryptBind(0):getCurrentCertificate():getTSEnd()")
        self.assertEqual(int(tsEnd), self._resolverCertificateValidUntil)

        # we should still be able to send queries with the previous certificate
        self.doDNSCryptQuery(client, query, response, False)
        self.doDNSCryptQuery(client, query, response, True)
        cert = client.getResolverCertificate()
        self.assertTrue(cert)
        self.assertEqual(cert.serial, self._resolverCertificateSerial)

        # but refreshing should get us the new one
        client.refreshResolverCertificates()
        cert = client.getResolverCertificate()
        self.assertTrue(cert)
        self.assertEqual(cert.serial, self._resolverCertificateSerial + 1)

        # generate a third certificate, this time in memory
        self.sendConsoleCommand("getDNSCryptBind(0):generateAndLoadInMemoryCertificate('DNSCryptProviderPrivate.key', {!s}, {:.0f}, {:.0f})".format(self._resolverCertificateSerial + 2, self._resolverCertificateValidFrom, self._resolverCertificateValidUntil))

        # we should still be able to send queries with the previous certificate
        self.doDNSCryptQuery(client, query, response, False)
        self.doDNSCryptQuery(client, query, response, True)
        cert = client.getResolverCertificate()
        self.assertTrue(cert)
        self.assertEqual(cert.serial, self._resolverCertificateSerial + 1)

        # but refreshing should get us the new one
        client.refreshResolverCertificates()
        cert = client.getResolverCertificate()
        self.assertTrue(cert)
        self.assertEqual(cert.serial, self._resolverCertificateSerial + 2)
179
class TestDNSCryptWithCache(DNSCryptTest):
    """
    DNSCrypt tests with a packet cache attached to the default pool
    (see the newPacketCache / setCache lines of the configuration below).
    """

    # One attribute name per placeholder of _config_template, in order.
    # The substitution itself happens outside this file (presumably in DNSDistTest).
    _config_params = ['_resolverCertificateSerial', '_resolverCertificateValidFrom', '_resolverCertificateValidUntil', '_dnsDistPortDNSCrypt', '_providerName', '_testServerPort']
    _config_template = """
    generateDNSCryptCertificate("DNSCryptProviderPrivate.key", "DNSCryptResolver.cert", "DNSCryptResolver.key", %d, %d, %d)
    addDNSCryptBind("127.0.0.1:%d", "%s", "DNSCryptResolver.cert", "DNSCryptResolver.key")
    pc = newPacketCache(5, 86400, 1)
    getPool(""):setCache(pc)
    newServer{address="127.0.0.1:%s"}
    """

    def testCachedSimpleA(self):
        """
        DNSCrypt: encrypted A query served from cache
        """
        misses = 0
        # Use the class attribute rather than a literal so the client always
        # targets the port written into the configuration.
        client = dnscrypt.DNSCryptClient(self._providerName, self._providerFingerprint, "127.0.0.1", self._dnsDistPortDNSCrypt)
        name = 'cacheda.dnscrypt.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '192.2.0.1')
        response.answer.append(rrset)

        # first query to fill the cache
        self._toResponderQueue.put(response)
        data = client.query(query.to_wire())
        receivedResponse = dns.message.from_wire(data)
        receivedQuery = None
        if not self._fromResponderQueue.empty():
            # NOTE(review): `query` is passed positionally to get(), as before;
            # with a standard queue.Queue it lands in `block` - confirm intent.
            receivedQuery = self._fromResponderQueue.get(query)

        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)
        misses += 1

        # second query should get a cached response
        # (no response is queued this time)
        data = client.query(query.to_wire())
        receivedResponse = dns.message.from_wire(data)
        receivedQuery = None
        if not self._fromResponderQueue.empty():
            receivedQuery = self._fromResponderQueue.get(query)

        # nothing must have been collected from the responder queue
        self.assertIsNone(receivedQuery)
        self.assertTrue(receivedResponse)
        self.assertEqual(response, receivedResponse)

        # The summed counters must equal the number of cache misses
        # (_responsesCounter presumably counts responder answers - confirm).
        total = sum(self._responsesCounter[key] for key in self._responsesCounter)
        self.assertEqual(total, misses)
79500db5
RG
236
class TestDNSCryptAutomaticRotation(DNSCryptTest):
    """
    DNSCrypt tests where the configuration's Lua maintenance() function
    generates and loads a new in-memory certificate, with an incremented
    serial, whenever more than 2 seconds have passed since the last one.
    """

    _config_template = """
    setKey("%s")
    controlSocket("127.0.0.1:%s")
    generateDNSCryptCertificate("DNSCryptProviderPrivate.key", "DNSCryptResolver.cert", "DNSCryptResolver.key", %d, %d, %d)
    addDNSCryptBind("127.0.0.1:%d", "%s", "DNSCryptResolver.cert", "DNSCryptResolver.key")
    newServer{address="127.0.0.1:%s"}

    local last = 0
    serial = %d
    function maintenance()
      local now = os.time()
      if ((now - last) > 2) then
        serial = serial + 1
        getDNSCryptBind(0):generateAndLoadInMemoryCertificate('DNSCryptProviderPrivate.key', serial, now - 60, now + 120)
        last = now
      end
    end
    """

    # One attribute name per placeholder of _config_template, in order; the
    # certificate serial appears twice because it also seeds the Lua `serial`.
    # The substitution itself happens outside this file (presumably in DNSDistTest).
    _config_params = ['_consoleKeyB64', '_consolePort', '_resolverCertificateSerial', '_resolverCertificateValidFrom', '_resolverCertificateValidUntil', '_dnsDistPortDNSCrypt', '_providerName', '_testServerPort', '_resolverCertificateSerial']

    def testCertRotation(self):
        """
        DNSCrypt: automatic certificate rotation
        """
        # Use the class attribute rather than a literal so the client always
        # targets the port written into the configuration.
        client = dnscrypt.DNSCryptClient(self._providerName, self._providerFingerprint, "127.0.0.1", self._dnsDistPortDNSCrypt)

        client.refreshResolverCertificates()
        cert = client.getResolverCertificate()
        self.assertTrue(cert)
        firstSerial = cert.serial
        # >= rather than ==: a rotation may already have happened by now
        self.assertGreaterEqual(cert.serial, self._resolverCertificateSerial)

        # wait longer than the 2 second rotation threshold of maintenance()
        time.sleep(3)

        client.refreshResolverCertificates()
        cert = client.getResolverCertificate()
        self.assertTrue(cert)
        # a certificate with a higher serial must now be served
        self.assertGreater(cert.serial, firstSerial)

        name = 'automatic-rotation.dnscrypt.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '192.2.0.1')
        response.answer.append(rrset)

        # queries must work with the rotated certificate, with tcp=False and tcp=True
        self.doDNSCryptQuery(client, query, response, False)
        self.doDNSCryptQuery(client, query, response, True)