]> git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_TLS.py
Merge pull request #8713 from rgacogne/auth-strict-caches-size
[thirdparty/pdns.git] / regression-tests.dnsdist / test_TLS.py
1 #!/usr/bin/env python
2 import dns
3 from dnsdisttests import DNSDistTest
4
class TestTLS(DNSDistTest):
    """
    Regression tests sending DNS queries to dnsdist over a TLS connection.

    The configuration template declares one backend server, one TLS
    listener bound to 127.0.0.1, and a rule spoofing the answer
    "1.2.3.4" when the TLS SNI matches "powerdns.com".
    """

    _serverKey = 'server.key'
    _serverCert = 'server.chain'
    # name passed to openTLSConnection() for connections that should not
    # match the SNIRule declared in the configuration below
    _serverName = 'tls.tests.dnsdist.org'
    _caCert = 'ca.pem'
    _tlsServerPort = 8453
    # the '%s' placeholders are presumably filled, in order, with the values
    # of the attributes named in _config_params -- handled by DNSDistTest,
    # which is not visible from this file
    _config_template = """
    newServer{address="127.0.0.1:%s"}
    addTLSLocal("127.0.0.1:%s", "%s", "%s")
    addAction(SNIRule("powerdns.com"), SpoofAction("1.2.3.4"))
    """
    _config_params = ['_testServerPort', '_tlsServerPort', '_serverCert', '_serverKey']

    @staticmethod
    def _makeAResponse(query, name, address):
        """
        Build a response to `query` holding a single A record.

        The record is owned by `name`, has a TTL of 3600 and class IN,
        and points to `address`.
        """
        response = dns.message.make_response(query)
        response.answer.append(dns.rrset.from_text(name,
                                                   3600,
                                                   dns.rdataclass.IN,
                                                   dns.rdatatype.A,
                                                   address))
        return response

    def _checkForwardedExchange(self, conn, query, response):
        """
        Read one exchange from `conn` and compare it to what was sent.

        With useQueue=True the receive helper returns both the query and
        the response (the query presumably being the one that reached the
        backend -- see DNSDistTest). Both must be set and equal to `query`
        and `response`.
        """
        (receivedQuery, receivedResponse) = self.recvTCPResponseOverConnection(conn, useQueue=True)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        # the ID is overwritten before comparing, so a differing query ID
        # does not make the equality check fail
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)

    def testTLSSimple(self):
        """
        TLS: Single query

        Sends one query over a fresh TLS connection and checks both the
        received query and the received response.
        """
        name = 'single.tls.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
        response = self._makeAResponse(query, name, '127.0.0.1')

        conn = self.openTLSConnection(self._tlsServerPort, self._serverName, self._caCert)

        self.sendTCPQueryOverConnection(conn, query, response=response)
        self._checkForwardedExchange(conn, query, response)

    def testTLKA(self):
        """
        TLS: Several queries over the same connection

        Performs five send/receive round trips, one after the other, over
        a single TLS connection.
        """
        name = 'ka.tls.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
        response = self._makeAResponse(query, name, '127.0.0.1')

        conn = self.openTLSConnection(self._tlsServerPort, self._serverName, self._caCert)

        for _ in range(5):
            self.sendTCPQueryOverConnection(conn, query, response=response)
            self._checkForwardedExchange(conn, query, response)

    def testTLSPipelining(self):
        """
        TLS: Several queries over the same connection without waiting for the responses

        Sends 100 queries back to back, then reads and checks 100
        exchanges from the same connection.
        """
        name = 'pipelining.tls.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
        response = self._makeAResponse(query, name, '127.0.0.1')

        conn = self.openTLSConnection(self._tlsServerPort, self._serverName, self._caCert)

        # all the queries are sent first, nothing is read in between
        for _ in range(100):
            self.sendTCPQueryOverConnection(conn, query, response=response)

        for _ in range(100):
            self._checkForwardedExchange(conn, query, response)

    def testTLSSNIRouting(self):
        """
        TLS: SNI Routing

        The same query is sent twice: first over a connection opened with
        the name 'powerdns.com', where the spoofed "1.2.3.4" answer is
        expected, then over a connection opened with the regular server
        name, where the regular response is expected.
        """
        name = 'sni.tls.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
        # clear the RD bit before the responses are derived from the query
        query.flags &= ~dns.flags.RD
        response = self._makeAResponse(query, name, '127.0.0.1')
        expectedResponse = self._makeAResponse(query, name, '1.2.3.4')

        # this SNI should match so we should get a spoofed answer
        conn = self.openTLSConnection(self._tlsServerPort, 'powerdns.com', self._caCert)

        # no response is handed over for this query, and with useQueue=False
        # only the response is returned by the receive helper
        self.sendTCPQueryOverConnection(conn, query, response=None)
        receivedResponse = self.recvTCPResponseOverConnection(conn, useQueue=False)
        self.assertTrue(receivedResponse)
        self.assertEqual(expectedResponse, receivedResponse)

        # this one should not
        conn = self.openTLSConnection(self._tlsServerPort, self._serverName, self._caCert)

        self.sendTCPQueryOverConnection(conn, query, response=response)
        self._checkForwardedExchange(conn, query, response)
