]> git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.recursor-dnssec/test_EDNSBufferSize.py
Make sure we can install unsigned packages.
[thirdparty/pdns.git] / regression-tests.recursor-dnssec / test_EDNSBufferSize.py
1 import dns
2 import os
3 import socket
4 import struct
5 import threading
6 import time
7
8 from recursortests import RecursorTest
9 from twisted.internet.protocol import DatagramProtocol
10 from twisted.internet import reactor
11
# Module-wide flag: set to True by EDNSBufferTest.startResponders() once the
# UDP responder has been registered with the Twisted reactor, so that later
# calls do not try to bind the same address and port a second time.
ednsBufferReactorRunning = False
13
class EDNSBufferTest(RecursorTest):
    """
    The tests derived from this one test several truncation related issues.
    As an overview, this is what can be tested:

           udp-truncation-threshold             edns-outgoing-bufsize
                       |                                  |
    +------+           v           +----------+           v           +------------+
    | stub | <===================> | recursor | <===================> | responders |
    +------+                       +----------+                       +------------+
                       ^
                       |
    client bufsize (stub => recursor)
    bufsize to client (recursor => stub)

    The subclasses will test the following scenarios:

    test | udp-trunc | edns-outgoing | client bufsize | response size   | result to client | bufsize to client |
    -----+-----------+---------------+----------------+-----------------+------------------+-------------------+
     01  |      1680 |          1680 |           4096 | 1680 (inc EDNS) | 1680 (inc EDNS)  |               512 |
     02  |      1680 |          1680 |           1679 | 1680 (inc EDNS) | TC (+EDNS)       |               512 |
     03  |      1680 |          1681 |           4096 | 1681 (inc EDNS) | TC (+EDNS)       |               512 |
     04  |      1680 |          1679 |           4096 | 1679 (inc EDNS) | 1679 (inc EDNS)  |               512 |
     05  |      1680 |          1680 |           1680 | 1680 (inc EDNS) | 1680 (inc EDNS)  |               512 |
     06  |      1680 |          1680 |  512 (No EDNS) | 512 (+EDNS)     | 512 (no EDNS)    |         (no EDNS) |
     07  |      1680 |          1680 |  512 (No EDNS) | 513 (+EDNS)     | TC (no EDNS)     |         (no EDNS) |
     08  |      1680 |          1680 |            511 | 501 (+EDNS)     | 512 (inc EDNS)   |               512 |

    The "bufsize to client" column lists the values the test cases in this
    file pass to checkEDNS().

    The qname is $testnum.edns-tests.example.
    """
    _confdir = 'EDNSBuffer'
    _udpTruncationThreshold = 1680
    _ednsOutgoingBufsize = 1680
    _qnameSuffix = '.edns-tests.example.'

    # The template is filled in while the class body is executed, so a
    # subclass that wants other sizes has to repeat the whole template.
    _config_template = """
qname-minimization=no
forward-zones=edns-tests.example=%s.22
udp-truncation-threshold=%d
edns-outgoing-bufsize=%d
""" % (os.environ['PREFIX'], _udpTruncationThreshold, _ednsOutgoingBufsize)

    @classmethod
    def startResponders(cls):
        """
        Register the UDPLargeResponder on <prefix>.22, port 53, and make
        sure the Twisted reactor is running in a background thread.

        The listener is registered only once per process (tracked by the
        module-level ednsBufferReactorRunning flag) and the reactor thread
        is only started when the reactor is not running yet.
        """
        global ednsBufferReactorRunning
        print("Launching responders..")

        address = cls._PREFIX + '.22'
        port = 53

        if not ednsBufferReactorRunning:
            reactor.listenUDP(port, UDPLargeResponder(), interface=address)
            ednsBufferReactorRunning = True

        if not reactor.running:
            # The positional False is reactor.run()'s installSignalHandlers
            # argument: signal handlers can only be installed from the main
            # thread, and the reactor runs in a helper thread here.
            cls._UDPResponder = threading.Thread(
                name='UDP Responder', target=reactor.run, args=(False,))
            # Thread.setDaemon() is deprecated; set the attribute instead.
            cls._UDPResponder.daemon = True
            cls._UDPResponder.start()

    def getMessage(self, testnum, payload=0):
        """
        Build the TXT query for test case testnum (a string such as '01').

        EDNS is only enabled when payload is greater than zero, in which
        case payload is the advertised EDNS buffer size.
        """
        do_edns = payload > 0
        return dns.message.make_query(testnum + self._qnameSuffix, 'TXT', 'IN',
                                      use_edns=do_edns, payload=payload)

    def checkResponseContent(self, rawResponse, value, size, txt_final):
        """
        Check a complete (non-truncated) answer.

        rawResponse: the bytes that represent the DNS packet
        value:       the single character every TXT string is filled with
        size:        the exact number of bytes rawResponse must have
        txt_final:   the only allowed length for a TXT string that is
                     shorter than the maximum of 255 bytes

        The response must be NOERROR with the QR, RD and RA flags, and the
        answer section may only hold TXT records.
        """
        response = dns.message.from_wire(rawResponse)

        self.assertEqual(len(rawResponse), size)
        self.assertRcodeEqual(response, dns.rcode.NOERROR)

        self.assertMessageHasFlags(response, ['QR', 'RD', 'RA'])

        for record in response.answer:
            self.assertEqual(record.rdtype, dns.rdatatype.TXT)
            for part in record:
                for string in part.strings:
                    self.assertIn(len(string), (255, txt_final))
                    # TXT strings are bytes on Python 3; compare against a
                    # filler of the same type as the string itself.
                    if isinstance(string, str):
                        fill = value
                    else:
                        fill = value.encode('ascii')
                    self.assertEqual(string, fill * len(string))

    def checkTruncatedResponse(self, message):
        """
        Check that message carries the TC flag next to QR, RD and RA.
        """
        self.assertMessageHasFlags(message, ['QR', 'RD', 'RA', 'TC'])

    def checkEDNS(self, message, bufsize=0):
        """
        Checks that the DNSMessage message has EDNS if bufsize > 0 and that
        the buffer size is correct. With bufsize 0 (or less) the message
        must not have EDNS at all.
        """
        if bufsize > 0:
            self.assertEqual(message.edns, 0)
            self.assertEqual(message.payload, bufsize)
        else:
            self.assertEqual(message.edns, -1)
