]> git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.recursor-dnssec/test_ExtendedErrors.py
Merge pull request #8918 from rgacogne/rec-edns-padding-plus-tests
[thirdparty/pdns.git] / regression-tests.recursor-dnssec / test_ExtendedErrors.py
1 import dns
2 import os
3 import extendederrors
4
5 from recursortests import RecursorTest
6
7 class ExtendedErrorsRecursorTest(RecursorTest):
8
9 _confdir = 'ExtendedErrors'
10 _config_template_default = """
11 dnssec=validate
12 daemon=no
13 trace=yes
14 packetcache-ttl=0
15 packetcache-servfail-ttl=0
16 max-cache-ttl=15
17 threads=1
18 loglevel=9
19 disable-syslog=yes
20 log-common-errors=yes
21 """
22 _config_template = """
23 extended-resolution-errors=yes
24 """
25 _lua_config_file = """
26 rpzFile('configs/%s/zone.rpz', { policyName="zone.rpz.", extendedErrorCode=15, extendedErrorExtra='Blocked by RPZ!'})
27 """ % (_confdir)
28 _lua_dns_script_file = """
29 function preresolve(dq)
30 if dq.qname == newDN('fromlua.extended.') then
31 dq.extendedErrorCode = 10
32 dq.extendedErrorExtra = "Extra text from Lua!"
33 return true
34 end
35 if dq.qname == newDN('toolarge.extended.') then
36 dq:addRecord(pdns.TXT, '%s', pdns.place.ANSWER)
37 dq.extendedErrorCode = 10
38 dq.extendedErrorExtra = "Extra text from Lua!"
39 return true
40 end
41 return false
42 end
43
44 local ffi = require("ffi")
45
46 ffi.cdef[[
47 typedef struct pdns_ffi_param pdns_ffi_param_t;
48
49 const char* pdns_ffi_param_get_qname(pdns_ffi_param_t* ref) __attribute__ ((visibility ("default")));
50 void pdns_ffi_param_set_rcode(pdns_ffi_param_t* ref, int rcode) __attribute__ ((visibility ("default")));
51 void pdns_ffi_param_set_extended_error_code(pdns_ffi_param_t* ref, uint16_t code) __attribute__ ((visibility ("default")));
52 void pdns_ffi_param_set_extended_error_extra(pdns_ffi_param_t* ref, size_t len, const char* extra);
53 ]]
54
55 function gettag_ffi(obj)
56 local qname = ffi.string(ffi.C.pdns_ffi_param_get_qname(obj))
57 if qname == 'fromluaffi.extended' then
58 ffi.C.pdns_ffi_param_set_rcode(obj, 0)
59 ffi.C.pdns_ffi_param_set_extended_error_code(obj, 10)
60 local extra = 'Extra text from Lua FFI!'
61 ffi.C.pdns_ffi_param_set_extended_error_extra(obj, #extra, extra)
62 end
63 end
64 """ % ('A'*427)
65
66 _roothints = None
67
68 @classmethod
69 def setUpClass(cls):
70
71 # we don't need all the auth stuff
72 cls.setUpSockets()
73 cls.startResponders()
74
75 confdir = os.path.join('configs', cls._confdir)
76 cls.createConfigDir(confdir)
77
78 cls.generateRecursorConfig(confdir)
79 cls.startRecursor(confdir, cls._recursorPort)
80
81 @classmethod
82 def tearDownClass(cls):
83 cls.tearDownRecursor()
84
85 @classmethod
86 def generateRecursorConfig(cls, confdir):
87 rpzFilePath = os.path.join(confdir, 'zone.rpz')
88 with open(rpzFilePath, 'w') as rpzZone:
89 rpzZone.write("""$ORIGIN zone.rpz.
90 @ 3600 IN SOA {soa}
91 *.rpz.extended.zone.rpz. 60 IN CNAME .
92 """.format(soa=cls._SOA))
93
94 super(ExtendedErrorsRecursorTest, cls).generateRecursorConfig(confdir)
95
96 def testNotIncepted(self):
97 qname = 'signotincepted.bad-dnssec.wb.sidnlabs.nl.'
98 query = dns.message.make_query(qname, 'A', want_dnssec=True)
99
100 for method in ("sendUDPQuery", "sendTCPQuery"):
101 sender = getattr(self, method)
102 res = sender(query, timeout=5.0)
103 self.assertRcodeEqual(res, dns.rcode.SERVFAIL)
104 self.assertEqual(res.edns, 0)
105 self.assertEqual(len(res.options), 1)
106 self.assertEqual(res.options[0].otype, 15)
107 self.assertEqual(res.options[0], extendederrors.ExtendedErrorOption(8, b''))
108
109 def testExpired(self):
110 qname = 'sigexpired.bad-dnssec.wb.sidnlabs.nl.'
111 query = dns.message.make_query(qname, 'A', want_dnssec=True)
112
113 for method in ("sendUDPQuery", "sendTCPQuery"):
114 sender = getattr(self, method)
115 res = sender(query, timeout=5.0)
116 self.assertRcodeEqual(res, dns.rcode.SERVFAIL)
117 self.assertEqual(res.edns, 0)
118 self.assertEqual(len(res.options), 1)
119 self.assertEqual(res.options[0].otype, 15)
120 self.assertEqual(res.options[0], extendederrors.ExtendedErrorOption(7, b''))
121
122 def testBogus(self):
123 qname = 'bogussig.ok.bad-dnssec.wb.sidnlabs.nl.'
124 query = dns.message.make_query(qname, 'A', want_dnssec=True)
125
126 for method in ("sendUDPQuery", "sendTCPQuery"):
127 sender = getattr(self, method)
128 res = sender(query, timeout=5.0)
129 self.assertRcodeEqual(res, dns.rcode.SERVFAIL)
130 self.assertEqual(res.edns, 0)
131 self.assertEqual(len(res.options), 1)
132 self.assertEqual(res.options[0].otype, 15)
133 self.assertEqual(res.options[0], extendederrors.ExtendedErrorOption(6, b''))
134
135 def testMissingRRSIG(self):
136 qname = 'brokendnssec.net.'
137 query = dns.message.make_query(qname, 'A', want_dnssec=True)
138
139 for method in ("sendUDPQuery", "sendTCPQuery"):
140 sender = getattr(self, method)
141 res = sender(query, timeout=5.0)
142 self.assertRcodeEqual(res, dns.rcode.SERVFAIL)
143 self.assertEqual(res.edns, 0)
144 self.assertEqual(len(res.options), 1)
145 self.assertEqual(res.options[0].otype, 15)
146 self.assertEqual(res.options[0], extendederrors.ExtendedErrorOption(10, b''))
147
148 def testFromLua(self):
149 qname = 'fromlua.extended.'
150 query = dns.message.make_query(qname, 'A', want_dnssec=True)
151
152 for method in ("sendUDPQuery", "sendTCPQuery"):
153 sender = getattr(self, method)
154 res = sender(query, timeout=5.0)
155 self.assertRcodeEqual(res, dns.rcode.NOERROR)
156 self.assertEqual(res.edns, 0)
157 self.assertEqual(len(res.options), 1)
158 self.assertEqual(res.options[0].otype, 15)
159 self.assertEqual(res.options[0], extendederrors.ExtendedErrorOption(10, b'Extra text from Lua!'))
160
161 def testFromLuaFFI(self):
162 qname = 'fromluaffi.extended.'
163 query = dns.message.make_query(qname, 'A', want_dnssec=True)
164
165 for method in ("sendUDPQuery", "sendTCPQuery"):
166 sender = getattr(self, method)
167 res = sender(query, timeout=5.0)
168 self.assertRcodeEqual(res, dns.rcode.NOERROR)
169 self.assertEqual(res.edns, 0)
170 self.assertEqual(len(res.options), 1)
171 self.assertEqual(res.options[0].otype, 15)
172 self.assertEqual(res.options[0], extendederrors.ExtendedErrorOption(10, b'Extra text from Lua FFI!'))
173
174 def testRPZ(self):
175 qname = 'sub.rpz.extended.'
176 query = dns.message.make_query(qname, 'A', want_dnssec=True)
177
178 for method in ("sendUDPQuery", "sendTCPQuery"):
179 sender = getattr(self, method)
180 res = sender(query, timeout=5.0)
181 self.assertRcodeEqual(res, dns.rcode.NXDOMAIN)
182 self.assertEqual(res.edns, 0)
183 self.assertEqual(len(res.options), 1)
184 self.assertEqual(res.options[0].otype, 15)
185 self.assertEqual(res.options[0], extendederrors.ExtendedErrorOption(15, b'Blocked by RPZ!'))
186
187 def testTooLarge(self):
188 qname = 'toolarge.extended.'
189 query = dns.message.make_query(qname, 'A', want_dnssec=True, payload=512)
190
191 # should not have the Extended Option since the packet is too large already
192 res = self.sendUDPQuery(query, timeout=5.0)
193 self.assertRcodeEqual(res, dns.rcode.NOERROR)
194 self.assertEquals(len(res.answer), 1)
195 self.assertEqual(res.edns, 0)
196 self.assertEqual(len(res.options), 0)
197
198 res = self.sendTCPQuery(query, timeout=5.0)
199 self.assertRcodeEqual(res, dns.rcode.NOERROR)
200 self.assertEquals(len(res.answer), 1)
201 self.assertEqual(res.edns, 0)
202 self.assertEqual(len(res.options), 1)
203 self.assertEqual(res.options[0].otype, 15)
204 self.assertEqual(res.options[0], extendederrors.ExtendedErrorOption(10, b'Extra text from Lua!'))
205
206 class NoExtendedErrorsRecursorTest(RecursorTest):
207
208 _confdir = 'ExtendedErrorsDisabled'
209 _config_template_default = """
210 dnssec=validate
211 daemon=no
212 trace=yes
213 packetcache-ttl=0
214 packetcache-servfail-ttl=0
215 max-cache-ttl=15
216 threads=1
217 loglevel=9
218 disable-syslog=yes
219 log-common-errors=yes
220 """
221 _config_template = """
222 extended-resolution-errors=no
223 """
224 _roothints = None
225
226 @classmethod
227 def setUpClass(cls):
228
229 # we don't need all the auth stuff
230 cls.setUpSockets()
231 cls.startResponders()
232
233 confdir = os.path.join('configs', cls._confdir)
234 cls.createConfigDir(confdir)
235
236 cls.generateRecursorConfig(confdir)
237 cls.startRecursor(confdir, cls._recursorPort)
238
239 @classmethod
240 def tearDownClass(cls):
241 cls.tearDownRecursor()
242
243 @classmethod
244 def generateRecursorConfig(cls, confdir):
245 super(NoExtendedErrorsRecursorTest, cls).generateRecursorConfig(confdir)
246
247 def testNotIncepted(self):
248 qname = 'signotincepted.bad-dnssec.wb.sidnlabs.nl.'
249 query = dns.message.make_query(qname, 'A', want_dnssec=True)
250
251 for method in ("sendUDPQuery", "sendTCPQuery"):
252 sender = getattr(self, method)
253 res = sender(query, timeout=5.0)
254 self.assertRcodeEqual(res, dns.rcode.SERVFAIL)
255 self.assertEqual(res.edns, 0)
256 self.assertEqual(len(res.options), 0)