6 from recursortests
import RecursorTest
8 class ExtendedErrorsRecursorTest(RecursorTest
):
10 _confdir
= 'ExtendedErrors'
11 _config_template
= """
13 extended-resolution-errors=yes
15 _lua_config_file
= """
16 rpzFile('configs/%s/zone.rpz', { policyName="zone.rpz.", extendedErrorCode=15, extendedErrorExtra='Blocked by RPZ!'})
18 _lua_dns_script_file
= """
19 function preresolve(dq)
20 if dq.qname == newDN('fromlua.extended.') then
21 dq.extendedErrorCode = 10
22 dq.extendedErrorExtra = "Extra text from Lua!"
25 if dq.qname == newDN('toolarge.extended.') then
26 dq:addRecord(pdns.TXT, '%s', pdns.place.ANSWER)
27 dq.extendedErrorCode = 10
28 dq.extendedErrorExtra = "Extra text from Lua!"
34 local ffi = require("ffi")
37 typedef struct pdns_ffi_param pdns_ffi_param_t;
39 const char* pdns_ffi_param_get_qname(pdns_ffi_param_t* ref) __attribute__ ((visibility ("default")));
40 void pdns_ffi_param_set_rcode(pdns_ffi_param_t* ref, int rcode) __attribute__ ((visibility ("default")));
41 void pdns_ffi_param_set_extended_error_code(pdns_ffi_param_t* ref, uint16_t code) __attribute__ ((visibility ("default")));
42 void pdns_ffi_param_set_extended_error_extra(pdns_ffi_param_t* ref, size_t len, const char* extra);
45 function gettag_ffi(obj)
46 local qname = ffi.string(ffi.C.pdns_ffi_param_get_qname(obj))
47 if qname == 'fromluaffi.extended' then
48 ffi.C.pdns_ffi_param_set_rcode(obj, 0)
49 ffi.C.pdns_ffi_param_set_extended_error_code(obj, 10)
50 local extra = 'Extra text from Lua FFI!'
51 ffi.C.pdns_ffi_param_set_extended_error_extra(obj, #extra, extra)
61 # we don't need all the auth stuff
65 confdir
= os
.path
.join('configs', cls
._confdir
)
66 cls
.createConfigDir(confdir
)
68 cls
.generateRecursorConfig(confdir
)
69 cls
.startRecursor(confdir
, cls
._recursorPort
)
def tearDownClass(cls):
    """Class-level teardown hook: delegates to ``cls.tearDownRecursor()``."""
    cls.tearDownRecursor()
76 def generateRecursorConfig(cls
, confdir
):
77 rpzFilePath
= os
.path
.join(confdir
, 'zone.rpz')
78 with
open(rpzFilePath
, 'w') as rpzZone
:
79 rpzZone
.write("""$ORIGIN zone.rpz.
81 *.rpz.extended.zone.rpz. 60 IN CNAME .
82 """.format(soa
=cls
._SOA
))
84 super(ExtendedErrorsRecursorTest
, cls
).generateRecursorConfig(confdir
)
@pytest.mark.skip(reason="sidnlabs no longer serves thiss until further notice")
def testNotIncepted(self):
    """Expect SERVFAIL plus ``ExtendedErrorOption(8, b'')`` over UDP and TCP.

    One DNSSEC-enabled A query for the sidnlabs "signotincepted" name is
    sent through ``sendUDPQuery`` and then ``sendTCPQuery``; each response
    must pass the same set of checks.
    """
    query = dns.message.make_query(
        'signotincepted.bad-dnssec.wb.sidnlabs.nl.', 'A', want_dnssec=True
    )

    for transport in ("sendUDPQuery", "sendTCPQuery"):
        response = getattr(self, transport)(query, timeout=5.0)
        self.assertRcodeEqual(response, dns.rcode.SERVFAIL)
        self.assertEqual(response.edns, 0)
        # Exactly one option is expected: type 15, equal to the expected object.
        self.assertEqual(len(response.options), 1)
        option = response.options[0]
        self.assertEqual(option.otype, 15)
        self.assertEqual(option, extendederrors.ExtendedErrorOption(8, b''))
@pytest.mark.skip(reason="sidnlabs no longer serves thiss until further notice")
def testExpired(self):
    """Expect SERVFAIL plus ``ExtendedErrorOption(7, b'')`` over UDP and TCP.

    One DNSSEC-enabled A query for the sidnlabs "sigexpired" name is sent
    through ``sendUDPQuery`` and then ``sendTCPQuery``; each response must
    pass the same set of checks.
    """
    query = dns.message.make_query(
        'sigexpired.bad-dnssec.wb.sidnlabs.nl.', 'A', want_dnssec=True
    )

    for transport in ("sendUDPQuery", "sendTCPQuery"):
        response = getattr(self, transport)(query, timeout=5.0)
        self.assertRcodeEqual(response, dns.rcode.SERVFAIL)
        self.assertEqual(response.edns, 0)
        # Exactly one option is expected: type 15, equal to the expected object.
        self.assertEqual(len(response.options), 1)
        option = response.options[0]
        self.assertEqual(option.otype, 15)
        self.assertEqual(option, extendederrors.ExtendedErrorOption(7, b''))
def testAllExpired(self):
    """Expect SERVFAIL plus ``ExtendedErrorOption(6, b'')`` for ``servfail.nl.`` AAAA.

    The DNSSEC-enabled query is sent through ``sendUDPQuery`` and then
    ``sendTCPQuery``; each response must pass the same set of checks.
    """
    query = dns.message.make_query('servfail.nl.', 'AAAA', want_dnssec=True)

    for transport in ("sendUDPQuery", "sendTCPQuery"):
        response = getattr(self, transport)(query, timeout=5.0)
        self.assertRcodeEqual(response, dns.rcode.SERVFAIL)
        self.assertEqual(response.edns, 0)
        # Exactly one option is expected: type 15, equal to the expected object.
        self.assertEqual(len(response.options), 1)
        option = response.options[0]
        self.assertEqual(option.otype, 15)
        self.assertEqual(option, extendederrors.ExtendedErrorOption(6, b''))
127 @pytest.mark
.skip(reason
="sidnlabs no longer serves thiss until further notice")
129 qname
= 'bogussig.ok.bad-dnssec.wb.sidnlabs.nl.'
130 query
= dns
.message
.make_query(qname
, 'A', want_dnssec
=True)
132 for method
in ("sendUDPQuery", "sendTCPQuery"):
133 sender
= getattr(self
, method
)
134 res
= sender(query
, timeout
=5.0)
135 self
.assertRcodeEqual(res
, dns
.rcode
.SERVFAIL
)
136 self
.assertEqual(res
.edns
, 0)
137 self
.assertEqual(len(res
.options
), 1)
138 self
.assertEqual(res
.options
[0].otype
, 15)
139 self
.assertEqual(res
.options
[0], extendederrors
.ExtendedErrorOption(6, b
''))
def testMissingRRSIG(self):
    """Expect SERVFAIL plus ``ExtendedErrorOption(10, b'')`` for ``brokendnssec.net.`` A.

    The DNSSEC-enabled query is sent through ``sendUDPQuery`` and then
    ``sendTCPQuery``; each response must pass the same set of checks.
    """
    query = dns.message.make_query('brokendnssec.net.', 'A', want_dnssec=True)

    for transport in ("sendUDPQuery", "sendTCPQuery"):
        response = getattr(self, transport)(query, timeout=5.0)
        self.assertRcodeEqual(response, dns.rcode.SERVFAIL)
        self.assertEqual(response.edns, 0)
        # Exactly one option is expected: type 15, equal to the expected object.
        self.assertEqual(len(response.options), 1)
        option = response.options[0]
        self.assertEqual(option.otype, 15)
        self.assertEqual(option, extendederrors.ExtendedErrorOption(10, b''))
def testFromLua(self):
    """Check the extended error that the Lua ``preresolve`` hook attaches.

    ``fromlua.extended.`` is the name for which the class's Lua script sets
    ``extendedErrorCode = 10`` and the extra text "Extra text from Lua!".
    The response must be NOERROR and carry exactly that option, over both
    ``sendUDPQuery`` and ``sendTCPQuery``.
    """
    query = dns.message.make_query('fromlua.extended.', 'A', want_dnssec=True)

    for transport in ("sendUDPQuery", "sendTCPQuery"):
        response = getattr(self, transport)(query, timeout=5.0)
        self.assertRcodeEqual(response, dns.rcode.NOERROR)
        self.assertEqual(response.edns, 0)
        # Exactly one option is expected: type 15, equal to the expected object.
        self.assertEqual(len(response.options), 1)
        option = response.options[0]
        self.assertEqual(option.otype, 15)
        self.assertEqual(
            option,
            extendederrors.ExtendedErrorOption(10, b'Extra text from Lua!'),
        )
def testFromLuaFFI(self):
    """Check the extended error that the Lua FFI ``gettag_ffi`` hook attaches.

    For ``fromluaffi.extended`` the class's Lua script sets rcode 0,
    extended error code 10 and the extra text "Extra text from Lua FFI!".
    The response must be NOERROR and carry exactly that option, over both
    ``sendUDPQuery`` and ``sendTCPQuery``.
    """
    query = dns.message.make_query('fromluaffi.extended.', 'A', want_dnssec=True)

    for transport in ("sendUDPQuery", "sendTCPQuery"):
        response = getattr(self, transport)(query, timeout=5.0)
        self.assertRcodeEqual(response, dns.rcode.NOERROR)
        self.assertEqual(response.edns, 0)
        # Exactly one option is expected: type 15, equal to the expected object.
        self.assertEqual(len(response.options), 1)
        option = response.options[0]
        self.assertEqual(option.otype, 15)
        self.assertEqual(
            option,
            extendederrors.ExtendedErrorOption(10, b'Extra text from Lua FFI!'),
        )
181 qname
= 'sub.rpz.extended.'
182 query
= dns
.message
.make_query(qname
, 'A', want_dnssec
=True)
184 for method
in ("sendUDPQuery", "sendTCPQuery"):
185 sender
= getattr(self
, method
)
186 res
= sender(query
, timeout
=5.0)
187 self
.assertRcodeEqual(res
, dns
.rcode
.NXDOMAIN
)
188 self
.assertEqual(res
.edns
, 0)
189 self
.assertEqual(len(res
.options
), 1)
190 self
.assertEqual(res
.options
[0].otype
, 15)
191 self
.assertEqual(res
.options
[0], extendederrors
.ExtendedErrorOption(15, b
'Blocked by RPZ!'))
def testTooLarge(self):
    """Check that the extended error option is omitted on UDP but present on TCP.

    ``toolarge.extended.`` is queried with an advertised payload of 512.
    Both transports must answer NOERROR with one answer RRset; the UDP
    response must carry no options, while the TCP response must carry the
    single option ``ExtendedErrorOption(10, b'Extra text from Lua!')``.
    """
    query = dns.message.make_query(
        'toolarge.extended.', 'A', want_dnssec=True, payload=512
    )

    # should not have the Extended Option since the packet is too large already
    udp_response = self.sendUDPQuery(query, timeout=5.0)
    self.assertRcodeEqual(udp_response, dns.rcode.NOERROR)
    self.assertEqual(len(udp_response.answer), 1)
    self.assertEqual(udp_response.edns, 0)
    self.assertEqual(len(udp_response.options), 0)

    # Over TCP the option is expected to be present.
    tcp_response = self.sendTCPQuery(query, timeout=5.0)
    self.assertRcodeEqual(tcp_response, dns.rcode.NOERROR)
    self.assertEqual(len(tcp_response.answer), 1)
    self.assertEqual(tcp_response.edns, 0)
    self.assertEqual(len(tcp_response.options), 1)
    option = tcp_response.options[0]
    self.assertEqual(option.otype, 15)
    self.assertEqual(
        option,
        extendederrors.ExtendedErrorOption(10, b'Extra text from Lua!'),
    )
212 class NoExtendedErrorsRecursorTest(RecursorTest
):
214 _confdir
= 'ExtendedErrorsDisabled'
215 _config_template
= """
217 extended-resolution-errors=no
224 # we don't need all the auth stuff
226 cls
.startResponders()
228 confdir
= os
.path
.join('configs', cls
._confdir
)
229 cls
.createConfigDir(confdir
)
231 cls
.generateRecursorConfig(confdir
)
232 cls
.startRecursor(confdir
, cls
._recursorPort
)
def tearDownClass(cls):
    """Class-level teardown hook: delegates to ``cls.tearDownRecursor()``."""
    cls.tearDownRecursor()
def generateRecursorConfig(cls, confdir):
    """Generate the recursor configuration in *confdir* via the parent class.

    No extra files are written here; the call is forwarded unchanged to the
    next ``generateRecursorConfig`` in the MRO after
    ``NoExtendedErrorsRecursorTest``.
    """
    parent = super(NoExtendedErrorsRecursorTest, cls)
    parent.generateRecursorConfig(confdir)
@pytest.mark.skip(reason="sidnlabs no longer serves thiss until further notice")
def testNotIncepted(self):
    """Expect SERVFAIL with no EDNS options at all over UDP and TCP.

    One DNSSEC-enabled A query for the sidnlabs "signotincepted" name is
    sent through ``sendUDPQuery`` and then ``sendTCPQuery``; each response
    must be SERVFAIL, report EDNS version 0 and carry zero options.
    """
    query = dns.message.make_query(
        'signotincepted.bad-dnssec.wb.sidnlabs.nl.', 'A', want_dnssec=True
    )

    for transport in ("sendUDPQuery", "sendTCPQuery"):
        response = getattr(self, transport)(query, timeout=5.0)
        self.assertRcodeEqual(response, dns.rcode.SERVFAIL)
        self.assertEqual(response.edns, 0)
        self.assertEqual(len(response.options), 0)