]> git.ipfire.org Git - thirdparty/pdns.git/blame - regression-tests.dnsdist/test_DOQ.py
dnsdist: enable doh3 in our CI
[thirdparty/pdns.git] / regression-tests.dnsdist / test_DOQ.py
CommitLineData
3dc49a89
CHB
1#!/usr/bin/env python
2import dns
3import clientsubnetoption
4
5from dnsdisttests import DNSDistTest
6from dnsdisttests import pickAvailablePort
e7000cce
CHB
7from doqclient import quic_bogus_query
8import doqclient
9
class TestDOQBogus(DNSDistTest):
    """Check that a malformed DNS over QUIC query gets its stream reset."""

    # TLS material and server name handed to the DoQ listener and client.
    _serverKey = 'server.key'
    _serverCert = 'server.chain'
    _serverName = 'tls.tests.dnsdist.org'
    _caCert = 'ca.pem'
    _doqServerPort = pickAvailablePort()
    _config_template = """
    newServer{address="127.0.0.1:%d"}

    addDOQLocal("127.0.0.1:%d", "%s", "%s")
    """
    # presumably the attribute names are substituted, in order, into the
    # %-placeholders of _config_template by DNSDistTest -- confirm there.
    _config_params = ['_testServerPort', '_doqServerPort','_serverCert', '_serverKey']
    _verboseMode = True

    def testDOQBogus(self):
        """
        DOQ: Test a bogus query (wrong packed length)
        """
        name = 'bogus.doq.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
        query.id = 0

        # assertRaises fails the test with an explicit "not raised" message
        # when the bogus query is unexpectedly accepted, instead of relying
        # on a hand-written always-failing assertion inside a try block.
        with self.assertRaises(doqclient.StreamResetError) as caught:
            quic_bogus_query(query, '127.0.0.1', 2.0, self._doqServerPort, verify=self._caCert, server_hostname=self._serverName)

        # NOTE(review): 2 matches DOQ_PROTOCOL_ERROR in RFC 9250 -- confirm
        # that this is the code space doqclient exposes in `.error`.
        self.assertEqual(caught.exception.error, 2)
3dc49a89
CHB
39
class TestDOQ(DNSDistTest):
    """Functional tests for dnsdist's DNS over QUIC frontend."""

    # TLS material and server name handed to the DoQ listener and client.
    _serverKey = 'server.key'
    _serverCert = 'server.chain'
    _serverName = 'tls.tests.dnsdist.org'
    _caCert = 'ca.pem'
    _doqServerPort = pickAvailablePort()
    _config_template = """
    newServer{address="127.0.0.1:%d"}

    addAction("drop.doq.tests.powerdns.com.", DropAction())
    addAction("refused.doq.tests.powerdns.com.", RCodeAction(DNSRCode.REFUSED))
    addAction("spoof.doq.tests.powerdns.com.", SpoofAction("1.2.3.4"))
    addAction("no-backend.doq.tests.powerdns.com.", PoolAction('this-pool-has-no-backend'))

    addDOQLocal("127.0.0.1:%d", "%s", "%s")
    """
    # presumably the attribute names are substituted, in order, into the
    # %-placeholders of _config_template by DNSDistTest -- confirm there.
    _config_params = ['_testServerPort', '_doqServerPort','_serverCert', '_serverKey']
    _verboseMode = True

    def testDOQSimple(self):
        """
        DOQ: Simple query
        """
        name = 'simple.doq.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
        query.id = 0
        # The query is sent without EDNS, but the one seen by the backend is
        # expected to carry EDNS with a 4096 payload size.
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
        expectedQuery.id = 0
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)
        (receivedQuery, receivedResponse) = self.sendDOQQuery(self._doqServerPort, query, response=response, caFile=self._caCert, serverName=self._serverName)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        # Align the IDs before comparing the two messages.
        receivedQuery.id = expectedQuery.id
        self.assertEqual(expectedQuery, receivedQuery)

    def testDOQMultipleStreams(self):
        """
        DOQ: Test multiple queries using the same connection
        """
        name = 'simple.doq.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
        query.id = 0
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
        expectedQuery.id = 0
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)

        connection = self.getDOQConnection(self._doqServerPort, self._caCert)

        # Send the same query twice over the one connection; each exchange
        # must reach the backend and come back with a response.
        for _ in range(2):
            (receivedQuery, receivedResponse) = self.sendDOQQuery(self._doqServerPort, query, response=response, caFile=self._caCert, serverName=self._serverName, connection=connection)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            receivedQuery.id = expectedQuery.id
            self.assertEqual(expectedQuery, receivedQuery)

    def testDropped(self):
        """
        DOQ: Dropped query
        """
        name = 'drop.doq.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')

        # The name matches the DropAction() rule in _config_template, so the
        # client is expected to see its stream reset rather than a response.
        with self.assertRaises(doqclient.StreamResetError) as caught:
            self.sendDOQQuery(self._doqServerPort, query, response=None, caFile=self._caCert, useQueue=False, serverName=self._serverName)

        # NOTE(review): 5 matches DOQ_UNSPECIFIED_ERROR in RFC 9250 -- confirm
        # that this is the code space doqclient exposes in `.error`.
        self.assertEqual(caught.exception.error, 5)

    def testRefused(self):
        """
        DOQ: Refused
        """
        name = 'refused.doq.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        query.id = 0
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.REFUSED)

        # The answer is generated by the RCodeAction rule, so no backend
        # response is queued (response=None, useQueue=False).
        (_, receivedResponse) = self.sendDOQQuery(self._doqServerPort, query, response=None, caFile=self._caCert, useQueue=False, serverName=self._serverName)
        self.assertEqual(receivedResponse, expectedResponse)

    def testSpoof(self):
        """
        DOQ: Spoofed
        """
        name = 'spoof.doq.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        query.id = 0
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        # Same address as the SpoofAction rule in _config_template.
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '1.2.3.4')
        expectedResponse.answer.append(rrset)

        (_, receivedResponse) = self.sendDOQQuery(self._doqServerPort, query, response=None, caFile=self._caCert, useQueue=False, serverName=self._serverName)
        self.assertEqual(receivedResponse, expectedResponse)

    def testDOQNoBackend(self):
        """
        DOQ: No backend
        """
        name = 'no-backend.doq.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)

        # The name is routed to 'this-pool-has-no-backend' by the PoolAction
        # rule; the client is expected to see its stream reset.
        with self.assertRaises(doqclient.StreamResetError) as caught:
            self.sendDOQQuery(self._doqServerPort, query, response=None, caFile=self._caCert, useQueue=False, serverName=self._serverName)

        self.assertEqual(caught.exception.error, 5)
