]> git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.recursor-dnssec/test_EDNSBufferSize.py
SuffixMatchTree: Fix the removal of the root
[thirdparty/pdns.git] / regression-tests.recursor-dnssec / test_EDNSBufferSize.py
1 import dns
2 import os
3 import socket
4 import struct
5 import threading
6 import time
7
8 from recursortests import RecursorTest
9 from twisted.internet.protocol import DatagramProtocol
10 from twisted.internet import reactor
11
# Set once the large-response UDP responder has been registered with the
# twisted reactor, so that startResponders() does not bind it a second time.
ednsBufferReactorRunning = False


class EDNSBufferTest(RecursorTest):
    """
    The tests derived from this one test several truncation related issues.
    As an overview, this is what can be tested:

                udp-truncation-threshold       edns-outgoing-bufsize
                          |                              |
    +------+              v        +----------+          v          +------------+
    | stub | <===================> | recursor | <=================> | responders |
    +------+                       +----------+                     +------------+
        ^
        |
      client bufsize (stub => recursor)
      bufsize to client (recursor => stub)

    The subclasses will test the following scenarios:

    test | udp-trunc | edns-outgoing | client bufsize | response size   | result to client | bufsize to client |
    -----+-----------+---------------+----------------+-----------------+------------------+-------------------+
     01  | 1680      | 1680          | 4096           | 1680 (inc EDNS) | 1680 (inc EDNS)  | 1680              |
     02  | 1680      | 1680          | 1679           | 1680 (inc EDNS) | TC (+EDNS)       | 1680              |
     03  | 1680      | 1681          | 4096           | 1681 (inc EDNS) | TC (+EDNS)       | 1680              |
     04  | 1680      | 1679          | 4096           | 1679 (inc EDNS) | 1679 (inc EDNS)  | 1680              |
     05  | 1680      | 1680          | 1680           | 1680 (inc EDNS) | 1680 (inc EDNS)  | 1680              |
     06  | 1680      | 1680          | 512 (No EDNS)  | 512 (+EDNS)     | 512 (no EDNS)    | (no EDNS)         |
     07  | 1680      | 1680          | 512 (No EDNS)  | 513 (+EDNS)     | TC (no EDNS)     | (no EDNS)         |
     08  | 1680      | 1680          | 511            | 501 (+EDNS)     | 512 (inc EDNS)   | 1680              |

    The qname is $testnum.edns-tests.example.

    NOTE(review): the subclasses in this file pass 512 to checkEDNS() for
    the EDNS cases, whereas the "bufsize to client" column lists 1680.
    Confirm which of the two is intended.
    """
    _confdir = 'EDNSBuffer'
    _udpTruncationThreshold = 1680
    _ednsOutgoingBufsize = 1680
    _qnameSuffix = '.edns-tests.example.'

    # Evaluated at class-creation time: raises KeyError when the PREFIX
    # environment variable is not set.
    _config_template = """
forward-zones=edns-tests.example=%s.22
udp-truncation-threshold=%d
edns-outgoing-bufsize=%d
""" % (os.environ['PREFIX'], _udpTruncationThreshold, _ednsOutgoingBufsize)

    @classmethod
    def startResponders(cls):
        """
        Register the UDPLargeResponder on <PREFIX>.22, port 53, and make sure
        the twisted reactor is running in a background thread.

        The listener is only registered the first time this is called in a
        process (tracked by the module-level ednsBufferReactorRunning flag);
        the reactor thread is only started when the reactor is not running.
        """
        global ednsBufferReactorRunning
        print("Launching responders..")

        address = cls._PREFIX + '.22'
        port = 53

        if not ednsBufferReactorRunning:
            reactor.listenUDP(port, UDPLargeResponder(), interface=address)
            ednsBufferReactorRunning = True

        if not reactor.running:
            # False is handed to reactor.run() positionally; presumably this
            # is twisted's installSignalHandlers switch, which has to be off
            # when the reactor is not run from the main thread.
            cls._UDPResponder = threading.Thread(
                name='UDP Responder', target=reactor.run, args=(False,))
            # The 'daemon' attribute replaces the deprecated setDaemon().
            cls._UDPResponder.daemon = True
            cls._UDPResponder.start()

    def getMessage(self, testnum, payload=0):
        """
        Build the TXT/IN query for test number testnum (a string such as
        '01'), which is prepended to _qnameSuffix to form the qname.

        EDNS is only enabled when payload is greater than 0, in which case
        payload is the advertised buffer size.
        """
        do_edns = payload > 0
        return dns.message.make_query(testnum + self._qnameSuffix, 'TXT', 'IN',
                                      use_edns=do_edns, payload=payload)

    def checkResponseContent(self, rawResponse, value, size, txt_final):
        """
        Tests the rawResponse (bytes that represent the DNS packet) has size
        number of bytes, is a NOERROR response with the QR, RD and RA flags,
        and that every answer record is a TXT record whose strings are
        either 255 or txt_final characters long.

        NOTE: value (the fill character the responder is expected to use) is
        accepted but is not compared against the TXT contents here.
        """
        response = dns.message.from_wire(rawResponse)

        # assertEqual, not the deprecated assertEquals alias, which was
        # removed from unittest in Python 3.12.
        self.assertEqual(len(rawResponse), size)
        self.assertRcodeEqual(response, dns.rcode.NOERROR)

        self.assertMessageHasFlags(response, ['QR', 'RD', 'RA'])

        for record in response.answer:
            self.assertEqual(record.rdtype, dns.rdatatype.TXT)
            for part in record:
                for string in part.strings:
                    self.assertTrue(len(string) == 255 or
                                    len(string) == txt_final)

    def checkTruncatedResponse(self, message):
        """
        Checks that message carries the QR, RD, RA and TC flags.
        """
        self.assertMessageHasFlags(message, ['QR', 'RD', 'RA', 'TC'])

    def checkEDNS(self, message, bufsize=0):
        """
        Checks that the DNSMessage message has EDNS if bufsize > 0 and that
        the buffer size is correct. With bufsize <= 0 the message must not
        have EDNS at all (message.edns == -1).
        """
        if bufsize > 0:
            self.assertEqual(message.edns, 0)
            self.assertEqual(message.payload, bufsize)
        else:
            self.assertEqual(message.edns, -1)
