git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_DOQ.py
Merge pull request #13362 from omoerbeek/rec-convert-api-config
[thirdparty/pdns.git] / regression-tests.dnsdist / test_DOQ.py
1 #!/usr/bin/env python
2 import dns
3 import clientsubnetoption
4
5 from dnsdisttests import DNSDistTest
6 from dnsdisttests import pickAvailablePort
7 from doqclient import quic_bogus_query
8 import doqclient
9
class TestDOQBogus(DNSDistTest):
    """DoQ regression test for a malformed (bogus) query.

    The dnsdist configuration below declares one backend on 127.0.0.1 and
    one DoQ frontend on 127.0.0.1 using the test certificate chain and key.
    """

    _serverKey = 'server.key'
    _serverCert = 'server.chain'
    _serverName = 'tls.tests.dnsdist.org'
    _caCert = 'ca.pem'
    _doqServerPort = pickAvailablePort()
    _config_template = """
    newServer{address="127.0.0.1:%d"}

    addDOQLocal("127.0.0.1:%d", "%s", "%s")
    """
    # NOTE(review): presumably DNSDistTest substitutes these attributes, in
    # order, into the placeholders of _config_template -- confirm in dnsdisttests.
    _config_params = ['_testServerPort', '_doqServerPort', '_serverCert', '_serverKey']
    _verboseMode = True

    def testDOQBogus(self):
        """
        DOQ: Test a bogus query (wrong packed length)
        """
        name = 'bogus.doq.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
        query.id = 0

        # The bogus query must not be answered: the client is expected to
        # raise StreamResetError instead of returning a message.
        with self.assertRaises(doqclient.StreamResetError) as caught:
            quic_bogus_query(query, '127.0.0.1', 2.0, self._doqServerPort, verify=self._caCert, server_hostname=self._serverName)

        # NOTE(review): 2 is presumably DOQ_PROTOCOL_ERROR (RFC 9250) -- confirm
        # that doqclient exposes the application error code as `.error`.
        self.assertEqual(caught.exception.error, 2)
39
class TestDOQ(DNSDistTest):
    """DoQ regression tests covering forwarding and rule actions.

    The dnsdist configuration below declares one backend, one DoQ frontend
    on 127.0.0.1, and four rules keyed on query name: drop, refuse, spoof
    an A record, and route to a pool named 'this-pool-has-no-backend'.
    """

    _serverKey = 'server.key'
    _serverCert = 'server.chain'
    _serverName = 'tls.tests.dnsdist.org'
    _caCert = 'ca.pem'
    _doqServerPort = pickAvailablePort()
    _config_template = """
    newServer{address="127.0.0.1:%d"}

    addAction("drop.doq.tests.powerdns.com.", DropAction())
    addAction("refused.doq.tests.powerdns.com.", RCodeAction(DNSRCode.REFUSED))
    addAction("spoof.doq.tests.powerdns.com.", SpoofAction("1.2.3.4"))
    addAction("no-backend.doq.tests.powerdns.com.", PoolAction('this-pool-has-no-backend'))

    addDOQLocal("127.0.0.1:%d", "%s", "%s")
    """
    # NOTE(review): presumably DNSDistTest substitutes these attributes, in
    # order, into the placeholders of _config_template -- confirm in dnsdisttests.
    _config_params = ['_testServerPort', '_doqServerPort', '_serverCert', '_serverKey']
    _verboseMode = True

    def testDOQSimple(self):
        """
        DOQ: Simple query
        """
        name = 'simple.doq.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
        query.id = 0
        # The query is sent without EDNS but the one seen by the backend is
        # expected to carry EDNS with a 4096-byte payload size.
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
        expectedQuery.id = 0
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)

        (receivedQuery, receivedResponse) = self.sendDOQQuery(self._doqServerPort, query, response=response, caFile=self._caCert, serverName=self._serverName)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        # IDs are aligned before comparing so that only the rest of the
        # message is checked.
        receivedQuery.id = expectedQuery.id
        self.assertEqual(expectedQuery, receivedQuery)

    def testDOQMultipleStreams(self):
        """
        DOQ: Test multiple queries using the same connection
        """
        name = 'simple.doq.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
        query.id = 0
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
        expectedQuery.id = 0
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)

        connection = self.getDOQConnection(self._doqServerPort, self._caCert)

        # Send the same query twice, passing the same connection object each time.
        for _ in range(2):
            (receivedQuery, receivedResponse) = self.sendDOQQuery(self._doqServerPort, query, response=response, caFile=self._caCert, serverName=self._serverName, connection=connection)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            receivedQuery.id = expectedQuery.id
            self.assertEqual(expectedQuery, receivedQuery)

    def testDropped(self):
        """
        DOQ: Dropped query
        """
        name = 'drop.doq.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')

        # A dropped query must not produce a response: the client is expected
        # to raise StreamResetError.
        with self.assertRaises(doqclient.StreamResetError) as caught:
            self.sendDOQQuery(self._doqServerPort, query, response=None, caFile=self._caCert, useQueue=False, serverName=self._serverName)

        # NOTE(review): 5 is presumably DOQ_UNSPECIFIED_ERROR (RFC 9250) -- confirm.
        self.assertEqual(caught.exception.error, 5)

    def testRefused(self):
        """
        DOQ: Refused
        """
        name = 'refused.doq.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        query.id = 0
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.REFUSED)

        # No backend response is supplied (response=None, useQueue=False):
        # the answer is expected to come from the RCodeAction rule.
        (_, receivedResponse) = self.sendDOQQuery(self._doqServerPort, query, response=None, caFile=self._caCert, useQueue=False, serverName=self._serverName)
        self.assertEqual(receivedResponse, expectedResponse)

    def testSpoof(self):
        """
        DOQ: Spoofed
        """
        name = 'spoof.doq.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        query.id = 0
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '1.2.3.4')
        expectedResponse.answer.append(rrset)

        # No backend response is supplied: the answer is expected to come
        # from the SpoofAction rule.
        (_, receivedResponse) = self.sendDOQQuery(self._doqServerPort, query, response=None, caFile=self._caCert, useQueue=False, serverName=self._serverName)
        self.assertEqual(receivedResponse, expectedResponse)

    def testDOQNoBackend(self):
        """
        DOQ: No backend
        """
        name = 'no-backend.doq.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)

        # The query is routed to a pool named 'this-pool-has-no-backend', so
        # the client is expected to raise StreamResetError.
        with self.assertRaises(doqclient.StreamResetError) as caught:
            self.sendDOQQuery(self._doqServerPort, query, response=None, caFile=self._caCert, useQueue=False, serverName=self._serverName)

        # NOTE(review): 5 is presumably DOQ_UNSPECIFIED_ERROR (RFC 9250) -- confirm.
        self.assertEqual(caught.exception.error, 5)
