]>
git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.recursor-dnssec/test_EDNSBufferSize.py
8 from recursortests
import RecursorTest
9 from twisted
.internet
.protocol
import DatagramProtocol
10 from twisted
.internet
import reactor
# Module-wide flag: becomes True once the UDP responder has been registered
# with the Twisted reactor, so that later test classes do not register it
# a second time.
ednsBufferReactorRunning = False
class EDNSBufferTest(RecursorTest):
    """
    The tests derived from this one test several truncation related issues.
    As an overview, this is what can be tested:

      udp-truncation-threshold        edns-outgoing-bufsize
                  |                             |
    +------+      v      +----------+           v           +------------+
    | stub | <=========> | recursor | <===================> | responders |
    +------+      ^      +----------+                       +------------+
                  |
      client bufsize (stub => recursor)
      bufsize to client (recursor => stub)

    The subclasses will test the following scenarios:

    test | udp-trunc | edns-outgoing | client bufsize | response size   | result to client | bufsize to client |
    -----+-----------+---------------+----------------+-----------------+------------------+-------------------+
     01  |      1680 |          1680 |           4096 | 1680 (inc EDNS) | 1680 (inc EDNS)  |              1680 |
     02  |      1680 |          1680 |           1679 | 1680 (inc EDNS) | TC (+EDNS)       |              1680 |
     03  |      1680 |          1681 |           4096 | 1681 (inc EDNS) | TC (+EDNS)       |              1680 |
     04  |      1680 |          1679 |           4096 | 1679 (inc EDNS) | 1679 (inc EDNS)  |              1680 |
     05  |      1680 |          1680 |           1680 | 1680 (inc EDNS) | 1680 (inc EDNS)  |              1680 |
     06  |      1680 |          1680 |  512 (No EDNS) | 512 (+EDNS)     | 512 (no EDNS)    |         (no EDNS) |
     07  |      1680 |          1680 |  512 (No EDNS) | 513 (+EDNS)     | TC (no EDNS)     |         (no EDNS) |
     08  |      1680 |          1680 |            511 | 501 (+EDNS)     | 512 (inc EDNS)   |              1680 |

    The qname is $testnum.edns-tests.example.
    """
    # NOTE(review): the "bufsize to client" column above says 1680, but the
    # concrete test classes in this file pass 512 to checkEDNS() -- confirm
    # which of the two is the intended value.

    _confdir = 'EDNSBuffer'
    _udpTruncationThreshold = 1680
    _ednsOutgoingBufsize = 1680
    _qnameSuffix = '.edns-tests.example.'

    # NOTE(review): this block was reconstructed from a mangled extraction in
    # which the original line between the opening quotes and the
    # "forward-zones" line was lost. It may have held one more recursor
    # setting -- restore it from the upstream file if so.
    _config_template = """
forward-zones=edns-tests.example=%s.22
udp-truncation-threshold=%d
edns-outgoing-bufsize=%d
    """ % (os.environ['PREFIX'], _udpTruncationThreshold, _ednsOutgoingBufsize)

    @classmethod
    def startResponders(cls):
        """
        Register the UDPLargeResponder on <PREFIX>.22 (once per process) and
        make sure the Twisted reactor is running in a background thread.
        """
        global ednsBufferReactorRunning
        print("Launching responders..")

        address = cls._PREFIX + '.22'
        # NOTE(review): reconstructed -- the assignment of `port` was lost in
        # extraction. 53 is assumed because the forward-zones entry above
        # names the address without an explicit port.
        port = 53

        if not ednsBufferReactorRunning:
            reactor.listenUDP(port, UDPLargeResponder(), interface=address)
            ednsBufferReactorRunning = True

        if not reactor.running:
            # The positional False is handed to reactor.run(); presumably it
            # disables signal-handler installation, which is required when
            # the reactor runs outside the main thread.
            cls._UDPResponder = threading.Thread(
                name='UDP Responder', target=reactor.run, args=(False,))
            # Thread.setDaemon() is deprecated; assign the attribute instead.
            cls._UDPResponder.daemon = True
            cls._UDPResponder.start()

    def getMessage(self, testnum, payload=0):
        """
        Build a TXT/IN query for <testnum> + _qnameSuffix.

        EDNS is only enabled when payload is greater than zero, in which case
        payload is the advertised EDNS buffer size.
        """
        do_edns = payload > 0
        return dns.message.make_query(testnum + self._qnameSuffix, 'TXT', 'IN',
                                      use_edns=do_edns, payload=payload)

    def checkResponseContent(self, rawResponse, value, size, txt_final):
        """
        Tests the rawResponse (bytes that represent the DNS packet) has size
        number of bytes, that the rcode is NOERROR with the QR, RD and RA
        flags set, that every answer RRset is of type TXT, and that every TXT
        string consists solely of the character value and is either 255 or
        txt_final characters long.
        """
        response = dns.message.from_wire(rawResponse)

        self.assertEqual(len(rawResponse), size)
        self.assertRcodeEqual(response, dns.rcode.NOERROR)

        self.assertMessageHasFlags(response, ['QR', 'RD', 'RA'])

        for record in response.answer:
            self.assertEqual(record.rdtype, dns.rdatatype.TXT)
            for part in record:
                for string in part.strings:
                    self.assertIn(len(string), (255, txt_final))
                    # TXT strings may be bytes or str depending on the
                    # dnspython version; normalise before comparing.
                    if isinstance(string, bytes):
                        text = string.decode('ascii')
                    else:
                        text = string
                    self.assertEqual(text, value * len(text))

    def checkTruncatedResponse(self, message):
        """Assert that message carries the QR, RD, RA and TC flags."""
        self.assertMessageHasFlags(message, ['QR', 'RD', 'RA', 'TC'])

    def checkEDNS(self, message, bufsize=0):
        """
        Checks that the DNSMessage message has EDNS if bufsize > 0 and that
        the buffer size is correct. With bufsize == 0 the message must not
        have EDNS at all.
        """
        if bufsize > 0:
            self.assertEqual(message.edns, 0)
            self.assertEqual(message.payload, bufsize)
        else:
            # dnspython reports -1 for a message without an OPT record.
            self.assertEqual(message.edns, -1)
