]> git.ipfire.org Git - thirdparty/pdns.git/blame - regression-tests.dnsdist/test_DNSCrypt.py
dnsdist: Refactor duplicated response handling code (UDP/TCP)
[thirdparty/pdns.git] / regression-tests.dnsdist / test_DNSCrypt.py
CommitLineData
b8db58a2 1#!/usr/bin/env python
b8db58a2 2import time
b1bec9f0
RG
3import dns
4import dns.message
b8db58a2
RG
5from dnsdisttests import DNSDistTest
6import dnscrypt
7
8class TestDNSCrypt(DNSDistTest):
9 """
10 dnsdist is configured to accept DNSCrypt queries on 127.0.0.1:_dnsDistPortDNSCrypt.
11 The provider's keys have been generated with:
12 generateDNSCryptProviderKeys("DNSCryptProviderPublic.key", "DNSCryptProviderPrivate.key")
13 Be careful to change the _providerFingerprint below if you want to regenerate the keys.
14 """
15
16 _dnsDistPort = 5340
17 _dnsDistPortDNSCrypt = 8443
18 _config_template = """
bd64cc44 19 generateDNSCryptCertificate("DNSCryptProviderPrivate.key", "DNSCryptResolver.cert", "DNSCryptResolver.key", %d, %d, %d)
b8db58a2
RG
20 addDNSCryptBind("127.0.0.1:%d", "%s", "DNSCryptResolver.cert", "DNSCryptResolver.key")
21 newServer{address="127.0.0.1:%s"}
22 """
23
b8db58a2
RG
24 _providerFingerprint = 'E1D7:2108:9A59:BF8D:F101:16FA:ED5E:EA6A:9F6C:C78F:7F91:AF6B:027E:62F4:69C3:B1AA'
25 _providerName = "2.provider.name"
bd64cc44
RG
26 _resolverCertificateSerial = 42
27 # valid from 60s ago until 2h from now
28 _resolverCertificateValidFrom = time.time() - 60
29 _resolverCertificateValidUntil = time.time() + 7200
30 _config_params = ['_resolverCertificateSerial', '_resolverCertificateValidFrom', '_resolverCertificateValidUntil', '_dnsDistPortDNSCrypt', '_providerName', '_testServerPort']
617dfe22 31 _dnsdistStartupDelay = 10
b8db58a2
RG
32
33 def testSimpleA(self):
34 """
617dfe22 35 DNSCrypt: encrypted A query
b8db58a2
RG
36 """
37 client = dnscrypt.DNSCryptClient(self._providerName, self._providerFingerprint, "127.0.0.1", 8443)
38 name = 'a.dnscrypt.tests.powerdns.com.'
39 query = dns.message.make_query(name, 'A', 'IN')
40 response = dns.message.make_response(query)
41 rrset = dns.rrset.from_text(name,
42 3600,
43 dns.rdataclass.IN,
44 dns.rdatatype.A,
fcffc585 45 '192.2.0.1')
b8db58a2
RG
46 response.answer.append(rrset)
47
48 self._toResponderQueue.put(response)
49 data = client.query(query.to_wire())
50 receivedResponse = dns.message.from_wire(data)
51 receivedQuery = None
52 if not self._fromResponderQueue.empty():
53 receivedQuery = self._fromResponderQueue.get(query)
54
55 self.assertTrue(receivedQuery)
56 self.assertTrue(receivedResponse)
57 receivedQuery.id = query.id
b8db58a2
RG
58 self.assertEquals(query, receivedQuery)
59 self.assertEquals(response, receivedResponse)
60
fcffc585
RG
61 self._toResponderQueue.put(response)
62 data = client.query(query.to_wire(), tcp=True)
63 receivedResponse = dns.message.from_wire(data)
64 receivedQuery = None
65 if not self._fromResponderQueue.empty():
66 receivedQuery = self._fromResponderQueue.get(query)
67
68 self.assertTrue(receivedQuery)
69 self.assertTrue(receivedResponse)
70 receivedQuery.id = query.id
71 self.assertEquals(query, receivedQuery)
72 self.assertEquals(response, receivedResponse)
73
bd64cc44
RG
74 def testResponseLargerThanPaddedQuery(self):
75 """
617dfe22
RG
76 DNSCrypt: response larger than query
77
bd64cc44
RG
78 Send a small encrypted query (don't forget to take
79 the padding into account) and check that the response
80 is truncated.
81 """
82 client = dnscrypt.DNSCryptClient(self._providerName, self._providerFingerprint, "127.0.0.1", 8443)
83 name = 'smallquerylargeresponse.dnscrypt.tests.powerdns.com.'
84 query = dns.message.make_query(name, 'TXT', 'IN', use_edns=True, payload=4096)
85 response = dns.message.make_response(query)
86 rrset = dns.rrset.from_text(name,
87 3600,
88 dns.rdataclass.IN,
89 dns.rdatatype.TXT,
90 'A'*255)
91 response.answer.append(rrset)
92
93 self._toResponderQueue.put(response)
94 data = client.query(query.to_wire())
95 receivedQuery = None
96 if not self._fromResponderQueue.empty():
97 receivedQuery = self._fromResponderQueue.get(query)
98
99 receivedResponse = dns.message.from_wire(data)
100
101 self.assertTrue(receivedQuery)
102 receivedQuery.id = query.id
103 self.assertEquals(query, receivedQuery)
104 self.assertEquals(receivedResponse.question, response.question)
105 self.assertTrue(receivedResponse.flags & ~dns.flags.TC)
106 self.assertTrue(len(receivedResponse.answer) == 0)
107 self.assertTrue(len(receivedResponse.authority) == 0)
108 self.assertTrue(len(receivedResponse.additional) == 0)
109
fcffc585
RG
110class TestDNSCryptWithCache(DNSDistTest):
111 _dnsDistPortDNSCrypt = 8443
112 _providerFingerprint = 'E1D7:2108:9A59:BF8D:F101:16FA:ED5E:EA6A:9F6C:C78F:7F91:AF6B:027E:62F4:69C3:B1AA'
113 _providerName = "2.provider.name"
114 _resolverCertificateSerial = 42
115 # valid from 60s ago until 2h from now
116 _resolverCertificateValidFrom = time.time() - 60
117 _resolverCertificateValidUntil = time.time() + 7200
118 _config_params = ['_resolverCertificateSerial', '_resolverCertificateValidFrom', '_resolverCertificateValidUntil', '_dnsDistPortDNSCrypt', '_providerName', '_testServerPort']
119 _config_template = """
120 generateDNSCryptCertificate("DNSCryptProviderPrivate.key", "DNSCryptResolver.cert", "DNSCryptResolver.key", %d, %d, %d)
121 addDNSCryptBind("127.0.0.1:%d", "%s", "DNSCryptResolver.cert", "DNSCryptResolver.key")
122 pc = newPacketCache(5, 86400, 1)
123 getPool(""):setCache(pc)
124 newServer{address="127.0.0.1:%s"}
125 """
126
127 def testCachedSimpleA(self):
128 """
129 DNSCrypt: encrypted A query served from cache
130 """
131 client = dnscrypt.DNSCryptClient(self._providerName, self._providerFingerprint, "127.0.0.1", 8443)
132 name = 'cacheda.dnscrypt.tests.powerdns.com.'
133 query = dns.message.make_query(name, 'A', 'IN')
134 response = dns.message.make_response(query)
135 rrset = dns.rrset.from_text(name,
136 3600,
137 dns.rdataclass.IN,
138 dns.rdatatype.A,
139 '192.2.0.1')
140 response.answer.append(rrset)
141
142 # first query to fill the cache
143 self._toResponderQueue.put(response)
144 data = client.query(query.to_wire())
145 receivedResponse = dns.message.from_wire(data)
146 receivedQuery = None
147 if not self._fromResponderQueue.empty():
148 receivedQuery = self._fromResponderQueue.get(query)
149
150 self.assertTrue(receivedQuery)
151 self.assertTrue(receivedResponse)
152 receivedQuery.id = query.id
153 self.assertEquals(query, receivedQuery)
154 self.assertEquals(response, receivedResponse)
155
156 # second query should get a cached response
157 data = client.query(query.to_wire())
158 receivedResponse = dns.message.from_wire(data)
159 receivedQuery = None
160 if not self._fromResponderQueue.empty():
161 receivedQuery = self._fromResponderQueue.get(query)
162
163 self.assertEquals(receivedQuery, None)
164 self.assertTrue(receivedResponse)
165 self.assertEquals(response, receivedResponse)