171
class TestDOQWithCache(DNSDistTest):
    """DoQ regression test with a packet cache set on the default pool."""

    _serverKey = 'server.key'
    _serverCert = 'server.chain'
    _serverName = 'tls.tests.dnsdist.org'
    _caCert = 'ca.pem'
    _doqServerPort = pickAvailablePort()
    _config_template = """
    newServer{address="127.0.0.1:%d"}

    addDOQLocal("127.0.0.1:%d", "%s", "%s")

    pc = newPacketCache(100, {maxTTL=86400, minTTL=1})
    getPool(""):setCache(pc)
    """
    _config_params = ['_testServerPort', '_doqServerPort', '_serverCert', '_serverKey']
    _verboseMode = True

    def testCached(self):
        """
        Cache: Served from cache

        dnsdist is configured to cache entries. The same query is sent
        several times and we check that the backend only receives the
        first one.
        """
        qname = 'cached.cache.tests.powerdns.com.'
        repeatCount = 10

        query = dns.message.make_query(qname, 'AAAA', 'IN')
        query.id = 0
        response = dns.message.make_response(query)
        response.answer.append(
            dns.rrset.from_text(qname, 3600, dns.rdataclass.IN, dns.rdatatype.AAAA, '::1')
        )

        # Keyword arguments shared by every sendDOQQuery call in this test.
        tlsArgs = {'caFile': self._caCert, 'serverName': self._serverName}

        # First query: a backend response is supplied, which fills the cache.
        (backendQuery, firstResponse) = self.sendDOQQuery(self._doqServerPort, query, response=response, **tlsArgs)
        self.assertTrue(backendQuery)
        self.assertTrue(firstResponse)
        backendQuery.id = query.id
        self.assertEqual(query, backendQuery)
        self.assertEqual(firstResponse, response)

        # Repeats: no backend response is supplied, yet the same answer is expected.
        for _attempt in range(repeatCount):
            (_ignored, cachedResponse) = self.sendDOQQuery(self._doqServerPort, query, response=None, useQueue=False, **tlsArgs)
            self.assertEqual(cachedResponse, response)

        # Total the per-key response counters, then reset each one to zero.
        backendHits = sum(self._responsesCounter.values())
        for counterKey in self._responsesCounter:
            TestDOQWithCache._responsesCounter[counterKey] = 0

        self.assertEqual(backendHits, 1)