class EDNSBufferTest16801680(EDNSBufferTest):
    """
    Runs test cases 1, 2, 5, 6, 7, 8

    Uses the configuration inherited from EDNSBufferTest
    (udp-truncation-threshold and edns-outgoing-bufsize both 1680).
    """

    def testEdnsBufferTestCase01(self):
        """Client bufsize 4096: the full 1680-byte answer must arrive."""
        query = self.getMessage('01', 4096)

        raw = self.sendUDPQuery(query, decode=False)
        self.checkResponseContent(raw, 'A',
                                  self._udpTruncationThreshold, 9)
        message = dns.message.from_wire(raw)
        self.checkEDNS(message, 512)

    def testEdnsBufferTestCase02(self):
        """Client bufsize 1679: the answer does not fit, expect TC."""
        query = self.getMessage('02', 1679)

        message = self.sendUDPQuery(query)
        self.checkTruncatedResponse(message)
        self.checkEDNS(message, 512)

    def testEdnsBufferTestCase05(self):
        """Client bufsize 1680: the 1680-byte answer fits exactly."""
        query = self.getMessage('05', 1680)

        raw = self.sendUDPQuery(query, decode=False)
        self.checkResponseContent(raw, 'E',
                                  self._udpTruncationThreshold, 9)
        message = dns.message.from_wire(raw)
        self.checkEDNS(message, 512)

    def testEdnsBufferTestCase06(self):
        """No EDNS from the client: a 512-byte answer without EDNS."""
        query = self.getMessage('06', 0)

        raw = self.sendUDPQuery(query, decode=False)
        self.checkResponseContent(raw, 'F', 512, 192)
        message = dns.message.from_wire(raw)
        self.checkEDNS(message, 0)

    def testEdnsBufferTestCase07(self):
        """No EDNS from the client and a 513-byte answer: expect TC."""
        query = self.getMessage('07', 0)

        message = self.sendUDPQuery(query)
        self.checkTruncatedResponse(message)
        self.checkEDNS(message, 0)

    def testEdnsBufferTestCase08(self):
        """Client bufsize 511: a 512-byte answer including EDNS is expected."""
        query = self.getMessage('08', 511)

        raw = self.sendUDPQuery(query, decode=False)
        self.checkResponseContent(raw, 'H', 512, 181)
        message = dns.message.from_wire(raw)
        self.checkEDNS(message, 512)
class EDNSBufferTest16801681(EDNSBufferTest):
    """
    Runs test case 3

    The recursor advertises an outgoing buffer size (1681) that is one byte
    larger than its udp-truncation-threshold (1680).
    """

    _confdir = 'EDNSBuffer16801681'
    _udpTruncationThreshold = 1680
    _ednsOutgoingBufsize = 1681
    _qnameSuffix = '.edns-tests.example.'

    _config_template = """
qname-minimization=no
forward-zones=edns-tests.example=%s.22
udp-truncation-threshold=%d
edns-outgoing-bufsize=%d
    """ % (os.environ['PREFIX'], _udpTruncationThreshold, _ednsOutgoingBufsize)

    def testEdnsBufferTestCase03(self):
        """A 1681-byte upstream answer exceeds the threshold: expect TC."""
        query = self.getMessage('03', 4096)

        message = self.sendUDPQuery(query)
        self.checkTruncatedResponse(message)
        self.checkEDNS(message, 512)
