]> git.ipfire.org Git - thirdparty/pdns.git/blame - regression-tests.recursor-dnssec/test_ExtendedErrors.py
rec: dnspython's API changed wrt NSID, apply (version dependent) fix in regression...
[thirdparty/pdns.git] / regression-tests.recursor-dnssec / test_ExtendedErrors.py
CommitLineData
e95b2a7c
RG
1import dns
2import os
3import extendederrors
1c47d581 4import pytest
e95b2a7c
RG
5
6from recursortests import RecursorTest
7
8class ExtendedErrorsRecursorTest(RecursorTest):
9
10 _confdir = 'ExtendedErrors'
9425e4ca 11 _config_template = """
e95b2a7c 12dnssec=validate
9425e4ca 13extended-resolution-errors=yes
e95b2a7c 14"""
af231ceb
RG
15 _lua_config_file = """
16 rpzFile('configs/%s/zone.rpz', { policyName="zone.rpz.", extendedErrorCode=15, extendedErrorExtra='Blocked by RPZ!'})
17 """ % (_confdir)
e95b2a7c
RG
18 _lua_dns_script_file = """
19 function preresolve(dq)
20 if dq.qname == newDN('fromlua.extended.') then
21 dq.extendedErrorCode = 10
22 dq.extendedErrorExtra = "Extra text from Lua!"
23 return true
24 end
25 if dq.qname == newDN('toolarge.extended.') then
26 dq:addRecord(pdns.TXT, '%s', pdns.place.ANSWER)
27 dq.extendedErrorCode = 10
28 dq.extendedErrorExtra = "Extra text from Lua!"
29 return true
30 end
31 return false
32 end
33
34 local ffi = require("ffi")
35
36 ffi.cdef[[
37 typedef struct pdns_ffi_param pdns_ffi_param_t;
38
39 const char* pdns_ffi_param_get_qname(pdns_ffi_param_t* ref) __attribute__ ((visibility ("default")));
40 void pdns_ffi_param_set_rcode(pdns_ffi_param_t* ref, int rcode) __attribute__ ((visibility ("default")));
41 void pdns_ffi_param_set_extended_error_code(pdns_ffi_param_t* ref, uint16_t code) __attribute__ ((visibility ("default")));
42 void pdns_ffi_param_set_extended_error_extra(pdns_ffi_param_t* ref, size_t len, const char* extra);
43 ]]
44
45 function gettag_ffi(obj)
46 local qname = ffi.string(ffi.C.pdns_ffi_param_get_qname(obj))
47 if qname == 'fromluaffi.extended' then
48 ffi.C.pdns_ffi_param_set_rcode(obj, 0)
49 ffi.C.pdns_ffi_param_set_extended_error_code(obj, 10)
50 local extra = 'Extra text from Lua FFI!'
51 ffi.C.pdns_ffi_param_set_extended_error_extra(obj, #extra, extra)
52 end
e95b2a7c
RG
53 end
54 """ % ('A'*427)
55
56 _roothints = None
57
58 @classmethod
59 def setUpClass(cls):
60
61 # we don't need all the auth stuff
62 cls.setUpSockets()
63 cls.startResponders()
64
65 confdir = os.path.join('configs', cls._confdir)
66 cls.createConfigDir(confdir)
67
68 cls.generateRecursorConfig(confdir)
69 cls.startRecursor(confdir, cls._recursorPort)
70
71 @classmethod
72 def tearDownClass(cls):
73 cls.tearDownRecursor()
74
75 @classmethod
76 def generateRecursorConfig(cls, confdir):
af231ceb
RG
77 rpzFilePath = os.path.join(confdir, 'zone.rpz')
78 with open(rpzFilePath, 'w') as rpzZone:
79 rpzZone.write("""$ORIGIN zone.rpz.
80@ 3600 IN SOA {soa}
81*.rpz.extended.zone.rpz. 60 IN CNAME .
82""".format(soa=cls._SOA))
83
e95b2a7c
RG
84 super(ExtendedErrorsRecursorTest, cls).generateRecursorConfig(confdir)
85
1c47d581 86 @pytest.mark.skip(reason="sidnlabs no longer serves thiss until further notice")
e95b2a7c
RG
87 def testNotIncepted(self):
88 qname = 'signotincepted.bad-dnssec.wb.sidnlabs.nl.'
89 query = dns.message.make_query(qname, 'A', want_dnssec=True)
90
91 for method in ("sendUDPQuery", "sendTCPQuery"):
92 sender = getattr(self, method)
93 res = sender(query, timeout=5.0)
94 self.assertRcodeEqual(res, dns.rcode.SERVFAIL)
95 self.assertEqual(res.edns, 0)
96 self.assertEqual(len(res.options), 1)
97 self.assertEqual(res.options[0].otype, 15)
98 self.assertEqual(res.options[0], extendederrors.ExtendedErrorOption(8, b''))
99
1c47d581 100 @pytest.mark.skip(reason="sidnlabs no longer serves thiss until further notice")
e95b2a7c
RG
101 def testExpired(self):
102 qname = 'sigexpired.bad-dnssec.wb.sidnlabs.nl.'
103 query = dns.message.make_query(qname, 'A', want_dnssec=True)
104
105 for method in ("sendUDPQuery", "sendTCPQuery"):
106 sender = getattr(self, method)
107 res = sender(query, timeout=5.0)
108 self.assertRcodeEqual(res, dns.rcode.SERVFAIL)
109 self.assertEqual(res.edns, 0)
110 self.assertEqual(len(res.options), 1)
111 self.assertEqual(res.options[0].otype, 15)
112 self.assertEqual(res.options[0], extendederrors.ExtendedErrorOption(7, b''))
113
0dd902cb
OM
114 def testAllExpired(self):
115 qname = 'servfail.nl.'
116 query = dns.message.make_query(qname, 'AAAA', want_dnssec=True)
117
118 for method in ("sendUDPQuery", "sendTCPQuery"):
119 sender = getattr(self, method)
120 res = sender(query, timeout=5.0)
121 self.assertRcodeEqual(res, dns.rcode.SERVFAIL)
122 self.assertEqual(res.edns, 0)
123 self.assertEqual(len(res.options), 1)
124 self.assertEqual(res.options[0].otype, 15)
2cd34ba5 125 self.assertEqual(res.options[0], extendederrors.ExtendedErrorOption(6, b''))
0dd902cb 126
1c47d581 127 @pytest.mark.skip(reason="sidnlabs no longer serves thiss until further notice")
e95b2a7c 128 def testBogus(self):
af231ceb 129 qname = 'bogussig.ok.bad-dnssec.wb.sidnlabs.nl.'
e95b2a7c
RG
130 query = dns.message.make_query(qname, 'A', want_dnssec=True)
131
132 for method in ("sendUDPQuery", "sendTCPQuery"):
133 sender = getattr(self, method)
134 res = sender(query, timeout=5.0)
135 self.assertRcodeEqual(res, dns.rcode.SERVFAIL)
136 self.assertEqual(res.edns, 0)
137 self.assertEqual(len(res.options), 1)
138 self.assertEqual(res.options[0].otype, 15)
139 self.assertEqual(res.options[0], extendederrors.ExtendedErrorOption(6, b''))
140
141 def testMissingRRSIG(self):
142 qname = 'brokendnssec.net.'
143 query = dns.message.make_query(qname, 'A', want_dnssec=True)
144
145 for method in ("sendUDPQuery", "sendTCPQuery"):
146 sender = getattr(self, method)
147 res = sender(query, timeout=5.0)
148 self.assertRcodeEqual(res, dns.rcode.SERVFAIL)
149 self.assertEqual(res.edns, 0)
150 self.assertEqual(len(res.options), 1)
151 self.assertEqual(res.options[0].otype, 15)
152 self.assertEqual(res.options[0], extendederrors.ExtendedErrorOption(10, b''))
153
154 def testFromLua(self):
155 qname = 'fromlua.extended.'
156 query = dns.message.make_query(qname, 'A', want_dnssec=True)
157
158 for method in ("sendUDPQuery", "sendTCPQuery"):
159 sender = getattr(self, method)
160 res = sender(query, timeout=5.0)
161 self.assertRcodeEqual(res, dns.rcode.NOERROR)
162 self.assertEqual(res.edns, 0)
163 self.assertEqual(len(res.options), 1)
164 self.assertEqual(res.options[0].otype, 15)
165 self.assertEqual(res.options[0], extendederrors.ExtendedErrorOption(10, b'Extra text from Lua!'))
166
167 def testFromLuaFFI(self):
168 qname = 'fromluaffi.extended.'
169 query = dns.message.make_query(qname, 'A', want_dnssec=True)
170
171 for method in ("sendUDPQuery", "sendTCPQuery"):
172 sender = getattr(self, method)
173 res = sender(query, timeout=5.0)
174 self.assertRcodeEqual(res, dns.rcode.NOERROR)
175 self.assertEqual(res.edns, 0)
176 self.assertEqual(len(res.options), 1)
177 self.assertEqual(res.options[0].otype, 15)
178 self.assertEqual(res.options[0], extendederrors.ExtendedErrorOption(10, b'Extra text from Lua FFI!'))
179
af231ceb
RG
180 def testRPZ(self):
181 qname = 'sub.rpz.extended.'
182 query = dns.message.make_query(qname, 'A', want_dnssec=True)
183
184 for method in ("sendUDPQuery", "sendTCPQuery"):
185 sender = getattr(self, method)
186 res = sender(query, timeout=5.0)
187 self.assertRcodeEqual(res, dns.rcode.NXDOMAIN)
188 self.assertEqual(res.edns, 0)
189 self.assertEqual(len(res.options), 1)
190 self.assertEqual(res.options[0].otype, 15)
191 self.assertEqual(res.options[0], extendederrors.ExtendedErrorOption(15, b'Blocked by RPZ!'))
192
e95b2a7c
RG
193 def testTooLarge(self):
194 qname = 'toolarge.extended.'
195 query = dns.message.make_query(qname, 'A', want_dnssec=True, payload=512)
196
197 # should not have the Extended Option since the packet is too large already
198 res = self.sendUDPQuery(query, timeout=5.0)
199 self.assertRcodeEqual(res, dns.rcode.NOERROR)
4bfebc93 200 self.assertEqual(len(res.answer), 1)
e95b2a7c
RG
201 self.assertEqual(res.edns, 0)
202 self.assertEqual(len(res.options), 0)
203
204 res = self.sendTCPQuery(query, timeout=5.0)
205 self.assertRcodeEqual(res, dns.rcode.NOERROR)
4bfebc93 206 self.assertEqual(len(res.answer), 1)
e95b2a7c
RG
207 self.assertEqual(res.edns, 0)
208 self.assertEqual(len(res.options), 1)
209 self.assertEqual(res.options[0].otype, 15)
210 self.assertEqual(res.options[0], extendederrors.ExtendedErrorOption(10, b'Extra text from Lua!'))
211
212class NoExtendedErrorsRecursorTest(RecursorTest):
213
214 _confdir = 'ExtendedErrorsDisabled'
e95b2a7c 215 _config_template = """
9425e4ca
O
216dnssec=validate
217extended-resolution-errors=no
e95b2a7c
RG
218 """
219 _roothints = None
220
221 @classmethod
222 def setUpClass(cls):
223
224 # we don't need all the auth stuff
225 cls.setUpSockets()
226 cls.startResponders()
227
228 confdir = os.path.join('configs', cls._confdir)
229 cls.createConfigDir(confdir)
230
231 cls.generateRecursorConfig(confdir)
232 cls.startRecursor(confdir, cls._recursorPort)
233
234 @classmethod
235 def tearDownClass(cls):
236 cls.tearDownRecursor()
237
238 @classmethod
239 def generateRecursorConfig(cls, confdir):
240 super(NoExtendedErrorsRecursorTest, cls).generateRecursorConfig(confdir)
241
1c47d581 242 @pytest.mark.skip(reason="sidnlabs no longer serves thiss until further notice")
e95b2a7c
RG
243 def testNotIncepted(self):
244 qname = 'signotincepted.bad-dnssec.wb.sidnlabs.nl.'
245 query = dns.message.make_query(qname, 'A', want_dnssec=True)
246
247 for method in ("sendUDPQuery", "sendTCPQuery"):
248 sender = getattr(self, method)
249 res = sender(query, timeout=5.0)
250 self.assertRcodeEqual(res, dns.rcode.SERVFAIL)
251 self.assertEqual(res.edns, 0)
252 self.assertEqual(len(res.options), 0)