Commit | Line | Data |
---|---|---|
b8db58a2 | 1 | #!/usr/bin/env python |
b8db58a2 RG |
2 | import time |
3 | import unittest | |
b1bec9f0 RG |
4 | import dns |
5 | import dns.message | |
b8db58a2 RG |
6 | from dnsdisttests import DNSDistTest |
7 | import dnscrypt | |
8 | ||
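class TestDNSCrypt(DNSDistTest):
    """
    dnsdist is configured to accept DNSCrypt queries on 127.0.0.1:_dnsDistPortDNSCrypt.
    The provider's keys have been generated with:
    generateDNSCryptProviderKeys("DNSCryptProviderPublic.key", "DNSCryptProviderPrivate.key")
    Be careful to change the _providerFingerprint below if you want to regenerate the keys.
    """

    _dnsDistPort = 5340
    _dnsDistPortDNSCrypt = 8443
    # Lua configuration handed to dnsdist. The placeholders are presumably
    # filled by DNSDistTest from _config_params, in that order -- confirm
    # against the base class before reordering either one.
    _config_template = """
    generateDNSCryptCertificate("DNSCryptProviderPrivate.key", "DNSCryptResolver.cert", "DNSCryptResolver.key", %d, %d, %d)
    addDNSCryptBind("127.0.0.1:%d", "%s", "DNSCryptResolver.cert", "DNSCryptResolver.key")
    newServer{address="127.0.0.1:%s"}
    """

    # Fingerprint of the provider public key that the test client is given.
    # It is tied to the key pair on disk, so regenerating the provider keys
    # without updating this value leaves the two out of sync.
    _providerFingerprint = 'E1D7:2108:9A59:BF8D:F101:16FA:ED5E:EA6A:9F6C:C78F:7F91:AF6B:027E:62F4:69C3:B1AA'
    _providerName = "2.provider.name"
    _resolverCertificateSerial = 42
    # valid from 60s ago until 2h from now
    # NOTE: both timestamps are computed once, when the class body is
    # evaluated, so a run lasting longer than 2h would outlive the
    # resolver certificate.
    _resolverCertificateValidFrom = time.time() - 60
    _resolverCertificateValidUntil = time.time() + 7200
    _config_params = ['_resolverCertificateSerial', '_resolverCertificateValidFrom', '_resolverCertificateValidUntil', '_dnsDistPortDNSCrypt', '_providerName', '_testServerPort']
    _dnsdistStartupDelay = 10

    def testSimpleA(self):
        """
        DNSCrypt: encrypted A query

        Send an encrypted A query and check that the backend receives the
        same query and the client receives the backend's response.
        """
        # Use the configured port rather than a second hard-coded 8443, so
        # the client always targets the address given to addDNSCryptBind().
        client = dnscrypt.DNSCryptClient(self._providerName, self._providerFingerprint, "127.0.0.1", self._dnsDistPortDNSCrypt)
        name = 'a.dnscrypt.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)

        # Queue the canned answer before sending the encrypted query.
        self._toResponderQueue.put(response)
        data = client.query(query.to_wire())
        receivedResponse = dns.message.from_wire(data)
        receivedQuery = None
        if not self._fromResponderQueue.empty():
            # NOTE(review): `query` is passed positionally here; if this is a
            # standard Queue it is interpreted as the `block` flag -- confirm.
            receivedQuery = self._fromResponderQueue.get(query)

        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        # The IDs are aligned before comparing; presumably the ID seen by the
        # backend differs from the one the client sent.
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)

    def testResponseLargerThanPaddedQuery(self):
        """
        DNSCrypt: response larger than query

        Send a small encrypted query (don't forget to take
        the padding into account) and check that the response
        is truncated.

        This guards the rule that an encrypted answer must not be larger
        than the padded query that triggered it: the client has to get an
        empty response with the TC bit set instead of the full answer.
        """
        client = dnscrypt.DNSCryptClient(self._providerName, self._providerFingerprint, "127.0.0.1", self._dnsDistPortDNSCrypt)
        name = 'smallquerylargeresponse.dnscrypt.tests.powerdns.com.'
        query = dns.message.make_query(name, 'TXT', 'IN', use_edns=True, payload=4096)
        response = dns.message.make_response(query)
        # A 255-byte TXT string makes the answer larger than the query.
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.TXT,
                                    'A'*255)
        response.answer.append(rrset)

        self._toResponderQueue.put(response)
        data = client.query(query.to_wire())
        receivedQuery = None
        if not self._fromResponderQueue.empty():
            # NOTE(review): `query` is passed positionally here; if this is a
            # standard Queue it is interpreted as the `block` flag -- confirm.
            receivedQuery = self._fromResponderQueue.get(query)

        receivedResponse = dns.message.from_wire(data)

        self.assertTrue(receivedQuery)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(receivedResponse.question, response.question)
        # The previous check masked with ~dns.flags.TC, which is truthy as
        # soon as any other header flag (QR, RD, ...) is set and therefore
        # passed whether or not the response was truncated. Test the TC bit
        # itself.
        self.assertTrue(receivedResponse.flags & dns.flags.TC)
        # A truncated response must carry no records in any section.
        self.assertEqual(len(receivedResponse.answer), 0)
        self.assertEqual(len(receivedResponse.authority), 0)
        self.assertEqual(len(receivedResponse.additional), 0)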
97 | ||
b8db58a2 RG |
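if __name__ == '__main__':
    # unittest.main() exits the interpreter itself with a status derived from
    # the test result, so a trailing exit(0) would never be reached.
    unittest.main()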