class EDNSBufferTest16801679(EDNSBufferTest):
    """
    Runs test case 4

    The recursor advertises an outgoing buffer size (1679) that is one byte
    smaller than its udp-truncation-threshold (1680).
    """

    _confdir = 'EDNSBuffer16801679'
    _udpTruncationThreshold = 1680
    _ednsOutgoingBufsize = 1679
    _qnameSuffix = '.edns-tests.example.'

    _config_template = """
qname-minimization=no
forward-zones=edns-tests.example=%s.22
udp-truncation-threshold=%d
edns-outgoing-bufsize=%d
    """ % (os.environ['PREFIX'], _udpTruncationThreshold, _ednsOutgoingBufsize)

    def testEdnsBufferTestCase04(self):
        """A 1679-byte upstream answer must be relayed in full."""
        query = self.getMessage('04', 4096)

        raw = self.sendUDPQuery(query, decode=False)
        self.checkResponseContent(raw, 'D',
                                  self._ednsOutgoingBufsize, 8)
        message = dns.message.from_wire(raw)
        self.checkEDNS(message, 512)
class UDPLargeResponder(DatagramProtocol):
    """
    Fake authoritative server that answers every query with TXT records
    padded so that the whole response has a chosen size on the wire.

    NOTE(review): this class was reconstructed from a mangled extraction in
    which several statements were lost. The lost statements were restored
    from the surviving comments; the overhead constants were cross-checked
    against the final-TXT lengths the tests in this file expect
    (9, 8, 192 and 181).
    """

    # Response sizes (including the 11-byte OPT RR) for the tests that do not
    # simply fill the EDNS buffer size advertised in the query.
    _SPECIAL_SIZES = {
        6: 512 + 11,
        7: 513 + 11,
        8: 501 + 11,
    }

    def datagramReceived(self, datagram, address):
        """
        Answer the query in datagram, sending the response back to address.

        The test number is the first label of the qname; it selects both the
        response size and the fill character of the TXT records.
        """
        request = dns.message.from_wire(datagram)

        testnum = int(str(request.question[0].name).split('.')[0])

        # The outgoing packet should be EDNS buffersize bytes
        # Unless we have special tests
        packet_size = self._SPECIAL_SIZES.get(testnum, request.payload)

        # An EDNS(0) RR without options is 11 bytes:
        # NAME:  1
        # TYPE:  2
        # CLASS: 2
        # TTL:   4
        # RDLEN: 2
        # RDATA: 0
        packet_size -= 11

        # But the header also counts, which is 12 bytes
        packet_size -= 12

        # The packet has a question section: a 23-byte QNAME
        # ("NN.edns-tests.example.") plus 2 bytes QTYPE and 2 bytes QCLASS
        packet_size -= 27

        response = dns.message.make_response(request)
        # This is an authoritative answer
        response.flags |= dns.flags.AA
        # We pretend to do EDNS with a 4096 buffer size
        # NOTE(review): the statement enabling EDNS was lost in extraction;
        # `response.edns = 0` is assumed -- confirm against upstream.
        response.edns = 0
        response.payload = 4096

        # What we use to fill the TXT records
        # Test number + 64, so 01 = 'A', 02 = 'B' etc...
        value = chr(testnum + 64)

        # Each pre-RDATA answer RR is 12 bytes
        # NAME:  2 (ptr to begin of packet, 0xC00C)
        # TYPE:  2
        # CLASS: 2
        # TTL:   4
        # RDLEN: 2
        while packet_size > 0:
            # Remove the pre-RDATA length
            packet_size -= 12
            # And the TXT size indicator (first byte in the TXT record)
            packet_size -= 1
            txt_size = min(packet_size, 255)
            answer = dns.rrset.from_text(request.question[0].name,
                                         0, dns.rdataclass.IN, 'TXT',
                                         value * txt_size)

            response.answer.append(answer)
            packet_size -= txt_size

        # Sanity check of the bookkeeping above: the budget must be used up
        # exactly, otherwise the response does not have the intended size.
        assert packet_size == 0

        self.transport.write(response.to_wire(max_size=65535), address)