112
113
class EDNSBufferTest16801680(EDNSBufferTest):
    """
    Runs test cases 1, 2, 5, 6, 7, 8

    These all use the configuration inherited from EDNSBufferTest
    (udp-truncation-threshold and edns-outgoing-bufsize both 1680).
    """

    def _expectAnswer(self, testnum, clientBufsize, fill, size, txtFinal,
                      replyBufsize):
        # Ask the same question ten times; every reply must be a complete
        # answer of exactly `size` bytes whose EDNS state matches
        # `replyBufsize` (0 meaning "no EDNS in the reply").
        question = self.getMessage(testnum, clientBufsize)
        for _ in range(10):
            wire = self.sendUDPQuery(question, decode=False)
            self.checkResponseContent(wire, fill, size, txtFinal)
            self.checkEDNS(dns.message.from_wire(wire), replyBufsize)

    def _expectTruncation(self, testnum, clientBufsize, replyBufsize):
        # Ask the same question ten times; every reply must have TC set and
        # an EDNS state matching `replyBufsize` (0 meaning "no EDNS").
        question = self.getMessage(testnum, clientBufsize)
        for _ in range(10):
            reply = self.sendUDPQuery(question)
            self.checkTruncatedResponse(reply)
            self.checkEDNS(reply, replyBufsize)

    def testEdnsBufferTestCase01(self):
        # Client bufsize 4096: a full answer of threshold size is expected.
        self._expectAnswer('01', 4096, 'A', self._udpTruncationThreshold, 9,
                           512)

    def testEdnsBufferTestCase02(self):
        # Client bufsize 1679: a truncated reply is expected.
        self._expectTruncation('02', 1679, 512)

    def testEdnsBufferTestCase05(self):
        # Client bufsize 1680: a full answer of threshold size is expected.
        self._expectAnswer('05', 1680, 'E', self._udpTruncationThreshold, 9,
                           512)

    def testEdnsBufferTestCase06(self):
        # No EDNS in the query: a 512 byte answer without EDNS is expected.
        self._expectAnswer('06', 0, 'F', 512, 192, 0)

    def testEdnsBufferTestCase07(self):
        # No EDNS in the query: a truncated reply without EDNS is expected.
        self._expectTruncation('07', 0, 0)

    def testEdnsBufferTestCase08(self):
        # Client bufsize 511: a 512 byte answer with EDNS is expected.
        self._expectAnswer('08', 511, 'H', 512, 181, 512)
166
class EDNSBufferTest16801681(EDNSBufferTest):
    """
    Runs test case 3

    Same as the parent, but with edns-outgoing-bufsize one byte larger than
    udp-truncation-threshold.
    """
    _confdir = 'EDNSBuffer16801681'
    _udpTruncationThreshold = 1680
    _ednsOutgoingBufsize = 1681
    _qnameSuffix = '.edns-tests.example.'

    # Formatted while the class body runs, hence the local copies of the
    # two sizes above.
    _config_template = """
qname-minimization=no
forward-zones=edns-tests.example=%s.22
udp-truncation-threshold=%d
edns-outgoing-bufsize=%d
""" % (
        os.environ['PREFIX'],
        _udpTruncationThreshold,
        _ednsOutgoingBufsize,
    )

    def testEdnsBufferTestCase03(self):
        # Client bufsize 4096: ten times in a row the reply must be
        # truncated and carry EDNS with a buffer size of 512.
        question = self.getMessage('03', 4096)
        for _ in range(10):
            reply = self.sendUDPQuery(question)
            self.checkTruncatedResponse(reply)
            self.checkEDNS(reply, 512)