57b57259
CHB
171
class TestDOQWithCache(DNSDistTest):
    """DNS over QUIC frontend with a packet cache attached to the default pool."""

    # TLS material and server name handed to the DoQ listener and client.
    _serverKey = 'server.key'
    _serverCert = 'server.chain'
    _serverName = 'tls.tests.dnsdist.org'
    _caCert = 'ca.pem'
    _doqServerPort = pickAvailablePort()
    _config_template = """
    newServer{address="127.0.0.1:%d"}

    addDOQLocal("127.0.0.1:%d", "%s", "%s")

    pc = newPacketCache(100, {maxTTL=86400, minTTL=1})
    getPool(""):setCache(pc)
    """
    # presumably the attribute names are substituted, in order, into the
    # %-placeholders of _config_template by DNSDistTest -- confirm there.
    _config_params = ['_testServerPort', '_doqServerPort','_serverCert', '_serverKey']
    _verboseMode = True

    def testCached(self):
        """
        Cache: Served from cache

        dnsdist is configured to cache entries, we are sending several
        identical requests and checking that the backend only receives
        the first one.
        """
        qname = 'cached.cache.tests.powerdns.com.'
        repeats = 10

        query = dns.message.make_query(qname, 'AAAA', 'IN')
        query.id = 0
        answer = dns.rrset.from_text(qname, 3600, dns.rdataclass.IN, dns.rdatatype.AAAA, '::1')
        response = dns.message.make_response(query)
        response.answer.append(answer)

        # Prime the cache: this exchange is the only one expected to reach
        # the backend.
        (backendQuery, firstResponse) = self.sendDOQQuery(self._doqServerPort, query, response=response, caFile=self._caCert, serverName=self._serverName)
        self.assertTrue(backendQuery)
        self.assertTrue(firstResponse)
        backendQuery.id = query.id
        self.assertEqual(query, backendQuery)
        self.assertEqual(firstResponse, response)

        # Replay the same query with no backend response queued; each answer
        # must still equal the original response.
        for _attempt in range(repeats):
            (_unused, cachedResponse) = self.sendDOQQuery(self._doqServerPort, query, response=None, caFile=self._caCert, useQueue=False, serverName=self._serverName)
            self.assertEqual(cachedResponse, response)

        # Add up every per-key response counter, zeroing each entry on the
        # class as we go.
        backendResponses = 0
        for counterKey in self._responsesCounter:
            backendResponses += self._responsesCounter[counterKey]
            TestDOQWithCache._responsesCounter[counterKey] = 0

        self.assertEqual(backendResponses, 1)