]> git.ipfire.org Git - thirdparty/pdns.git/blame - regression-tests.dnsdist/quictests.py
dnsdist: doq,doh3 make sure we enforce any ACL
[thirdparty/pdns.git] / regression-tests.dnsdist / quictests.py
CommitLineData
ac70190e
RG
1#!/usr/bin/env python
2
3import dns
4from doqclient import StreamResetError
5
class QUICTests(object):
    """
    Shared DNS-over-QUIC regression test cases.

    The methods below call ``self.sendQUICQuery``, ``self.getQUICConnection``
    and the ``assert*`` helpers, none of which are defined here, so this class
    is presumably meant to be mixed into a ``unittest.TestCase`` subclass that
    provides them -- confirm against the concrete test classes.
    """

    def testQUICSimple(self):
        """
        QUIC: Simple query

        Sends an A query built without EDNS and checks that the query handed
        back by sendQUICQuery() equals the same question with EDNS enabled and
        a payload size of 4096, and that the response comes back unchanged.
        """
        name = 'simple.doq.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
        query.id = 0
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
        expectedQuery.id = 0
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)

        (receivedQuery, receivedResponse) = self.sendQUICQuery(query, response=response)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        # The ID is overwritten before comparing, so only the rest of the
        # message has to match.
        receivedQuery.id = expectedQuery.id
        self.assertEqual(expectedQuery, receivedQuery)
        self.assertEqual(receivedResponse, response)

    def testQUICMultipleStreams(self):
        """
        QUIC: Test multiple queries using the same connection

        Obtains one connection from getQUICConnection() and performs the same
        query/response exchange twice over it.
        """
        name = 'simple.doq.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
        query.id = 0
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
        expectedQuery.id = 0
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)

        connection = self.getQUICConnection()

        # Two identical exchanges, both reusing the connection created above.
        for _ in range(2):
            (receivedQuery, receivedResponse) = self.sendQUICQuery(query, response=response, connection=connection)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            receivedQuery.id = expectedQuery.id
            self.assertEqual(expectedQuery, receivedQuery)

    def testDropped(self):
        """
        QUIC: Dropped query

        The query must end with a StreamResetError whose ``error`` is 5.
        """
        name = 'drop.doq.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # assertRaises fails the test if no StreamResetError is raised; any
        # other exception still propagates as an error.
        with self.assertRaises(StreamResetError) as caught:
            self.sendQUICQuery(query, response=None, useQueue=False)
        # NOTE(review): 5 looks like DOQ_UNSPECIFIED_ERROR from RFC 9250 -- confirm.
        self.assertEqual(caught.exception.error, 5)

    def testRefused(self):
        """
        QUIC: Refused

        Sends a query with the RD flag cleared and expects a REFUSED response
        built from that query.
        """
        name = 'refused.doq.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        query.id = 0
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.REFUSED)

        (_, receivedResponse) = self.sendQUICQuery(query, response=None, useQueue=False)
        self.assertEqual(receivedResponse, expectedResponse)

    def testSpoof(self):
        """
        QUIC: Spoofed

        Sends a query with the RD flag cleared and expects a response carrying
        a single A record for 1.2.3.4 with a TTL of 3600.
        """
        name = 'spoof.doq.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        query.id = 0
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '1.2.3.4')
        expectedResponse.answer.append(rrset)

        (_, receivedResponse) = self.sendQUICQuery(query, response=None, useQueue=False)
        self.assertEqual(receivedResponse, expectedResponse)

    def testQUICNoBackend(self):
        """
        QUIC: No backend

        The query must end with a StreamResetError whose ``error`` is 5.
        """
        name = 'no-backend.doq.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
        # assertRaises fails the test if no StreamResetError is raised; any
        # other exception still propagates as an error.
        with self.assertRaises(StreamResetError) as caught:
            self.sendQUICQuery(query, response=None, useQueue=False)
        # NOTE(review): 5 looks like DOQ_UNSPECIFIED_ERROR from RFC 9250 -- confirm.
        self.assertEqual(caught.exception.error, 5)

2aaf9ecd
CHB
class QUICACLTests(object):
    """
    DNS-over-QUIC test case for queries rejected by an ACL.

    Relies on ``self.sendQUICQuery`` and the ``assert*`` helpers, which are not
    defined here; presumably supplied by the ``unittest.TestCase`` subclass
    this class is mixed into -- confirm. The ACL itself is configured outside
    this class.
    """

    def testDropped(self):
        """
        QUIC: Dropped query because of ACL

        The query must end with a StreamResetError whose ``error`` is 5.
        """
        name = 'acl.doq.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # assertRaises fails the test if no StreamResetError is raised, which
        # covers both the old assertTrue(False) and the old 'dropped' flag.
        with self.assertRaises(StreamResetError) as caught:
            self.sendQUICQuery(query, response=None, useQueue=False)
        # NOTE(review): 5 looks like DOQ_UNSPECIFIED_ERROR from RFC 9250 -- confirm.
        self.assertEqual(caught.exception.error, 5)

ac70190e
RG
class QUICWithCacheTests(object):
    """
    DNS-over-QUIC test case exercising cached responses.

    Relies on ``self.sendQUICQuery``, ``self._responsesCounter`` and the
    ``assert*`` helpers, which are not defined here; presumably supplied by
    the ``unittest.TestCase`` subclass this class is mixed into -- confirm.
    """

    def testCached(self):
        """
        QUIC Cache: Served from cache

        One exchange is done with a prepared response, then the same query is
        repeated ten times without one. Every repeat must return the same
        response, and the values in ``self._responsesCounter`` must add up to
        exactly 1.
        """
        repeatCount = 10
        qname = 'cached.quic.tests.powerdns.com.'
        question = dns.message.make_query(qname, 'AAAA', 'IN')
        question.id = 0
        answer = dns.message.make_response(question)
        answer.answer.append(
            dns.rrset.from_text(qname, 3600, dns.rdataclass.IN, dns.rdatatype.AAAA, '::1')
        )

        # Initial exchange with a prepared response, meant to fill the cache.
        (seenQuery, firstResponse) = self.sendQUICQuery(question, response=answer)
        self.assertTrue(seenQuery)
        self.assertTrue(firstResponse)
        seenQuery.id = question.id
        self.assertEqual(question, seenQuery)
        self.assertEqual(firstResponse, answer)

        # Repeat the query with no prepared response and without the queue.
        for _attempt in range(repeatCount):
            (_unused, repeatedResponse) = self.sendQUICQuery(question, response=None, useQueue=False)
            self.assertEqual(repeatedResponse, answer)

        # NOTE(review): _responsesCounter presumably counts responses sent by
        # the test backend -- confirm where it is defined.
        total = sum(self._responsesCounter[key] for key in self._responsesCounter)
        self.assertEqual(total, 1)