135
class TestDOTWithCache(DNSDistTest):
    """
    Regression tests for DNS over TLS with a packet cache enabled.

    The configuration template declares one backend server, one TLS
    listener bound to 127.0.0.1, and a packet cache created with
    newPacketCache(100, ...) that is set on the pool returned by
    getPool("").
    """

    _serverKey = 'server.key'
    _serverCert = 'server.chain'
    _serverName = 'tls.tests.dnsdist.org'
    _caCert = 'ca.pem'
    _tlsServerPort = 8453
    # the '%s' placeholders are presumably filled, in order, with the values
    # of the attributes named in _config_params -- handled by DNSDistTest,
    # which is not visible from this file
    _config_template = """
    newServer{address="127.0.0.1:%s"}

    addTLSLocal("127.0.0.1:%s", "%s", "%s")

    pc = newPacketCache(100, {maxTTL=86400, minTTL=1})
    getPool(""):setCache(pc)
    """
    _config_params = ['_testServerPort', '_tlsServerPort', '_serverCert', '_serverKey']

    def testDOTCacheLargeAnswer(self):
        """
        DOT with cache: Check that we can cache (and retrieve) large answers

        A first query is sent along with a 4096-byte response and the
        full exchange is checked. The same query is then sent ten more
        times, each over a new connection and without a response being
        handed over, and the answer read back must equal the original
        response.
        """
        numberOfQueries = 10
        name = 'large.dot-with-cache.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
        query.id = 0
        # the received query is compared against this EDNS-enabled variant
        # rather than against the query that was sent
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
        expectedQuery.id = 0
        response = dns.message.make_response(query)
        # we prepare a large answer: 44 chunks, each made of a number
        # repeated 50 times, separated by ', '
        content = ', '.join(str(i) * 50 for i in range(44))
        # pad up to 4096
        content = content + 'A'*40

        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.TXT,
                                    content)
        response.answer.append(rrset)
        # make sure the response is exactly as large as intended
        self.assertEqual(len(response.to_wire()), 4096)

        # first query to fill the cache
        conn = self.openTLSConnection(self._tlsServerPort, self._serverName, self._caCert)
        self.sendTCPQueryOverConnection(conn, query, response=response)
        (receivedQuery, receivedResponse) = self.recvTCPResponseOverConnection(conn, useQueue=True)

        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = expectedQuery.id
        self.assertEqual(expectedQuery, receivedQuery)
        self.checkQueryNoEDNS(expectedQuery, receivedQuery)
        self.assertEqual(response, receivedResponse)

        # no response is handed over for these queries, so the answers
        # are expected to be served from the cache
        for _ in range(numberOfQueries):
            conn = self.openTLSConnection(self._tlsServerPort, self._serverName, self._caCert)
            self.sendTCPQueryOverConnection(conn, query, response=None)
            receivedResponse = self.recvTCPResponseOverConnection(conn, useQueue=False)
            self.assertEqual(receivedResponse, response)