git.ipfire.org Git - thirdparty/pdns.git/blame - regression-tests.recursor-dnssec/test_ExtendedErrors.py
rec: Add Lua bindings, regression tests for Extended DNS Errors
[thirdparty/pdns.git] / regression-tests.recursor-dnssec / test_ExtendedErrors.py
CommitLineData
e95b2a7c
RG
1import dns
2import os
3import extendederrors
4
5from recursortests import RecursorTest
6
class ExtendedErrorsRecursorTest(RecursorTest):
    """Check the Extended DNS Error option when ``extended-errors=yes``.

    The recursor is started with DNSSEC validation enabled and a Lua script
    that sets an extended error code and extra text from ``preresolve`` and
    from ``gettag_ffi``. Each test sends a query and inspects the EDNS
    options of the response.
    """

    _confdir = 'ExtendedErrors'
    _config_template_default = """
dnssec=validate
daemon=no
trace=yes
packetcache-ttl=0
packetcache-servfail-ttl=0
max-cache-ttl=15
threads=1
loglevel=9
disable-syslog=yes
log-common-errors=yes
"""
    _config_template = """
    extended-errors=yes
    """
    # The '%s' placeholder in the 'toolarge.extended.' branch is filled with
    # 427 'A' characters so that the TXT answer is big enough for
    # testTooLarge (512-byte UDP payload) to leave no room for the option.
    _lua_dns_script_file = """
    function preresolve(dq)
      if dq.qname == newDN('fromlua.extended.') then
        dq.extendedErrorCode = 10
        dq.extendedErrorExtra = "Extra text from Lua!"
        return true
      end
      if dq.qname == newDN('toolarge.extended.') then
        dq:addRecord(pdns.TXT, '%s', pdns.place.ANSWER)
        dq.extendedErrorCode = 10
        dq.extendedErrorExtra = "Extra text from Lua!"
        return true
      end
      return false
    end

    local ffi = require("ffi")

    ffi.cdef[[
      typedef struct pdns_ffi_param pdns_ffi_param_t;

      const char* pdns_ffi_param_get_qname(pdns_ffi_param_t* ref) __attribute__ ((visibility ("default")));
      void pdns_ffi_param_set_rcode(pdns_ffi_param_t* ref, int rcode) __attribute__ ((visibility ("default")));
      void pdns_ffi_param_set_extended_error_code(pdns_ffi_param_t* ref, uint16_t code) __attribute__ ((visibility ("default")));
      void pdns_ffi_param_set_extended_error_extra(pdns_ffi_param_t* ref, size_t len, const char* extra);
    ]]

    function gettag_ffi(obj)
      local qname = ffi.string(ffi.C.pdns_ffi_param_get_qname(obj))
      if qname == 'fromluaffi.extended' then
        ffi.C.pdns_ffi_param_set_rcode(obj, 0)
        ffi.C.pdns_ffi_param_set_extended_error_code(obj, 10)
        local extra = 'Extra text from Lua FFI!'
        ffi.C.pdns_ffi_param_set_extended_error_extra(obj, #extra, extra)
      end
      return {}
    end
    """ % ('A'*427)

    _roothints = None

    @classmethod
    def setUpClass(cls):
        """Start the sockets, the responders and a recursor for this class."""

        # we don't need all the auth stuff
        cls.setUpSockets()
        cls.startResponders()

        confdir = os.path.join('configs', cls._confdir)
        cls.createConfigDir(confdir)

        cls.generateRecursorConfig(confdir)
        cls.startRecursor(confdir, cls._recursorPort)

    @classmethod
    def tearDownClass(cls):
        """Stop the recursor started by setUpClass."""
        cls.tearDownRecursor()

    @classmethod
    def generateRecursorConfig(cls, confdir):
        """Write the recursor configuration into ``confdir`` via the base class."""
        super(ExtendedErrorsRecursorTest, cls).generateRecursorConfig(confdir)

    def _assertSingleExtendedError(self, res, infoCode, extraText=b''):
        """Assert that ``res`` carries exactly one Extended DNS Error option.

        ``infoCode`` and ``extraText`` are the expected contents of that
        option, compared through ``extendederrors.ExtendedErrorOption``.
        """
        self.assertEqual(res.edns, 0)
        self.assertEqual(len(res.options), 1)
        # 15 is the EDNS option code of Extended DNS Errors (RFC 8914).
        self.assertEqual(res.options[0].otype, 15)
        self.assertEqual(res.options[0], extendederrors.ExtendedErrorOption(infoCode, extraText))

    def _checkOverUDPAndTCP(self, qname, rcode, infoCode, extraText=b''):
        """Query ``qname``/A over UDP then TCP and check rcode and option.

        Both responses must have the given ``rcode`` and exactly one
        extended error option matching ``infoCode`` and ``extraText``.
        """
        query = dns.message.make_query(qname, 'A', want_dnssec=True)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            res = sender(query, timeout=5.0)
            self.assertRcodeEqual(res, rcode)
            self._assertSingleExtendedError(res, infoCode, extraText)

    def testNotIncepted(self):
        """A not-yet-incepted signature yields SERVFAIL with info code 8."""
        self._checkOverUDPAndTCP('signotincepted.bad-dnssec.wb.sidnlabs.nl.',
                                 dns.rcode.SERVFAIL, 8)

    def testExpired(self):
        """An expired signature yields SERVFAIL with info code 7."""
        self._checkOverUDPAndTCP('sigexpired.bad-dnssec.wb.sidnlabs.nl.',
                                 dns.rcode.SERVFAIL, 7)

    def testBogus(self):
        """An unknown-algorithm zone yields SERVFAIL with info code 6."""
        self._checkOverUDPAndTCP('unknownalgorithm.bad-dnssec.wb.sidnlabs.nl.',
                                 dns.rcode.SERVFAIL, 6)

    def testMissingRRSIG(self):
        """brokendnssec.net. yields SERVFAIL with info code 10."""
        self._checkOverUDPAndTCP('brokendnssec.net.',
                                 dns.rcode.SERVFAIL, 10)

    def testFromLua(self):
        """The code and text set in Lua preresolve() are returned with NOERROR."""
        self._checkOverUDPAndTCP('fromlua.extended.',
                                 dns.rcode.NOERROR, 10, b'Extra text from Lua!')

    def testFromLuaFFI(self):
        """The code and text set in Lua gettag_ffi() are returned with NOERROR."""
        self._checkOverUDPAndTCP('fromluaffi.extended.',
                                 dns.rcode.NOERROR, 10, b'Extra text from Lua FFI!')

    def testTooLarge(self):
        """The option is dropped over UDP when the answer fills the payload."""
        qname = 'toolarge.extended.'
        query = dns.message.make_query(qname, 'A', want_dnssec=True, payload=512)

        # should not have the Extended Option since the packet is too large already
        res = self.sendUDPQuery(query, timeout=5.0)
        self.assertRcodeEqual(res, dns.rcode.NOERROR)
        # assertEqual, not the deprecated assertEquals alias (removed in Python 3.12)
        self.assertEqual(len(res.answer), 1)
        self.assertEqual(res.edns, 0)
        self.assertEqual(len(res.options), 0)

        # over TCP the same answer is expected to carry the option
        res = self.sendTCPQuery(query, timeout=5.0)
        self.assertRcodeEqual(res, dns.rcode.NOERROR)
        self.assertEqual(len(res.answer), 1)
        self._assertSingleExtendedError(res, 10, b'Extra text from Lua!')
