6 from dnsdisttests
import DNSDistTest
9 class DNSCryptTest(DNSDistTest
):
11 dnsdist is configured to accept DNSCrypt queries on 127.0.0.1:_dnsDistPortDNSCrypt.
12 The provider's keys have been generated with:
13 generateDNSCryptProviderKeys("DNSCryptProviderPublic.key", "DNSCryptProviderPrivate.key")
14 Be careful to change the _providerFingerprint below if you want to regenerate the keys.
18 _dnsDistPortDNSCrypt
= 8443
20 _consoleKey
= DNSDistTest
.generateConsoleKey()
21 _consoleKeyB64
= base64
.b64encode(_consoleKey
).decode('ascii')
23 _providerFingerprint
= 'E1D7:2108:9A59:BF8D:F101:16FA:ED5E:EA6A:9F6C:C78F:7F91:AF6B:027E:62F4:69C3:B1AA'
24 _providerName
= "2.provider.name"
25 _resolverCertificateSerial
= 42
27 # valid from 60s ago until 2h from now
28 _resolverCertificateValidFrom
= int(time
.time() - 60)
29 _resolverCertificateValidUntil
= int(time
.time() + 7200)
31 _dnsdistStartupDelay
= 10
33 def doDNSCryptQuery(self
, client
, query
, response
, tcp
):
34 self
._toResponderQueue
.put(response
)
35 data
= client
.query(query
.to_wire(), tcp
=tcp
)
36 receivedResponse
= dns
.message
.from_wire(data
)
38 if not self
._fromResponderQueue
.empty():
39 receivedQuery
= self
._fromResponderQueue
.get(query
)
41 self
.assertTrue(receivedQuery
)
42 self
.assertTrue(receivedResponse
)
43 receivedQuery
.id = query
.id
44 self
.assertEquals(query
, receivedQuery
)
45 self
.assertEquals(response
, receivedResponse
)
48 class TestDNSCrypt(DNSCryptTest
):
49 _config_template
= """
51 controlSocket("127.0.0.1:%s")
52 generateDNSCryptCertificate("DNSCryptProviderPrivate.key", "DNSCryptResolver.cert", "DNSCryptResolver.key", %d, %d, %d)
53 addDNSCryptBind("127.0.0.1:%d", "%s", "DNSCryptResolver.cert", "DNSCryptResolver.key")
54 newServer{address="127.0.0.1:%s"}
57 _config_params
= ['_consoleKeyB64', '_consolePort', '_resolverCertificateSerial', '_resolverCertificateValidFrom', '_resolverCertificateValidUntil', '_dnsDistPortDNSCrypt', '_providerName', '_testServerPort']
59 def testSimpleA(self
):
61 DNSCrypt: encrypted A query
63 client
= dnscrypt
.DNSCryptClient(self
._providerName
, self
._providerFingerprint
, "127.0.0.1", 8443)
64 name
= 'a.dnscrypt.tests.powerdns.com.'
65 query
= dns
.message
.make_query(name
, 'A', 'IN')
66 response
= dns
.message
.make_response(query
)
67 rrset
= dns
.rrset
.from_text(name
,
72 response
.answer
.append(rrset
)
74 self
.doDNSCryptQuery(client
, query
, response
, False)
75 self
.doDNSCryptQuery(client
, query
, response
, True)
77 def testResponseLargerThanPaddedQuery(self
):
79 DNSCrypt: response larger than query
81 Send a small encrypted query (don't forget to take
82 the padding into account) and check that the response
85 client
= dnscrypt
.DNSCryptClient(self
._providerName
, self
._providerFingerprint
, "127.0.0.1", 8443)
86 name
= 'smallquerylargeresponse.dnscrypt.tests.powerdns.com.'
87 query
= dns
.message
.make_query(name
, 'TXT', 'IN', use_edns
=True, payload
=4096)
88 response
= dns
.message
.make_response(query
)
89 rrset
= dns
.rrset
.from_text(name
,
94 response
.answer
.append(rrset
)
96 self
._toResponderQueue
.put(response
)
97 data
= client
.query(query
.to_wire())
99 if not self
._fromResponderQueue
.empty():
100 receivedQuery
= self
._fromResponderQueue
.get(query
)
102 receivedResponse
= dns
.message
.from_wire(data
)
104 self
.assertTrue(receivedQuery
)
105 receivedQuery
.id = query
.id
106 self
.assertEquals(query
, receivedQuery
)
107 self
.assertEquals(receivedResponse
.question
, response
.question
)
108 self
.assertTrue(receivedResponse
.flags
& ~dns
.flags
.TC
)
109 self
.assertTrue(len(receivedResponse
.answer
) == 0)
110 self
.assertTrue(len(receivedResponse
.authority
) == 0)
111 self
.assertTrue(len(receivedResponse
.additional
) == 0)
113 def testCertRotation(self
):
115 DNSCrypt: certificate rotation
117 client
= dnscrypt
.DNSCryptClient(self
._providerName
, self
._providerFingerprint
, "127.0.0.1", 8443)
118 client
.refreshResolverCertificates()
120 cert
= client
.getResolverCertificate()
121 self
.assertTrue(cert
)
122 self
.assertEquals(cert
.serial
, self
._resolverCertificateSerial
)
124 name
= 'rotation.dnscrypt.tests.powerdns.com.'
125 query
= dns
.message
.make_query(name
, 'A', 'IN')
126 response
= dns
.message
.make_response(query
)
127 rrset
= dns
.rrset
.from_text(name
,
132 response
.answer
.append(rrset
)
134 self
.doDNSCryptQuery(client
, query
, response
, False)
135 self
.doDNSCryptQuery(client
, query
, response
, True)
137 # generate a new certificate
138 self
.sendConsoleCommand("generateDNSCryptCertificate('DNSCryptProviderPrivate.key', 'DNSCryptResolver.cert.2', 'DNSCryptResolver.key.2', {!s}, {:.0f}, {:.0f})".format(self
._resolverCertificateSerial
+ 1, self
._resolverCertificateValidFrom
, self
._resolverCertificateValidUntil
))
139 # switch to that new certificate
140 self
.sendConsoleCommand("getDNSCryptBind(0):loadNewCertificate('DNSCryptResolver.cert.2', 'DNSCryptResolver.key.2')")
142 oldSerial
= self
.sendConsoleCommand("getDNSCryptBind(0):getOldCertificate():getSerial()")
143 self
.assertEquals(int(oldSerial
), self
._resolverCertificateSerial
)
144 effectiveSerial
= self
.sendConsoleCommand("getDNSCryptBind(0):getCurrentCertificate():getSerial()")
145 self
.assertEquals(int(effectiveSerial
), self
._resolverCertificateSerial
+ 1)
146 tsStart
= self
.sendConsoleCommand("getDNSCryptBind(0):getCurrentCertificate():getTSStart()")
147 self
.assertEquals(int(tsStart
), self
._resolverCertificateValidFrom
)
148 tsEnd
= self
.sendConsoleCommand("getDNSCryptBind(0):getCurrentCertificate():getTSEnd()")
149 self
.assertEquals(int(tsEnd
), self
._resolverCertificateValidUntil
)
151 # we should still be able to send queries with the previous certificate
152 self
.doDNSCryptQuery(client
, query
, response
, False)
153 self
.doDNSCryptQuery(client
, query
, response
, True)
154 cert
= client
.getResolverCertificate()
155 self
.assertTrue(cert
)
156 self
.assertEquals(cert
.serial
, self
._resolverCertificateSerial
)
158 # but refreshing should get us the new one
159 client
.refreshResolverCertificates()
160 cert
= client
.getResolverCertificate()
161 self
.assertTrue(cert
)
162 self
.assertEquals(cert
.serial
, self
._resolverCertificateSerial
+ 1)
164 # generate a third certificate, this time in memory
165 self
.sendConsoleCommand("getDNSCryptBind(0):generateAndLoadInMemoryCertificate('DNSCryptProviderPrivate.key', {!s}, {:.0f}, {:.0f})".format(self
._resolverCertificateSerial
+ 2, self
._resolverCertificateValidFrom
, self
._resolverCertificateValidUntil
))
167 # we should still be able to send queries with the previous certificate
168 self
.doDNSCryptQuery(client
, query
, response
, False)
169 self
.doDNSCryptQuery(client
, query
, response
, True)
170 cert
= client
.getResolverCertificate()
171 self
.assertTrue(cert
)
172 self
.assertEquals(cert
.serial
, self
._resolverCertificateSerial
+ 1)
174 # but refreshing should get us the new one
175 client
.refreshResolverCertificates()
176 cert
= client
.getResolverCertificate()
177 self
.assertTrue(cert
)
178 self
.assertEquals(cert
.serial
, self
._resolverCertificateSerial
+ 2)
180 class TestDNSCryptWithCache(DNSCryptTest
):
182 _config_params
= ['_resolverCertificateSerial', '_resolverCertificateValidFrom', '_resolverCertificateValidUntil', '_dnsDistPortDNSCrypt', '_providerName', '_testServerPort']
183 _config_template
= """
184 generateDNSCryptCertificate("DNSCryptProviderPrivate.key", "DNSCryptResolver.cert", "DNSCryptResolver.key", %d, %d, %d)
185 addDNSCryptBind("127.0.0.1:%d", "%s", "DNSCryptResolver.cert", "DNSCryptResolver.key")
186 pc = newPacketCache(5, 86400, 1)
187 getPool(""):setCache(pc)
188 newServer{address="127.0.0.1:%s"}
191 def testCachedSimpleA(self
):
193 DNSCrypt: encrypted A query served from cache
196 client
= dnscrypt
.DNSCryptClient(self
._providerName
, self
._providerFingerprint
, "127.0.0.1", 8443)
197 name
= 'cacheda.dnscrypt.tests.powerdns.com.'
198 query
= dns
.message
.make_query(name
, 'A', 'IN')
199 response
= dns
.message
.make_response(query
)
200 rrset
= dns
.rrset
.from_text(name
,
205 response
.answer
.append(rrset
)
207 # first query to fill the cache
208 self
._toResponderQueue
.put(response
)
209 data
= client
.query(query
.to_wire())
210 receivedResponse
= dns
.message
.from_wire(data
)
212 if not self
._fromResponderQueue
.empty():
213 receivedQuery
= self
._fromResponderQueue
.get(query
)
215 self
.assertTrue(receivedQuery
)
216 self
.assertTrue(receivedResponse
)
217 receivedQuery
.id = query
.id
218 self
.assertEquals(query
, receivedQuery
)
219 self
.assertEquals(response
, receivedResponse
)
222 # second query should get a cached response
223 data
= client
.query(query
.to_wire())
224 receivedResponse
= dns
.message
.from_wire(data
)
226 if not self
._fromResponderQueue
.empty():
227 receivedQuery
= self
._fromResponderQueue
.get(query
)
229 self
.assertEquals(receivedQuery
, None)
230 self
.assertTrue(receivedResponse
)
231 self
.assertEquals(response
, receivedResponse
)
233 for key
in self
._responsesCounter
:
234 total
+= self
._responsesCounter
[key
]
235 self
.assertEquals(total
, misses
)
237 class TestDNSCryptAutomaticRotation(DNSCryptTest
):
238 _config_template
= """
240 controlSocket("127.0.0.1:%s")
241 generateDNSCryptCertificate("DNSCryptProviderPrivate.key", "DNSCryptResolver.cert", "DNSCryptResolver.key", %d, %d, %d)
242 addDNSCryptBind("127.0.0.1:%d", "%s", "DNSCryptResolver.cert", "DNSCryptResolver.key")
243 newServer{address="127.0.0.1:%s"}
247 function maintenance()
248 local now = os.time()
249 if ((now - last) > 2) then
251 getDNSCryptBind(0):generateAndLoadInMemoryCertificate('DNSCryptProviderPrivate.key', serial, now - 60, now + 120)
257 _config_params
= ['_consoleKeyB64', '_consolePort', '_resolverCertificateSerial', '_resolverCertificateValidFrom', '_resolverCertificateValidUntil', '_dnsDistPortDNSCrypt', '_providerName', '_testServerPort', '_resolverCertificateSerial']
259 def testCertRotation(self
):
261 DNSCrypt: automatic certificate rotation
263 client
= dnscrypt
.DNSCryptClient(self
._providerName
, self
._providerFingerprint
, "127.0.0.1", 8443)
265 client
.refreshResolverCertificates()
266 cert
= client
.getResolverCertificate()
267 self
.assertTrue(cert
)
268 firstSerial
= cert
.serial
269 self
.assertGreaterEqual(cert
.serial
, self
._resolverCertificateSerial
)
273 client
.refreshResolverCertificates()
274 cert
= client
.getResolverCertificate()
275 self
.assertTrue(cert
)
276 secondSerial
= cert
.serial
277 self
.assertGreater(cert
.serial
, firstSerial
)
279 name
= 'automatic-rotation.dnscrypt.tests.powerdns.com.'
280 query
= dns
.message
.make_query(name
, 'A', 'IN')
281 response
= dns
.message
.make_response(query
)
282 rrset
= dns
.rrset
.from_text(name
,
287 response
.answer
.append(rrset
)
289 self
.doDNSCryptQuery(client
, query
, response
, False)
290 self
.doDNSCryptQuery(client
, query
, response
, True)