111
112
class EDNSBufferTest16801680(EDNSBufferTest):
    """
    Runs test cases 1, 2, 5, 6, 7, 8

    All of them use the configuration inherited from EDNSBufferTest and
    send the same query ten times in a row.
    """

    def _runAnswerCase(self, testnum, payload, fill, size, txt_final, bufsize):
        """
        Query ten times and require a complete answer each time.

        Each raw response is handed to checkResponseContent() with fill,
        size and txt_final, then decoded and handed to checkEDNS() with
        bufsize.
        """
        question = self.getMessage(testnum, payload)
        for _attempt in range(10):
            wire = self.sendUDPQuery(question, decode=False)
            self.checkResponseContent(wire, fill, size, txt_final)
            decoded = dns.message.from_wire(wire)
            self.checkEDNS(decoded, bufsize)

    def _runTruncatedCase(self, testnum, payload, bufsize):
        """
        Query ten times and require a truncated (TC) response each time,
        checking its EDNS state with checkEDNS() and bufsize.
        """
        question = self.getMessage(testnum, payload)
        for _attempt in range(10):
            reply = self.sendUDPQuery(question)
            self.checkTruncatedResponse(reply)
            self.checkEDNS(reply, bufsize)

    def testEdnsBufferTestCase01(self):
        """Client bufsize 4096: full answer of _udpTruncationThreshold bytes."""
        self._runAnswerCase('01', 4096, 'A',
                            self._udpTruncationThreshold, 9, 512)

    def testEdnsBufferTestCase02(self):
        """Client bufsize 1679: the response must be truncated."""
        self._runTruncatedCase('02', 1679, 512)

    def testEdnsBufferTestCase05(self):
        """Client bufsize 1680: full answer of _udpTruncationThreshold bytes."""
        self._runAnswerCase('05', 1680, 'E',
                            self._udpTruncationThreshold, 9, 512)

    def testEdnsBufferTestCase06(self):
        """Query without EDNS: 512 byte answer that carries no EDNS."""
        self._runAnswerCase('06', 0, 'F', 512, 192, 0)

    def testEdnsBufferTestCase07(self):
        """Query without EDNS: truncated response that carries no EDNS."""
        self._runTruncatedCase('07', 0, 0)

    def testEdnsBufferTestCase08(self):
        """Client bufsize 511: 512 byte answer with EDNS."""
        self._runAnswerCase('08', 511, 'H', 512, 181, 512)
165
class EDNSBufferTest16801681(EDNSBufferTest):
    """
    Runs test case 3

    Same setup as EDNSBufferTest, except that edns-outgoing-bufsize (1681)
    is one byte above udp-truncation-threshold (1680).
    """
    _confdir = 'EDNSBuffer16801681'
    _udpTruncationThreshold = 1680
    _ednsOutgoingBufsize = 1681
    _qnameSuffix = '.edns-tests.example.'

    # The two sizes are repeated above because a class body cannot see the
    # attributes of its base class by bare name.
    _config_template = """
forward-zones=edns-tests.example=%s.22
udp-truncation-threshold=%d
edns-outgoing-bufsize=%d
""" % (
        os.environ['PREFIX'],
        _udpTruncationThreshold,
        _ednsOutgoingBufsize,
    )

    def testEdnsBufferTestCase03(self):
        """
        Client bufsize 4096: each of the ten responses must be truncated
        and carry EDNS with a payload of 512.
        """
        question = self.getMessage('03', 4096)
        for _attempt in range(10):
            reply = self.sendUDPQuery(question)
            self.checkTruncatedResponse(reply)
            self.checkEDNS(reply, 512)
