]> git.ipfire.org Git - thirdparty/pdns.git/blame - regression-tests.dnsdist/test_DNSCrypt.py
Merge pull request #3564 from rgacogne/dnsdist-readn2-eagain
[thirdparty/pdns.git] / regression-tests.dnsdist / test_DNSCrypt.py
CommitLineData
b8db58a2 1#!/usr/bin/env python
b8db58a2
RG
2import time
3import unittest
b1bec9f0
RG
4import dns
5import dns.message
b8db58a2
RG
6from dnsdisttests import DNSDistTest
7import dnscrypt
8
9class TestDNSCrypt(DNSDistTest):
10 """
11 dnsdist is configured to accept DNSCrypt queries on 127.0.0.1:_dnsDistPortDNSCrypt.
12 The provider's keys have been generated with:
13 generateDNSCryptProviderKeys("DNSCryptProviderPublic.key", "DNSCryptProviderPrivate.key")
14 Be careful to change the _providerFingerprint below if you want to regenerate the keys.
15 """
16
17 _dnsDistPort = 5340
18 _dnsDistPortDNSCrypt = 8443
19 _config_template = """
bd64cc44 20 generateDNSCryptCertificate("DNSCryptProviderPrivate.key", "DNSCryptResolver.cert", "DNSCryptResolver.key", %d, %d, %d)
b8db58a2
RG
21 addDNSCryptBind("127.0.0.1:%d", "%s", "DNSCryptResolver.cert", "DNSCryptResolver.key")
22 newServer{address="127.0.0.1:%s"}
23 """
24
b8db58a2
RG
25 _providerFingerprint = 'E1D7:2108:9A59:BF8D:F101:16FA:ED5E:EA6A:9F6C:C78F:7F91:AF6B:027E:62F4:69C3:B1AA'
26 _providerName = "2.provider.name"
bd64cc44
RG
27 _resolverCertificateSerial = 42
28 # valid from 60s ago until 2h from now
29 _resolverCertificateValidFrom = time.time() - 60
30 _resolverCertificateValidUntil = time.time() + 7200
31 _config_params = ['_resolverCertificateSerial', '_resolverCertificateValidFrom', '_resolverCertificateValidUntil', '_dnsDistPortDNSCrypt', '_providerName', '_testServerPort']
617dfe22 32 _dnsdistStartupDelay = 10
b8db58a2
RG
33
34 def testSimpleA(self):
35 """
617dfe22 36 DNSCrypt: encrypted A query
b8db58a2
RG
37 """
38 client = dnscrypt.DNSCryptClient(self._providerName, self._providerFingerprint, "127.0.0.1", 8443)
39 name = 'a.dnscrypt.tests.powerdns.com.'
40 query = dns.message.make_query(name, 'A', 'IN')
41 response = dns.message.make_response(query)
42 rrset = dns.rrset.from_text(name,
43 3600,
44 dns.rdataclass.IN,
45 dns.rdatatype.A,
46 '127.0.0.1')
47 response.answer.append(rrset)
48
49 self._toResponderQueue.put(response)
50 data = client.query(query.to_wire())
51 receivedResponse = dns.message.from_wire(data)
52 receivedQuery = None
53 if not self._fromResponderQueue.empty():
54 receivedQuery = self._fromResponderQueue.get(query)
55
56 self.assertTrue(receivedQuery)
57 self.assertTrue(receivedResponse)
58 receivedQuery.id = query.id
b8db58a2
RG
59 self.assertEquals(query, receivedQuery)
60 self.assertEquals(response, receivedResponse)
61
bd64cc44
RG
62 def testResponseLargerThanPaddedQuery(self):
63 """
617dfe22
RG
64 DNSCrypt: response larger than query
65
bd64cc44
RG
66 Send a small encrypted query (don't forget to take
67 the padding into account) and check that the response
68 is truncated.
69 """
70 client = dnscrypt.DNSCryptClient(self._providerName, self._providerFingerprint, "127.0.0.1", 8443)
71 name = 'smallquerylargeresponse.dnscrypt.tests.powerdns.com.'
72 query = dns.message.make_query(name, 'TXT', 'IN', use_edns=True, payload=4096)
73 response = dns.message.make_response(query)
74 rrset = dns.rrset.from_text(name,
75 3600,
76 dns.rdataclass.IN,
77 dns.rdatatype.TXT,
78 'A'*255)
79 response.answer.append(rrset)
80
81 self._toResponderQueue.put(response)
82 data = client.query(query.to_wire())
83 receivedQuery = None
84 if not self._fromResponderQueue.empty():
85 receivedQuery = self._fromResponderQueue.get(query)
86
87 receivedResponse = dns.message.from_wire(data)
88
89 self.assertTrue(receivedQuery)
90 receivedQuery.id = query.id
91 self.assertEquals(query, receivedQuery)
92 self.assertEquals(receivedResponse.question, response.question)
93 self.assertTrue(receivedResponse.flags & ~dns.flags.TC)
94 self.assertTrue(len(receivedResponse.answer) == 0)
95 self.assertTrue(len(receivedResponse.authority) == 0)
96 self.assertTrue(len(receivedResponse.additional) == 0)
97
b8db58a2
RG
98if __name__ == '__main__':
99 unittest.main()
100 exit(0)