]>
git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.recursor-dnssec/test_EDNSBufferSize.py
8 from recursortests
import RecursorTest
9 from twisted
.internet
.protocol
import DatagramProtocol
10 from twisted
.internet
import reactor
12 ednsBufferReactorRunning
= False
14 class EDNSBufferTest(RecursorTest
):
16 The tests derived from this one test several truncation related issues.
17 As an overview, this is what can be tested:
19 udp-truncation-threshold edns-outgoing-bufsize
21 +------+ v +----------+ v +------------+
22 | stub | <=========> | recursor | <===================> | responders |
23 +------+ +----------+ +------------+
26 client bufsize (stub => recursor)
27 bufsize to client (recursor => stub)
29 The subclasses will test the following scenarios:
31 test | udp-trunc | edns-outgoing | client bufsize | response size | result to client | bufsize to client |
32 -----+-----------+---------------+----------------+-----------------+------------------+-------------------+
33 01 | 1680 | 1680 | 4096 | 1680 (inc EDNS) | 1680 (inc EDNS) | 1680 |
34 02 | 1680 | 1680 | 1679 | 1680 (inc EDNS) | TC (+EDNS) | 1680 |
35 03 | 1680 | 1681 | 4096 | 1681 (inc EDNS) | TC (+EDNS) | 1680 |
36 04 | 1680 | 1679 | 4096 | 1679 (inc EDNS) | 1679 (inc EDNS) | 1680 |
37 05 | 1680 | 1680 | 1680 | 1680 (inc EDNS) | 1680 (inc EDNS) | 1680 |
38 06 | 1680 | 1680 | 512 (No EDNS) | 512 (+EDNS) | 512 (no EDNS) | (no EDNS) |
39 07 | 1680 | 1680 | 512 (No EDNS) | 513 (+EDNS) | TC (no EDNS) | (no EDNS) |
40 08 | 1680 | 1680 | 511 | 501 (+EDNS) | 512 (inc EDNS) | 1680 |
42 The qname is $testnum.edns-tests.example.
44 _confdir
= 'EDNSBuffer'
45 _udpTruncationThreshold
= 1680
46 _ednsOutgoingBufsize
= 1680
47 _qnameSuffix
= '.edns-tests.example.'
49 _config_template
= """
50 forward-zones=edns-tests.example=%s.22
51 udp-truncation-threshold=%d
52 edns-outgoing-bufsize=%d
53 """ % (os
.environ
['PREFIX'], _udpTruncationThreshold
, _ednsOutgoingBufsize
)
56 def startResponders(cls
):
57 global ednsBufferReactorRunning
58 print("Launching responders..")
60 address
= cls
._PREFIX
+ '.22'
63 if not ednsBufferReactorRunning
:
64 reactor
.listenUDP(port
, UDPLargeResponder(), interface
=address
)
65 ednsBufferReactorRunning
= True
67 if not reactor
.running
:
68 cls
._UDPResponder
= threading
.Thread(
69 name
='UDP Responder', target
=reactor
.run
, args
=(False,))
70 cls
._UDPResponder
.setDaemon(True)
71 cls
._UDPResponder
.start()
73 def getMessage(self
, testnum
, payload
=0):
75 return dns
.message
.make_query(testnum
+ self
._qnameSuffix
, 'TXT', 'IN',
76 use_edns
=do_edns
, payload
=payload
)
78 def checkResponseContent(self
, rawResponse
, value
, size
, txt_final
):
80 Tests the rawResponse (bytes that represent the DNS packet) has size
81 number of bytes. And that the content of all TXT records is of value
82 and has total_txt_val characters.
84 response
= dns
.message
.from_wire(rawResponse
)
86 self
.assertEquals(len(rawResponse
), size
)
87 self
.assertRcodeEqual(response
, dns
.rcode
.NOERROR
)
89 self
.assertMessageHasFlags(response
, ['QR', 'RD', 'RA'])
91 for record
in response
.answer
:
92 self
.assertEquals(record
.rdtype
, dns
.rdatatype
.TXT
)
94 for string
in part
.strings
:
95 self
.assertTrue(len(string
) == 255 or
96 len(string
) == txt_final
)
98 def checkTruncatedResponse(self
, message
):
99 self
.assertMessageHasFlags(message
, ['QR', 'RD', 'RA', 'TC'])
101 def checkEDNS(self
, message
, bufsize
=0):
103 Checks that the DNSMessage message has EDNS if bufsize > 0 and that
104 the buffer size is correct.
107 self
.assertEqual(message
.edns
, 0)
108 self
.assertEqual(message
.payload
, bufsize
)
110 self
.assertEqual(message
.edns
, -1)
113 class EDNSBufferTest16801680(EDNSBufferTest
):
115 Runs test cases 1, 2, 5, 6, 7, 8
118 def testEdnsBufferTestCase01(self
):
119 query
= self
.getMessage('01', 4096)
121 raw
= self
.sendUDPQuery(query
, decode
=False)
122 self
.checkResponseContent(raw
, 'A',
123 self
._udpTruncationThreshold
, 9)
124 message
= dns
.message
.from_wire(raw
)
125 self
.checkEDNS(message
, 512)
127 def testEdnsBufferTestCase02(self
):
128 query
= self
.getMessage('02', 1679)
130 message
= self
.sendUDPQuery(query
)
131 self
.checkTruncatedResponse(message
)
132 self
.checkEDNS(message
, 512)
134 def testEdnsBufferTestCase05(self
):
135 query
= self
.getMessage('05', 1680)
137 raw
= self
.sendUDPQuery(query
, decode
=False)
138 self
.checkResponseContent(raw
, 'E',
139 self
._udpTruncationThreshold
, 9)
140 message
= dns
.message
.from_wire(raw
)
141 self
.checkEDNS(message
, 512)
143 def testEdnsBufferTestCase06(self
):
144 query
= self
.getMessage('06', 0)
146 raw
= self
.sendUDPQuery(query
, decode
=False)
147 self
.checkResponseContent(raw
, 'F', 512, 192)
148 message
= dns
.message
.from_wire(raw
)
149 self
.checkEDNS(message
, 0)
151 def testEdnsBufferTestCase07(self
):
152 query
= self
.getMessage('07', 0)
154 message
= self
.sendUDPQuery(query
)
155 self
.checkTruncatedResponse(message
)
156 self
.checkEDNS(message
, 0)
158 def testEdnsBufferTestCase08(self
):
159 query
= self
.getMessage('08', 511)
161 raw
= self
.sendUDPQuery(query
, decode
=False)
162 self
.checkResponseContent(raw
, 'H', 512, 181)
163 message
= dns
.message
.from_wire(raw
)
164 self
.checkEDNS(message
, 512)
166 class EDNSBufferTest16801681(EDNSBufferTest
):
170 _confdir
= 'EDNSBuffer16801681'
171 _udpTruncationThreshold
= 1680
172 _ednsOutgoingBufsize
= 1681
173 _qnameSuffix
= '.edns-tests.example.'
175 _config_template
= """
176 forward-zones=edns-tests.example=%s.22
177 udp-truncation-threshold=%d
178 edns-outgoing-bufsize=%d
179 """ % (os
.environ
['PREFIX'], _udpTruncationThreshold
, _ednsOutgoingBufsize
)
181 def testEdnsBufferTestCase03(self
):
182 query
= self
.getMessage('03', 4096)
184 message
= self
.sendUDPQuery(query
)
185 self
.checkTruncatedResponse(message
)
186 self
.checkEDNS(message
, 512)
189 class EDNSBufferTest16801679(EDNSBufferTest
):
193 _confdir
= 'EDNSBuffer16801679'
194 _udpTruncationThreshold
= 1680
195 _ednsOutgoingBufsize
= 1679
196 _qnameSuffix
= '.edns-tests.example.'
198 _config_template
= """
199 forward-zones=edns-tests.example=%s.22
200 udp-truncation-threshold=%d
201 edns-outgoing-bufsize=%d
202 """ % (os
.environ
['PREFIX'], _udpTruncationThreshold
, _ednsOutgoingBufsize
)
204 def testEdnsBufferTestCase04(self
):
205 query
= self
.getMessage('04', 4096)
207 raw
= self
.sendUDPQuery(query
, decode
=False)
208 self
.checkResponseContent(raw
, 'D',
209 self
._ednsOutgoingBufsize
, 8)
210 message
= dns
.message
.from_wire(raw
)
211 self
.checkEDNS(message
, 512)
214 class UDPLargeResponder(DatagramProtocol
):
215 def datagramReceived(self
, datagram
, address
):
216 request
= dns
.message
.from_wire(datagram
)
217 # The outgoing packet should be EDNS buffersize bytes
218 packet_size
= request
.payload
220 testnum
= int(str(request
.question
[0].name
).split('.')[0])
222 # Unless we have special tests
224 packet_size
= 512 + 11
226 packet_size
= 513 + 11
228 packet_size
= 501 + 11
230 # An EDNS(0) RR without options is 11 bytes:
239 # But the header also counts, which is 12 bytes
242 # The packet has a question section
246 response
= dns
.message
.make_response(request
)
247 # This is an authoritative answer
248 response
.flags |
= dns
.flags
.AA
249 # We pretend to do EDNS with a 4096 buffer size
251 response
.payload
= 4096
253 # What we use to fill the TXT records
254 # Test number + 64, so 01 = 'A', 02 = 'B' etc...
255 value
= chr(testnum
+ 64)
257 # Each pre-RDATA answer RR is 12 bytes
258 # NAME: 2 (ptr to begin of packet, 0xC00C)
263 while packet_size
> 0:
264 # Remove the pre-RDATA length
266 # And the TXT size indicator (first byte in the TXT record)
268 txt_size
= min(packet_size
, 255)
269 answer
= dns
.rrset
.from_text(request
.question
[0].name
,
270 0, dns
.rdataclass
.IN
, 'TXT',
273 response
.answer
.append(answer
)
274 packet_size
-= txt_size
276 assert(packet_size
== 0)
278 self
.transport
.write(response
.to_wire(max_size
=65535), address
)