# Origin: thirdparty/pdns.git (git.ipfire.org mirror), blob regression-tests.dnsdist/test_BrokenAnswer.py
import threading

import clientsubnetoption
import dns
from dnsdisttests import DNSDistTest
def responseCallback(request):
    """Build the raw reply bytes the test responders send for ``request``.

    Queries whose name ends in ``a.root-servers.net.`` are treated as
    health checks and answered with a well-formed response. Every other
    single-question query gets a response carrying an ECS option and one
    answer record, in which one byte of the wire data is overwritten with
    0xff so that the message is deliberately corrupted.

    :param request: the parsed query; its ``question`` list is inspected.
    :returns: the wire-format reply as bytes, or None when the query does
        not contain exactly one question.
    """
    if len(request.question) != 1:
        print("Skipping query with question count %d" % (len(request.question)))
        # NOTE(review): the statement closing this branch is absent from the
        # extracted listing; returning None is assumed -- confirm upstream.
        return None

    qname = request.question[0].name
    healthCheck = str(qname).endswith('a.root-servers.net.')
    # NOTE(review): the guard line itself is absent from the extracted
    # listing; it is inferred from the otherwise unused healthCheck flag and
    # the early "return response.to_wire()" that precedes the broken path.
    if healthCheck:
        response = dns.message.make_response(request)
        return response.to_wire()

    # now we create a broken response
    response = dns.message.make_response(request)
    ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 32)
    response.use_edns(edns=True, payload=4096, options=[ecso])
    # NOTE(review): only the first argument of from_text() is visible in the
    # extracted listing; TTL, class, type and rdata below are reconstructed
    # (the tests in this file query for 'A' / 'IN') -- confirm upstream.
    rrset = dns.rrset.from_text(qname,
                                3600,
                                dns.rdataclass.IN,
                                dns.rdatatype.A,
                                '127.0.0.1')
    response.answer.append(rrset)
    raw = response.to_wire()

    # first label length of this rrset is at 12 (dnsheader) + length(qname) + 2 (leading label length + trailing 0) + 2 (qtype) + 2 (qclass)
    offset = 12 + len(str(qname)) + 2 + 2 + 2
    # Replace exactly one byte at that offset; the message length is unchanged.
    altered = raw[:offset] + b'\xff' + raw[offset+1:]
    # NOTE(review): the final return is absent from the extracted listing;
    # returning the altered bytes is assumed since nothing else uses them.
    return altered
class TestBrokenAnswerECS(DNSDistTest):
    """Send queries through dnsdist to a backend that answers with corrupted data.

    The backend is configured with ``useClientSubnet=true`` and the
    responders reply through ``responseCallback``. Both tests expect that
    neither a query nor a response comes back from the send helpers.
    """

    # this test suite uses a different responder port
    # because, contrary to the other ones, its
    # responders send raw, broken data
    _testServerPort = 5400
    # NOTE(review): the closing quotes and the indentation inside this string
    # are not visible in the extracted listing and were restored -- confirm.
    _config_template = """
    setECSSourcePrefixV4(32)
    newServer{address="127.0.0.1:%s", useClientSubnet=true}
    """

    # NOTE(review): the decorator line is absent from the extracted listing;
    # restored because the method takes ``cls`` and sets class attributes.
    @classmethod
    def startResponders(cls):
        """Start the UDP and TCP responder threads as daemon threads."""
        print("Launching responders..")

        # Returns broken data for non-healthcheck queries
        cls._UDPResponder = threading.Thread(
            name='UDP Responder',
            target=cls.UDPResponder,
            args=[cls._testServerPort, cls._toResponderQueue,
                  cls._fromResponderQueue, False, responseCallback])
        # Thread.setDaemon() is deprecated since Python 3.10; the attribute
        # form is equivalent and also works on older interpreters.
        cls._UDPResponder.daemon = True
        cls._UDPResponder.start()

        # Returns broken data for non-healthcheck queries
        cls._TCPResponder = threading.Thread(
            name='TCP Responder',
            target=cls.TCPResponder,
            args=[cls._testServerPort, cls._toResponderQueue,
                  cls._fromResponderQueue, False, False, responseCallback])
        cls._TCPResponder.daemon = True
        cls._TCPResponder.start()

    def testUDPWithInvalidAnswer(self):
        """
        Broken Answer: Invalid UDP answer with ECS
        """
        name = 'invalid-ecs-udp.broken-answer.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')

        # The same query is sent twice; no query and no response is expected
        # back either time.
        for _ in range(2):
            (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
            self.assertFalse(receivedQuery)
            self.assertFalse(receivedResponse)

    def testTCPWithInvalidAnswer(self):
        """
        Broken Answer: Invalid TCP answer with ECS
        """
        name = 'invalid-ecs-tcp.broken-answer.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')

        # The same query is sent twice; no query and no response is expected
        # back either time.
        for _ in range(2):
            (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
            self.assertFalse(receivedQuery)
            self.assertFalse(receivedResponse)