]>
git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_EDE.py
4 from dnsdisttests
import DNSDistTest
, pickAvailablePort
6 class TestBasics(DNSDistTest
):
9 newServer{address="127.0.0.1:%s"}
10 pc = newPacketCache(100, {maxTTL=86400, minTTL=1})
11 getPool(""):setCache(pc)
13 local ffi = require("ffi")
14 function ffiAction(dq)
15 local extraText = 'Synthesized from Lua'
16 ffi.C.dnsdist_ffi_dnsquestion_set_extended_dns_error(dq, 29, extraText, #extraText)
17 local str = "192.0.2.2"
18 local buf = ffi.new("char[?]", #str + 1)
20 ffi.C.dnsdist_ffi_dnsquestion_set_result(dq, buf, #str)
21 return DNSAction.Spoof
24 addAction("self-answered.ede.tests.powerdns.com.", SpoofAction("192.0.2.1"))
25 addAction("self-answered-ffi.ede.tests.powerdns.com.", LuaFFIAction(ffiAction))
26 addSelfAnsweredResponseAction("self-answered.ede.tests.powerdns.com.", SetExtendedDNSErrorResponseAction(42, "my self-answered extended error status"))
27 addAction(AllRule(), SetExtendedDNSErrorAction(16, "my extended error status"))
31 def testExtendedErrorNoEDNS(self
):
35 name
= 'no-edns.ede.tests.powerdns.com.'
37 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=False)
38 response
= dns
.message
.make_response(query
)
39 rrset
= dns
.rrset
.from_text(name
,
45 response
.answer
.append(rrset
)
47 for method
in ("sendUDPQuery", "sendTCPQuery"):
48 sender
= getattr(self
, method
)
49 (receivedQuery
, receivedResponse
) = sender(query
, response
)
50 receivedQuery
.id = query
.id
51 self
.assertEqual(query
, receivedQuery
)
52 self
.checkResponseNoEDNS(response
, receivedResponse
)
54 def testExtendedErrorBackendResponse(self
):
58 name
= 'backend-response.ede.tests.powerdns.com.'
59 ede
= extendederrors
.ExtendedErrorOption(16, b
'my extended error status')
60 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True)
62 backendResponse
= dns
.message
.make_response(query
)
63 backendResponse
.use_edns(edns
=True, payload
=4096, options
=[])
64 rrset
= dns
.rrset
.from_text(name
,
70 backendResponse
.answer
.append(rrset
)
71 expectedResponse
= dns
.message
.make_response(query
)
72 expectedResponse
.use_edns(edns
=True, payload
=4096, options
=[ede
])
73 rrset
= dns
.rrset
.from_text(name
,
78 expectedResponse
.answer
.append(rrset
)
80 for method
in ("sendUDPQuery", "sendTCPQuery"):
81 sender
= getattr(self
, method
)
82 (receivedQuery
, receivedResponse
) = sender(query
, backendResponse
)
83 receivedQuery
.id = query
.id
84 self
.assertEqual(query
, receivedQuery
)
85 self
.checkMessageEDNS(expectedResponse
, receivedResponse
)
88 for method
in ("sendUDPQuery", "sendTCPQuery"):
89 sender
= getattr(self
, method
)
90 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
91 self
.checkMessageEDNS(expectedResponse
, receivedResponse
)
93 def testExtendedErrorBackendResponseWithExistingEDE(self
):
95 EDE: Backend response with existing EDE
97 name
= 'backend-response-existing-ede.ede.tests.powerdns.com.'
98 ede
= extendederrors
.ExtendedErrorOption(16, b
'my extended error status')
99 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True)
101 backendResponse
= dns
.message
.make_response(query
)
102 backendEDE
= extendederrors
.ExtendedErrorOption(3, b
'Stale answer')
103 backendResponse
.use_edns(edns
=True, payload
=4096, options
=[backendEDE
])
104 rrset
= dns
.rrset
.from_text(name
,
110 backendResponse
.answer
.append(rrset
)
111 expectedResponse
= dns
.message
.make_response(query
)
112 expectedResponse
.use_edns(edns
=True, payload
=4096, options
=[ede
])
113 rrset
= dns
.rrset
.from_text(name
,
118 expectedResponse
.answer
.append(rrset
)
120 for method
in ("sendUDPQuery", "sendTCPQuery"):
121 sender
= getattr(self
, method
)
122 (receivedQuery
, receivedResponse
) = sender(query
, backendResponse
)
123 receivedQuery
.id = query
.id
124 self
.assertEqual(query
, receivedQuery
)
125 self
.checkMessageEDNS(expectedResponse
, receivedResponse
)
128 for method
in ("sendUDPQuery", "sendTCPQuery"):
129 sender
= getattr(self
, method
)
130 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
131 self
.checkMessageEDNS(expectedResponse
, receivedResponse
)
133 def testExtendedErrorSelfAnswered(self
):
137 name
= 'self-answered.ede.tests.powerdns.com.'
138 ede
= extendederrors
.ExtendedErrorOption(42, b
'my self-answered extended error status')
139 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True)
140 # dnsdist sets RA = RD for self-generated responses
141 query
.flags
&= ~dns
.flags
.RD
143 expectedResponse
= dns
.message
.make_response(query
)
144 expectedResponse
.use_edns(edns
=True, payload
=1232, options
=[ede
])
145 rrset
= dns
.rrset
.from_text(name
,
150 expectedResponse
.answer
.append(rrset
)
152 for method
in ("sendUDPQuery", "sendTCPQuery"):
153 sender
= getattr(self
, method
)
154 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
155 self
.checkMessageEDNS(expectedResponse
, receivedResponse
)
157 def testExtendedErrorLuaFFI(self
):
159 EDE: Self-answered via Lua FFI
161 name
= 'self-answered-ffi.ede.tests.powerdns.com.'
162 ede
= extendederrors
.ExtendedErrorOption(29, b
'Synthesized from Lua')
163 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True)
164 # dnsdist sets RA = RD for self-generated responses
165 query
.flags
&= ~dns
.flags
.RD
167 expectedResponse
= dns
.message
.make_response(query
)
168 expectedResponse
.use_edns(edns
=True, payload
=1232, options
=[ede
])
169 rrset
= dns
.rrset
.from_text(name
,
174 expectedResponse
.answer
.append(rrset
)
176 for method
in ("sendUDPQuery", "sendTCPQuery"):
177 sender
= getattr(self
, method
)
178 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
179 self
.checkMessageEDNS(expectedResponse
, receivedResponse
)