]> git.ipfire.org Git - thirdparty/pdns.git/blame - regression-tests.recursor-dnssec/test_ExtendedErrors.py
Merge pull request #12086 from zeha/apizonepost
[thirdparty/pdns.git] / regression-tests.recursor-dnssec / test_ExtendedErrors.py
CommitLineData
e95b2a7c
RG
1import dns
2import os
3import extendederrors
4
5from recursortests import RecursorTest
6
7class ExtendedErrorsRecursorTest(RecursorTest):
8
9 _confdir = 'ExtendedErrors'
9425e4ca 10 _config_template = """
e95b2a7c 11dnssec=validate
9425e4ca 12extended-resolution-errors=yes
e95b2a7c 13"""
af231ceb
RG
14 _lua_config_file = """
15 rpzFile('configs/%s/zone.rpz', { policyName="zone.rpz.", extendedErrorCode=15, extendedErrorExtra='Blocked by RPZ!'})
16 """ % (_confdir)
e95b2a7c
RG
17 _lua_dns_script_file = """
18 function preresolve(dq)
19 if dq.qname == newDN('fromlua.extended.') then
20 dq.extendedErrorCode = 10
21 dq.extendedErrorExtra = "Extra text from Lua!"
22 return true
23 end
24 if dq.qname == newDN('toolarge.extended.') then
25 dq:addRecord(pdns.TXT, '%s', pdns.place.ANSWER)
26 dq.extendedErrorCode = 10
27 dq.extendedErrorExtra = "Extra text from Lua!"
28 return true
29 end
30 return false
31 end
32
33 local ffi = require("ffi")
34
35 ffi.cdef[[
36 typedef struct pdns_ffi_param pdns_ffi_param_t;
37
38 const char* pdns_ffi_param_get_qname(pdns_ffi_param_t* ref) __attribute__ ((visibility ("default")));
39 void pdns_ffi_param_set_rcode(pdns_ffi_param_t* ref, int rcode) __attribute__ ((visibility ("default")));
40 void pdns_ffi_param_set_extended_error_code(pdns_ffi_param_t* ref, uint16_t code) __attribute__ ((visibility ("default")));
41 void pdns_ffi_param_set_extended_error_extra(pdns_ffi_param_t* ref, size_t len, const char* extra);
42 ]]
43
44 function gettag_ffi(obj)
45 local qname = ffi.string(ffi.C.pdns_ffi_param_get_qname(obj))
46 if qname == 'fromluaffi.extended' then
47 ffi.C.pdns_ffi_param_set_rcode(obj, 0)
48 ffi.C.pdns_ffi_param_set_extended_error_code(obj, 10)
49 local extra = 'Extra text from Lua FFI!'
50 ffi.C.pdns_ffi_param_set_extended_error_extra(obj, #extra, extra)
51 end
e95b2a7c
RG
52 end
53 """ % ('A'*427)
54
55 _roothints = None
56
57 @classmethod
58 def setUpClass(cls):
59
60 # we don't need all the auth stuff
61 cls.setUpSockets()
62 cls.startResponders()
63
64 confdir = os.path.join('configs', cls._confdir)
65 cls.createConfigDir(confdir)
66
67 cls.generateRecursorConfig(confdir)
68 cls.startRecursor(confdir, cls._recursorPort)
69
70 @classmethod
71 def tearDownClass(cls):
72 cls.tearDownRecursor()
73
74 @classmethod
75 def generateRecursorConfig(cls, confdir):
af231ceb
RG
76 rpzFilePath = os.path.join(confdir, 'zone.rpz')
77 with open(rpzFilePath, 'w') as rpzZone:
78 rpzZone.write("""$ORIGIN zone.rpz.
79@ 3600 IN SOA {soa}
80*.rpz.extended.zone.rpz. 60 IN CNAME .
81""".format(soa=cls._SOA))
82
e95b2a7c
RG
83 super(ExtendedErrorsRecursorTest, cls).generateRecursorConfig(confdir)
84
85 def testNotIncepted(self):
86 qname = 'signotincepted.bad-dnssec.wb.sidnlabs.nl.'
87 query = dns.message.make_query(qname, 'A', want_dnssec=True)
88
89 for method in ("sendUDPQuery", "sendTCPQuery"):
90 sender = getattr(self, method)
91 res = sender(query, timeout=5.0)
92 self.assertRcodeEqual(res, dns.rcode.SERVFAIL)
93 self.assertEqual(res.edns, 0)
94 self.assertEqual(len(res.options), 1)
95 self.assertEqual(res.options[0].otype, 15)
96 self.assertEqual(res.options[0], extendederrors.ExtendedErrorOption(8, b''))
97
98 def testExpired(self):
99 qname = 'sigexpired.bad-dnssec.wb.sidnlabs.nl.'
100 query = dns.message.make_query(qname, 'A', want_dnssec=True)
101
102 for method in ("sendUDPQuery", "sendTCPQuery"):
103 sender = getattr(self, method)
104 res = sender(query, timeout=5.0)
105 self.assertRcodeEqual(res, dns.rcode.SERVFAIL)
106 self.assertEqual(res.edns, 0)
107 self.assertEqual(len(res.options), 1)
108 self.assertEqual(res.options[0].otype, 15)
109 self.assertEqual(res.options[0], extendederrors.ExtendedErrorOption(7, b''))
110
0dd902cb
OM
111 def testAllExpired(self):
112 qname = 'servfail.nl.'
113 query = dns.message.make_query(qname, 'AAAA', want_dnssec=True)
114
115 for method in ("sendUDPQuery", "sendTCPQuery"):
116 sender = getattr(self, method)
117 res = sender(query, timeout=5.0)
118 self.assertRcodeEqual(res, dns.rcode.SERVFAIL)
119 self.assertEqual(res.edns, 0)
120 self.assertEqual(len(res.options), 1)
121 self.assertEqual(res.options[0].otype, 15)
2cd34ba5 122 self.assertEqual(res.options[0], extendederrors.ExtendedErrorOption(6, b''))
0dd902cb 123
e95b2a7c 124 def testBogus(self):
af231ceb 125 qname = 'bogussig.ok.bad-dnssec.wb.sidnlabs.nl.'
e95b2a7c
RG
126 query = dns.message.make_query(qname, 'A', want_dnssec=True)
127
128 for method in ("sendUDPQuery", "sendTCPQuery"):
129 sender = getattr(self, method)
130 res = sender(query, timeout=5.0)
131 self.assertRcodeEqual(res, dns.rcode.SERVFAIL)
132 self.assertEqual(res.edns, 0)
133 self.assertEqual(len(res.options), 1)
134 self.assertEqual(res.options[0].otype, 15)
135 self.assertEqual(res.options[0], extendederrors.ExtendedErrorOption(6, b''))
136
137 def testMissingRRSIG(self):
138 qname = 'brokendnssec.net.'
139 query = dns.message.make_query(qname, 'A', want_dnssec=True)
140
141 for method in ("sendUDPQuery", "sendTCPQuery"):
142 sender = getattr(self, method)
143 res = sender(query, timeout=5.0)
144 self.assertRcodeEqual(res, dns.rcode.SERVFAIL)
145 self.assertEqual(res.edns, 0)
146 self.assertEqual(len(res.options), 1)
147 self.assertEqual(res.options[0].otype, 15)
148 self.assertEqual(res.options[0], extendederrors.ExtendedErrorOption(10, b''))
149
150 def testFromLua(self):
151 qname = 'fromlua.extended.'
152 query = dns.message.make_query(qname, 'A', want_dnssec=True)
153
154 for method in ("sendUDPQuery", "sendTCPQuery"):
155 sender = getattr(self, method)
156 res = sender(query, timeout=5.0)
157 self.assertRcodeEqual(res, dns.rcode.NOERROR)
158 self.assertEqual(res.edns, 0)
159 self.assertEqual(len(res.options), 1)
160 self.assertEqual(res.options[0].otype, 15)
161 self.assertEqual(res.options[0], extendederrors.ExtendedErrorOption(10, b'Extra text from Lua!'))
162
163 def testFromLuaFFI(self):
164 qname = 'fromluaffi.extended.'
165 query = dns.message.make_query(qname, 'A', want_dnssec=True)
166
167 for method in ("sendUDPQuery", "sendTCPQuery"):
168 sender = getattr(self, method)
169 res = sender(query, timeout=5.0)
170 self.assertRcodeEqual(res, dns.rcode.NOERROR)
171 self.assertEqual(res.edns, 0)
172 self.assertEqual(len(res.options), 1)
173 self.assertEqual(res.options[0].otype, 15)
174 self.assertEqual(res.options[0], extendederrors.ExtendedErrorOption(10, b'Extra text from Lua FFI!'))
175
af231ceb
RG
176 def testRPZ(self):
177 qname = 'sub.rpz.extended.'
178 query = dns.message.make_query(qname, 'A', want_dnssec=True)
179
180 for method in ("sendUDPQuery", "sendTCPQuery"):
181 sender = getattr(self, method)
182 res = sender(query, timeout=5.0)
183 self.assertRcodeEqual(res, dns.rcode.NXDOMAIN)
184 self.assertEqual(res.edns, 0)
185 self.assertEqual(len(res.options), 1)
186 self.assertEqual(res.options[0].otype, 15)
187 self.assertEqual(res.options[0], extendederrors.ExtendedErrorOption(15, b'Blocked by RPZ!'))
188
e95b2a7c
RG
189 def testTooLarge(self):
190 qname = 'toolarge.extended.'
191 query = dns.message.make_query(qname, 'A', want_dnssec=True, payload=512)
192
193 # should not have the Extended Option since the packet is too large already
194 res = self.sendUDPQuery(query, timeout=5.0)
195 self.assertRcodeEqual(res, dns.rcode.NOERROR)
4bfebc93 196 self.assertEqual(len(res.answer), 1)
e95b2a7c
RG
197 self.assertEqual(res.edns, 0)
198 self.assertEqual(len(res.options), 0)
199
200 res = self.sendTCPQuery(query, timeout=5.0)
201 self.assertRcodeEqual(res, dns.rcode.NOERROR)
4bfebc93 202 self.assertEqual(len(res.answer), 1)
e95b2a7c
RG
203 self.assertEqual(res.edns, 0)
204 self.assertEqual(len(res.options), 1)
205 self.assertEqual(res.options[0].otype, 15)
206 self.assertEqual(res.options[0], extendederrors.ExtendedErrorOption(10, b'Extra text from Lua!'))
207
208class NoExtendedErrorsRecursorTest(RecursorTest):
209
210 _confdir = 'ExtendedErrorsDisabled'
e95b2a7c 211 _config_template = """
9425e4ca
O
212dnssec=validate
213extended-resolution-errors=no
e95b2a7c
RG
214 """
215 _roothints = None
216
217 @classmethod
218 def setUpClass(cls):
219
220 # we don't need all the auth stuff
221 cls.setUpSockets()
222 cls.startResponders()
223
224 confdir = os.path.join('configs', cls._confdir)
225 cls.createConfigDir(confdir)
226
227 cls.generateRecursorConfig(confdir)
228 cls.startRecursor(confdir, cls._recursorPort)
229
230 @classmethod
231 def tearDownClass(cls):
232 cls.tearDownRecursor()
233
234 @classmethod
235 def generateRecursorConfig(cls, confdir):
236 super(NoExtendedErrorsRecursorTest, cls).generateRecursorConfig(confdir)
237
238 def testNotIncepted(self):
239 qname = 'signotincepted.bad-dnssec.wb.sidnlabs.nl.'
240 query = dns.message.make_query(qname, 'A', want_dnssec=True)
241
242 for method in ("sendUDPQuery", "sendTCPQuery"):
243 sender = getattr(self, method)
244 res = sender(query, timeout=5.0)
245 self.assertRcodeEqual(res, dns.rcode.SERVFAIL)
246 self.assertEqual(res.edns, 0)
247 self.assertEqual(len(res.options), 0)