189
190
class EDNSBufferTest16801679(EDNSBufferTest):
    """
    Runs test case 4

    Same as the parent, but with edns-outgoing-bufsize one byte smaller
    than udp-truncation-threshold.
    """
    _confdir = 'EDNSBuffer16801679'
    _udpTruncationThreshold = 1680
    _ednsOutgoingBufsize = 1679
    _qnameSuffix = '.edns-tests.example.'

    # Formatted while the class body runs, hence the local copies of the
    # two sizes above.
    _config_template = """
qname-minimization=no
forward-zones=edns-tests.example=%s.22
udp-truncation-threshold=%d
edns-outgoing-bufsize=%d
""" % (
        os.environ['PREFIX'],
        _udpTruncationThreshold,
        _ednsOutgoingBufsize,
    )

    def testEdnsBufferTestCase04(self):
        # Client bufsize 4096: ten times in a row the reply must be a
        # complete answer of edns-outgoing-bufsize bytes, filled with 'D',
        # carrying EDNS with a buffer size of 512.
        question = self.getMessage('04', 4096)
        for _ in range(10):
            wire = self.sendUDPQuery(question, decode=False)
            self.checkResponseContent(wire, 'D', self._ednsOutgoingBufsize, 8)
            reply = dns.message.from_wire(wire)
            self.checkEDNS(reply, 512)
215
216
class UDPLargeResponder(DatagramProtocol):
    """
    Responder that answers every query with a packet of a precise size,
    made up of TXT records filled with a single repeated character.

    The first label of the qname must be the test number. By default the
    response is exactly as large as the EDNS buffer size advertised in the
    query; a few test numbers force another size (see _FORCED_SIZES).
    """

    # An EDNS(0) RR without options is 11 bytes:
    #  NAME:  1
    #  TYPE:  2
    #  CLASS: 2
    #  TTL:   4
    #  RDLEN: 2
    #  RDATA: 0
    _EDNS_RR_SIZE = 11

    # The DNS header
    _HEADER_SIZE = 12

    # The question section, for a qname shaped like
    # "NN.edns-tests.example." (23 bytes) plus QTYPE and QCLASS
    _QUESTION_SIZE = 27

    # Each pre-RDATA answer RR is 12 bytes
    #  NAME:  2 (ptr to begin of packet, 0xC00C)
    #  TYPE:  2
    #  CLASS: 2
    #  TTL:   4
    #  RDLEN: 2
    _ANSWER_RR_OVERHEAD = 12

    # Test number => response size *excluding* the EDNS RR, for the tests
    # that do not simply fill the advertised buffer.
    _FORCED_SIZES = {6: 512, 7: 513, 8: 501}

    def datagramReceived(self, datagram, address):
        """
        Answer the query in datagram, sending the response back to address.

        Raises AssertionError when the requested size cannot be hit exactly
        with whole TXT records.
        """
        request = dns.message.from_wire(datagram)
        qname = request.question[0].name
        testnum = int(str(qname).split('.')[0])

        if testnum in self._FORCED_SIZES:
            total_size = self._FORCED_SIZES[testnum] + self._EDNS_RR_SIZE
        else:
            # The outgoing packet should be EDNS buffersize bytes
            total_size = request.payload

        # What is left for the answer section
        packet_size = (total_size - self._EDNS_RR_SIZE - self._HEADER_SIZE
                       - self._QUESTION_SIZE)

        # Make the response
        response = dns.message.make_response(request)
        # This is an authoritative answer
        response.flags |= dns.flags.AA
        # We pretend to do EDNS with a 4096 buffer size. use_edns() is used
        # because Message.edns and Message.payload cannot be assigned to in
        # dnspython 2.x.
        response.use_edns(0, payload=4096)

        # What we use to fill the TXT records
        # Test number + 64, so 01 = 'A', 02 = 'B' etc...
        value = chr(testnum + 64)

        while packet_size > 0:
            # Remove the pre-RDATA length and the TXT size indicator
            # (first byte in the TXT record)
            packet_size -= self._ANSWER_RR_OVERHEAD + 1
            txt_size = min(packet_size, 255)
            answer = dns.rrset.from_text(qname, 0, dns.rdataclass.IN, 'TXT',
                                         value * txt_size)

            response.answer.append(answer)
            packet_size -= txt_size

        # The records must add up to exactly the requested size
        assert packet_size == 0

        self.transport.write(response.to_wire(max_size=65535), address)