183
class NoExtendedErrorsRecursorTest(RecursorTest):
    """Check that no EDNS option is returned when ``extended-errors=no``.

    Uses the same base configuration as the enabled case, with the
    extended errors setting switched off and no Lua script.
    """

    _confdir = 'ExtendedErrorsDisabled'
    _config_template_default = """
dnssec=validate
daemon=no
trace=yes
packetcache-ttl=0
packetcache-servfail-ttl=0
max-cache-ttl=15
threads=1
loglevel=9
disable-syslog=yes
log-common-errors=yes
"""
    _config_template = """
    extended-errors=no
    """
    _roothints = None

    @classmethod
    def setUpClass(cls):
        """Bring up the sockets, the responders and the recursor."""
        # The authoritative servers are not needed here, so only these
        # pieces are started.
        cls.setUpSockets()
        cls.startResponders()

        configPath = os.path.join('configs', cls._confdir)
        cls.createConfigDir(configPath)
        cls.generateRecursorConfig(configPath)
        cls.startRecursor(configPath, cls._recursorPort)

    @classmethod
    def tearDownClass(cls):
        """Shut down the recursor."""
        cls.tearDownRecursor()

    @classmethod
    def generateRecursorConfig(cls, confdir):
        """Delegate configuration generation to the base class."""
        super(NoExtendedErrorsRecursorTest, cls).generateRecursorConfig(confdir)

    def testNotIncepted(self):
        """SERVFAIL is returned over UDP and TCP without any EDNS option."""
        question = dns.message.make_query(
            'signotincepted.bad-dnssec.wb.sidnlabs.nl.', 'A', want_dnssec=True)

        for transport in (self.sendUDPQuery, self.sendTCPQuery):
            reply = transport(question, timeout=5.0)
            self.assertRcodeEqual(reply, dns.rcode.SERVFAIL)
            self.assertEqual(reply.edns, 0)
            self.assertEqual(len(reply.options), 0)
229 for method in ("sendUDPQuery", "sendTCPQuery"):
230 sender = getattr(self, method)
231 res = sender(query, timeout=5.0)
232 self.assertRcodeEqual(res, dns.rcode.SERVFAIL)
233 self.assertEqual(res.edns, 0)
234 self.assertEqual(len(res.options), 0)