git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_EDE.py
Merge pull request #13770 from Assumeru/require-tsig
[thirdparty/pdns.git] / regression-tests.dnsdist / test_EDE.py
1 #!/usr/bin/env python
2 import extendederrors
3 import dns
4 from dnsdisttests import DNSDistTest, pickAvailablePort
5
class TestBasics(DNSDistTest):
    """
    Extended DNS Error (EDE) regression tests.

    The dnsdist configuration used here attaches an EDE option in three ways:
    a catch-all SetExtendedDNSErrorAction (code 16), a
    SetExtendedDNSErrorResponseAction bound to a spoofed name (code 42), and
    a Lua FFI action that sets code 29 before spoofing its own answer.
    """

    # One backend on 127.0.0.1 (the %s placeholder is presumably filled in by
    # DNSDistTest with the backend port -- confirm against the base class),
    # plus a packet cache on the default pool so cached answers can be
    # checked as well.
    _config_template = """
    newServer{address="127.0.0.1:%s"}
    pc = newPacketCache(100, {maxTTL=86400, minTTL=1})
    getPool(""):setCache(pc)

    local ffi = require("ffi")
    function ffiAction(dq)
      local extraText = 'Synthesized from Lua'
      ffi.C.dnsdist_ffi_dnsquestion_set_extended_dns_error(dq, 29, extraText, #extraText)
      local str = "192.0.2.2"
      local buf = ffi.new("char[?]", #str + 1)
      ffi.copy(buf, str)
      ffi.C.dnsdist_ffi_dnsquestion_set_result(dq, buf, #str)
      return DNSAction.Spoof
    end

    addAction("self-answered.ede.tests.powerdns.com.", SpoofAction("192.0.2.1"))
    addAction("self-answered-ffi.ede.tests.powerdns.com.", LuaFFIAction(ffiAction))
    addSelfAnsweredResponseAction("self-answered.ede.tests.powerdns.com.", SetExtendedDNSErrorResponseAction(42, "my self-answered extended error status"))
    addAction(AllRule(), SetExtendedDNSErrorAction(16, "my extended error status"))

    """

    @staticmethod
    def _edeARecord(owner, address):
        """
        Build an IN/A RRset owned by `owner`, with a TTL of 60, holding `address`.
        """
        return dns.rrset.from_text(owner, 60, dns.rdataclass.IN, dns.rdatatype.A, address)

    def _edeCheckViaBackend(self, query, backendResponse, expectedResponse, verify):
        """
        Run `query` over UDP, then TCP, handing `backendResponse` to the sender.

        For each transport the query returned by the sender (presumably the
        one the backend received) must equal `query` once its ID has been
        aligned, and `verify(expectedResponse, receivedResponse)` is called
        on the response that came back.
        """
        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            (receivedQuery, receivedResponse) = sender(query, backendResponse)
            # the ID is overwritten before comparing, so only the rest of the message is checked
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            verify(expectedResponse, receivedResponse)

    def _edeCheckWithoutBackend(self, query, expectedResponse):
        """
        Run `query` over UDP, then TCP, with no backend response and no queue,
        and check each received response against `expectedResponse` with
        checkMessageEDNS.
        """
        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.checkMessageEDNS(expectedResponse, receivedResponse)

    def testExtendedErrorNoEDNS(self):
        """
        EDE: No EDNS

        The query carries no EDNS; the response received is checked with
        checkResponseNoEDNS against the backend's own answer.
        """
        qname = 'no-edns.ede.tests.powerdns.com.'
        # no EDNS
        query = dns.message.make_query(qname, 'A', 'IN', use_edns=False)
        backendAnswer = dns.message.make_response(query)
        backendAnswer.answer.append(self._edeARecord(qname, '127.0.0.1'))

        # the backend answer doubles as the expected response
        self._edeCheckViaBackend(query, backendAnswer, backendAnswer, self.checkResponseNoEDNS)

    def testExtendedErrorBackendResponse(self):
        """
        EDE: Backend response

        The backend answers with EDNS but no options; the expected response
        carries the code-16 EDE option, both on the first pass and when the
        same query is repeated without any backend response.
        """
        qname = 'backend-response.ede.tests.powerdns.com.'
        query = dns.message.make_query(qname, 'A', 'IN', use_edns=True)
        expectedEDE = extendederrors.ExtendedErrorOption(16, b'my extended error status')

        backendResponse = dns.message.make_response(query)
        backendResponse.use_edns(edns=True, payload=4096, options=[])
        backendResponse.answer.append(self._edeARecord(qname, '127.0.0.1'))

        expectedResponse = dns.message.make_response(query)
        expectedResponse.use_edns(edns=True, payload=4096, options=[expectedEDE])
        expectedResponse.answer.append(self._edeARecord(qname, '127.0.0.1'))

        self._edeCheckViaBackend(query, backendResponse, expectedResponse, self.checkMessageEDNS)

        # testing the cache
        self._edeCheckWithoutBackend(query, expectedResponse)

    def testExtendedErrorBackendResponseWithExistingEDE(self):
        """
        EDE: Backend response with existing EDE

        The backend answer already holds an EDE option (code 3); the expected
        response lists only the code-16 option, on the first pass and when
        the same query is repeated without any backend response.
        """
        qname = 'backend-response-existing-ede.ede.tests.powerdns.com.'
        query = dns.message.make_query(qname, 'A', 'IN', use_edns=True)
        expectedEDE = extendederrors.ExtendedErrorOption(16, b'my extended error status')
        backendEDE = extendederrors.ExtendedErrorOption(3, b'Stale answer')

        backendResponse = dns.message.make_response(query)
        backendResponse.use_edns(edns=True, payload=4096, options=[backendEDE])
        backendResponse.answer.append(self._edeARecord(qname, '127.0.0.1'))

        expectedResponse = dns.message.make_response(query)
        expectedResponse.use_edns(edns=True, payload=4096, options=[expectedEDE])
        expectedResponse.answer.append(self._edeARecord(qname, '127.0.0.1'))

        self._edeCheckViaBackend(query, backendResponse, expectedResponse, self.checkMessageEDNS)

        # testing the cache
        self._edeCheckWithoutBackend(query, expectedResponse)

    def testExtendedErrorSelfAnswered(self):
        """
        EDE: Self-answered

        The name is spoofed to 192.0.2.1 by the configuration; the expected
        response carries the code-42 EDE option.
        """
        qname = 'self-answered.ede.tests.powerdns.com.'
        query = dns.message.make_query(qname, 'A', 'IN', use_edns=True)
        # dnsdist sets RA = RD for self-generated responses; RD is cleared,
        # presumably so the expected response does not need RA set
        query.flags &= ~dns.flags.RD
        expectedEDE = extendederrors.ExtendedErrorOption(42, b'my self-answered extended error status')

        expectedResponse = dns.message.make_response(query)
        expectedResponse.use_edns(edns=True, payload=1232, options=[expectedEDE])
        expectedResponse.answer.append(self._edeARecord(qname, '192.0.2.1'))

        self._edeCheckWithoutBackend(query, expectedResponse)

    def testExtendedErrorLuaFFI(self):
        """
        EDE: Self-answered via Lua FFI

        The Lua FFI action in the configuration sets EDE code 29 and spoofs
        192.0.2.2; the expected response carries both.
        """
        qname = 'self-answered-ffi.ede.tests.powerdns.com.'
        query = dns.message.make_query(qname, 'A', 'IN', use_edns=True)
        # dnsdist sets RA = RD for self-generated responses; RD is cleared,
        # presumably so the expected response does not need RA set
        query.flags &= ~dns.flags.RD
        expectedEDE = extendederrors.ExtendedErrorOption(29, b'Synthesized from Lua')

        expectedResponse = dns.message.make_response(query)
        expectedResponse.use_edns(edns=True, payload=1232, options=[expectedEDE])
        expectedResponse.answer.append(self._edeARecord(qname, '192.0.2.2'))

        self._edeCheckWithoutBackend(query, expectedResponse)