]> git.ipfire.org Git - thirdparty/pdns.git/blame - regression-tests.dnsdist/test_BrokenAnswer.py
Merge pull request #8795 from omoerbeek/rec-lua-docs-policytag
[thirdparty/pdns.git] / regression-tests.dnsdist / test_BrokenAnswer.py
CommitLineData
a620f197
RG
1#!/usr/bin/env python
2import threading
3import clientsubnetoption
4import dns
5from dnsdisttests import DNSDistTest
6
7def responseCallback(request):
8 if len(request.question) != 1:
9 print("Skipping query with question count %d" % (len(request.question)))
10 return None
11 healthCheck = str(request.question[0].name).endswith('a.root-servers.net.')
12 if healthCheck:
13 response = dns.message.make_response(request)
14 return response.to_wire()
15 # now we create a broken response
16 response = dns.message.make_response(request)
17 ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 32)
18 response.use_edns(edns=True, payload=4096, options=[ecso])
19 rrset = dns.rrset.from_text(request.question[0].name,
20 3600,
21 dns.rdataclass.IN,
22 dns.rdatatype.A,
23 '127.0.0.1')
24 response.answer.append(rrset)
25 raw = response.to_wire()
26 # first label length of this rrset is at 12 (dnsheader) + length(qname) + 2 (leading label length + trailing 0) + 2 (qtype) + 2 (qclass)
27 offset = 12 + len(str(request.question[0].name)) + 2 + 2 + 2
92643b6a 28 altered = raw[:offset] + b'\xff' + raw[offset+1:]
a620f197
RG
29 return altered
30
31class TestBrokenAnswerECS(DNSDistTest):
32
33 # this test suite uses a different responder port
34 # because, contrary to the other ones, its
35 # responders send raw, broken data
36 _testServerPort = 5400
37 _config_template = """
38 setECSSourcePrefixV4(32)
39 newServer{address="127.0.0.1:%s", useClientSubnet=true}
40 """
41 @classmethod
42 def startResponders(cls):
43 print("Launching responders..")
44
45 # Returns broken data for non-healthcheck queries
46 cls._UDPResponder = threading.Thread(name='UDP Responder', target=cls.UDPResponder, args=[cls._testServerPort, cls._toResponderQueue, cls._fromResponderQueue, False, responseCallback])
47 cls._UDPResponder.setDaemon(True)
48 cls._UDPResponder.start()
49
50 # Returns broken data for non-healthcheck queries
51 cls._TCPResponder = threading.Thread(name='TCP Responder', target=cls.TCPResponder, args=[cls._testServerPort, cls._toResponderQueue, cls._fromResponderQueue, False, False, responseCallback])
52 cls._TCPResponder.setDaemon(True)
53 cls._TCPResponder.start()
54
55 def testUDPWithInvalidAnswer(self):
56 """
57 Broken Answer: Invalid UDP answer with ECS
58 """
59 name = 'invalid-ecs-udp.broken-answer.tests.powerdns.com.'
60 query = dns.message.make_query(name, 'A', 'IN')
61 rrset = dns.rrset.from_text(name,
62 3600,
63 dns.rdataclass.IN,
64 dns.rdatatype.A,
65 '127.0.0.1')
66 expectedResponse = dns.message.make_response(query)
67 expectedResponse.answer.append(rrset)
68
69 (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
70 self.assertFalse(receivedQuery)
71 self.assertFalse(receivedResponse)
72
73 (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
74 self.assertFalse(receivedQuery)
75 self.assertFalse(receivedResponse)
76
77 def testTCPWithInvalidAnswer(self):
78 """
79 Broken Answer: Invalid TCP answer with ECS
80 """
81 name = 'invalid-ecs-tcp.broken-answer.tests.powerdns.com.'
82 query = dns.message.make_query(name, 'A', 'IN')
83 rrset = dns.rrset.from_text(name,
84 3600,
85 dns.rdataclass.IN,
86 dns.rdatatype.A,
87 '127.0.0.1')
88 expectedResponse = dns.message.make_response(query)
89 expectedResponse.answer.append(rrset)
90
91 (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
92 self.assertFalse(receivedQuery)
93 self.assertFalse(receivedResponse)
94
95 (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
96 self.assertFalse(receivedQuery)
97 self.assertFalse(receivedResponse)