187
188
class EDNSBufferTest16801679(EDNSBufferTest):
    """
    Runs test case 4

    Same setup as EDNSBufferTest, except that edns-outgoing-bufsize (1679)
    is one byte below udp-truncation-threshold (1680).
    """
    _confdir = 'EDNSBuffer16801679'
    _udpTruncationThreshold = 1680
    _ednsOutgoingBufsize = 1679
    _qnameSuffix = '.edns-tests.example.'

    # The two sizes are repeated above because a class body cannot see the
    # attributes of its base class by bare name.
    _config_template = """
forward-zones=edns-tests.example=%s.22
udp-truncation-threshold=%d
edns-outgoing-bufsize=%d
""" % (
        os.environ['PREFIX'],
        _udpTruncationThreshold,
        _ednsOutgoingBufsize,
    )

    def testEdnsBufferTestCase04(self):
        """
        Client bufsize 4096: each of the ten responses must be a complete
        answer of _ednsOutgoingBufsize bytes whose last TXT string is 8
        characters long, carrying EDNS with a payload of 512.
        """
        question = self.getMessage('04', 4096)
        for _attempt in range(10):
            wire = self.sendUDPQuery(question, decode=False)
            self.checkResponseContent(wire, 'D',
                                      self._ednsOutgoingBufsize, 8)
            decoded = dns.message.from_wire(wire)
            self.checkEDNS(decoded, 512)
212
213
class UDPLargeResponder(DatagramProtocol):
    """
    UDP responder that answers every query with TXT records padded so that
    the response has a chosen size on the wire.
    """

    def datagramReceived(self, datagram, address):
        """
        Answer the DNS query in datagram with a response of a fixed size.

        The target size is the EDNS buffer size advertised in the request,
        except for test numbers 6, 7 and 8, which use fixed sizes. The test
        number is the first label of the qname and also selects the fill
        character of the TXT strings (1 => 'A', 2 => 'B', ...).

        The size bookkeeping assumes a 27 byte question section and that
        the name of every answer record is rendered as a 2 byte compression
        pointer.
        """
        request = dns.message.from_wire(datagram)

        testnum = int(str(request.question[0].name).split('.')[0])

        # The outgoing packet should be EDNS buffersize bytes, unless we
        # have special tests. Those sizes are written as "size without
        # EDNS" + 11, because the 11 bytes of the EDNS RR are subtracted
        # again below.
        special_sizes = {
            6: 512 + 11,
            7: 513 + 11,
            8: 501 + 11,
        }
        packet_size = special_sizes.get(testnum, request.payload)

        # An EDNS(0) RR without options is 11 bytes:
        # NAME:  1
        # TYPE:  2
        # CLASS: 2
        # TTL:   4
        # RDLEN: 2
        # RDATA: 0
        packet_size -= 11

        # But the header also counts, which is 12 bytes
        packet_size -= 12

        # The packet has a question section
        packet_size -= 27

        # Make the response
        response = dns.message.make_response(request)
        # This is an authoritative answer
        response.flags |= dns.flags.AA
        # We pretend to do EDNS with a 4096 buffer size. use_edns() is used
        # instead of assigning to response.edns / response.payload, which
        # are read-only properties in dnspython 2.x.
        response.use_edns(edns=0, payload=4096)

        # What we use to fill the TXT records
        # Test number + 64, so 01 = 'A', 02 = 'B' etc...
        value = chr(testnum + 64)

        # Each pre-RDATA answer RR is 12 bytes
        # NAME:  2 (ptr to begin of packet, 0xC00C)
        # TYPE:  2
        # CLASS: 2
        # TTL:   4
        # RDLEN: 2
        while packet_size > 0:
            # Remove the pre-RDATA length
            packet_size -= 12
            # And the TXT size indicator (first byte in the TXT record)
            packet_size -= 1
            txt_size = min(packet_size, 255)
            answer = dns.rrset.from_text(request.question[0].name,
                                         0, dns.rdataclass.IN, 'TXT',
                                         value * txt_size)

            response.answer.append(answer)
            packet_size -= txt_size

        # Every byte of the size budget must be accounted for.
        assert packet_size == 0

        self.transport.write(response.to_wire(max_size=65535), address)