]>
Commit | Line | Data |
---|---|---|
788c3243 RG |
1 | #!/usr/bin/env python |
2 | from datetime import datetime, timedelta | |
3 | import time | |
4 | import dns | |
5 | from dnsdisttests import DNSDistTest | |
6 | ||
7 | class TestResponseRuleNXDelayed(DNSDistTest): | |
8 | ||
9 | _config_template = """ | |
10 | newServer{address="127.0.0.1:%s"} | |
d3ec24f9 | 11 | addResponseAction(RCodeRule(DNSRCode.NXDOMAIN), DelayResponseAction(1000)) |
788c3243 RG |
12 | """ |
13 | ||
14 | def testNXDelayed(self): | |
15 | """ | |
16 | Responses: Delayed on NXDomain | |
17 | ||
18 | Send an A query to "delayed.responses.tests.powerdns.com.", | |
19 | check that the response delay is longer than 1000 ms | |
20 | for a NXDomain response over UDP, shorter for a NoError one. | |
21 | """ | |
22 | name = 'delayed.responses.tests.powerdns.com.' | |
23 | query = dns.message.make_query(name, 'A', 'IN') | |
24 | response = dns.message.make_response(query) | |
25 | ||
26 | # NX over UDP | |
27 | response.set_rcode(dns.rcode.NXDOMAIN) | |
28 | begin = datetime.now() | |
29 | (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response) | |
30 | end = datetime.now() | |
31 | receivedQuery.id = query.id | |
32 | self.assertEquals(query, receivedQuery) | |
33 | self.assertEquals(response, receivedResponse) | |
34 | self.assertTrue((end - begin) > timedelta(0, 1)) | |
35 | ||
36 | # NoError over UDP | |
37 | response.set_rcode(dns.rcode.NOERROR) | |
38 | begin = datetime.now() | |
39 | (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response) | |
40 | end = datetime.now() | |
41 | receivedQuery.id = query.id | |
42 | self.assertEquals(query, receivedQuery) | |
43 | self.assertEquals(response, receivedResponse) | |
44 | self.assertTrue((end - begin) < timedelta(0, 1)) | |
45 | ||
46 | # NX over TCP | |
47 | response.set_rcode(dns.rcode.NXDOMAIN) | |
48 | begin = datetime.now() | |
49 | (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response) | |
50 | end = datetime.now() | |
51 | receivedQuery.id = query.id | |
52 | self.assertEquals(query, receivedQuery) | |
53 | self.assertEquals(response, receivedResponse) | |
54 | self.assertTrue((end - begin) < timedelta(0, 1)) | |
55 | ||
d83feb68 CH |
56 | class TestResponseRuleERCode(DNSDistTest): |
57 | ||
58 | _config_template = """ | |
59 | newServer{address="127.0.0.1:%s"} | |
d3ec24f9 | 60 | addResponseAction(ERCodeRule(DNSRCode.BADVERS), DelayResponseAction(1000)) |
d83feb68 CH |
61 | """ |
62 | ||
63 | def testBADVERSDelayed(self): | |
64 | """ | |
65 | Responses: Delayed on BADVERS | |
66 | ||
67 | Send an A query to "delayed.responses.tests.powerdns.com.", | |
68 | check that the response delay is longer than 1000 ms | |
69 | for a BADVERS response over UDP, shorter for BADKEY and NoError. | |
70 | """ | |
71 | name = 'delayed.responses.tests.powerdns.com.' | |
72 | query = dns.message.make_query(name, 'A', 'IN') | |
73 | response = dns.message.make_response(query) | |
74 | response.use_edns(edns=True) | |
75 | ||
76 | # BADVERS over UDP | |
77 | # BADVERS == 16, so rcode==0, ercode==1 | |
78 | response.set_rcode(dns.rcode.BADVERS) | |
79 | begin = datetime.now() | |
80 | (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response) | |
81 | end = datetime.now() | |
82 | receivedQuery.id = query.id | |
83 | self.assertEquals(query, receivedQuery) | |
84 | self.assertEquals(response, receivedResponse) | |
85 | self.assertTrue((end - begin) > timedelta(0, 1)) | |
86 | ||
87 | # BADKEY (17, an ERCode) over UDP | |
88 | response.set_rcode(17) | |
89 | begin = datetime.now() | |
90 | (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response) | |
91 | end = datetime.now() | |
92 | receivedQuery.id = query.id | |
93 | self.assertEquals(query, receivedQuery) | |
94 | self.assertEquals(response, receivedResponse) | |
95 | self.assertTrue((end - begin) < timedelta(0, 1)) | |
96 | ||
97 | # NoError (non-ERcode, basic RCode bits match BADVERS) over UDP | |
98 | response.set_rcode(dns.rcode.NOERROR) | |
99 | begin = datetime.now() | |
100 | (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response) | |
101 | end = datetime.now() | |
102 | receivedQuery.id = query.id | |
103 | self.assertEquals(query, receivedQuery) | |
104 | self.assertEquals(response, receivedResponse) | |
105 | self.assertTrue((end - begin) < timedelta(0, 1)) | |
106 | ||
788c3243 RG |
107 | class TestResponseRuleQNameDropped(DNSDistTest): |
108 | ||
109 | _config_template = """ | |
110 | newServer{address="127.0.0.1:%s"} | |
111 | addResponseAction("drop.responses.tests.powerdns.com.", DropResponseAction()) | |
112 | """ | |
113 | ||
114 | def testDropped(self): | |
115 | """ | |
116 | Responses: Dropped on QName | |
117 | ||
118 | Send an A query to "drop.responses.tests.powerdns.com.", | |
119 | check that the response (not the query) is dropped. | |
120 | """ | |
121 | name = 'drop.responses.tests.powerdns.com.' | |
122 | query = dns.message.make_query(name, 'A', 'IN') | |
123 | response = dns.message.make_response(query) | |
124 | ||
6ca2e796 RG |
125 | for method in ("sendUDPQuery", "sendTCPQuery"): |
126 | sender = getattr(self, method) | |
127 | (receivedQuery, receivedResponse) = sender(query, response) | |
128 | receivedQuery.id = query.id | |
129 | self.assertEquals(query, receivedQuery) | |
130 | self.assertEquals(receivedResponse, None) | |
788c3243 RG |
131 | |
132 | def testNotDropped(self): | |
133 | """ | |
134 | Responses: NOT Dropped on QName | |
135 | ||
136 | Send an A query to "dontdrop.responses.tests.powerdns.com.", | |
137 | check that the response is not dropped. | |
138 | """ | |
139 | name = 'dontdrop.responses.tests.powerdns.com.' | |
140 | query = dns.message.make_query(name, 'A', 'IN') | |
141 | response = dns.message.make_response(query) | |
142 | ||
6ca2e796 RG |
143 | for method in ("sendUDPQuery", "sendTCPQuery"): |
144 | sender = getattr(self, method) | |
145 | (receivedQuery, receivedResponse) = sender(query, response) | |
146 | receivedQuery.id = query.id | |
147 | self.assertEquals(query, receivedQuery) | |
148 | self.assertEquals(response, receivedResponse) | |
788c3243 RG |
149 | |
150 | class TestResponseRuleQNameAllowed(DNSDistTest): | |
151 | ||
152 | _config_template = """ | |
153 | newServer{address="127.0.0.1:%s"} | |
154 | addResponseAction("allow.responses.tests.powerdns.com.", AllowResponseAction()) | |
155 | addResponseAction(AllRule(), DropResponseAction()) | |
156 | """ | |
157 | ||
158 | def testAllowed(self): | |
159 | """ | |
160 | Responses: Allowed on QName | |
161 | ||
162 | Send an A query to "allow.responses.tests.powerdns.com.", | |
163 | check that the response is allowed. | |
164 | """ | |
165 | name = 'allow.responses.tests.powerdns.com.' | |
166 | query = dns.message.make_query(name, 'A', 'IN') | |
167 | response = dns.message.make_response(query) | |
168 | ||
6ca2e796 RG |
169 | for method in ("sendUDPQuery", "sendTCPQuery"): |
170 | sender = getattr(self, method) | |
171 | (receivedQuery, receivedResponse) = sender(query, response) | |
172 | receivedQuery.id = query.id | |
173 | self.assertEquals(query, receivedQuery) | |
174 | self.assertEquals(response, receivedResponse) | |
788c3243 RG |
175 | |
176 | def testNotAllowed(self): | |
177 | """ | |
178 | Responses: Not allowed on QName | |
179 | ||
180 | Send an A query to "dontallow.responses.tests.powerdns.com.", | |
181 | check that the response is dropped. | |
182 | """ | |
183 | name = 'dontallow.responses.tests.powerdns.com.' | |
184 | query = dns.message.make_query(name, 'A', 'IN') | |
185 | response = dns.message.make_response(query) | |
186 | ||
6ca2e796 RG |
187 | for method in ("sendUDPQuery", "sendTCPQuery"): |
188 | sender = getattr(self, method) | |
189 | (receivedQuery, receivedResponse) = sender(query, response) | |
190 | receivedQuery.id = query.id | |
191 | self.assertEquals(query, receivedQuery) | |
192 | self.assertEquals(receivedResponse, None) | |
153d5065 RG |
193 | |
194 | class TestResponseRuleEditTTL(DNSDistTest): | |
195 | ||
196 | _ttl = 5 | |
197 | _config_params = ['_testServerPort', '_ttl'] | |
198 | _config_template = """ | |
199 | newServer{address="127.0.0.1:%s"} | |
200 | ||
201 | function editTTLCallback(section, class, type, ttl) | |
202 | return %d | |
203 | end | |
204 | ||
205 | function editTTLFunc(dr) | |
206 | dr:editTTLs(editTTLCallback) | |
207 | return DNSAction.None, "" | |
208 | end | |
209 | ||
a2ff35e3 | 210 | addResponseAction(AllRule(), LuaResponseAction(editTTLFunc)) |
153d5065 RG |
211 | """ |
212 | ||
213 | def testTTLEdited(self): | |
214 | """ | |
215 | Responses: Alter the TTLs | |
216 | """ | |
217 | name = 'editttl.responses.tests.powerdns.com.' | |
218 | query = dns.message.make_query(name, 'A', 'IN') | |
219 | response = dns.message.make_response(query) | |
220 | rrset = dns.rrset.from_text(name, | |
221 | 3600, | |
222 | dns.rdataclass.IN, | |
223 | dns.rdatatype.A, | |
224 | '192.0.2.1') | |
225 | response.answer.append(rrset) | |
226 | ||
6ca2e796 RG |
227 | for method in ("sendUDPQuery", "sendTCPQuery"): |
228 | sender = getattr(self, method) | |
229 | (receivedQuery, receivedResponse) = sender(query, response) | |
230 | receivedQuery.id = query.id | |
231 | self.assertEquals(query, receivedQuery) | |
232 | self.assertEquals(response, receivedResponse) | |
233 | self.assertNotEquals(response.answer[0].ttl, receivedResponse.answer[0].ttl) | |
234 | self.assertEquals(receivedResponse.answer[0].ttl, self._ttl) | |
63cdc73d CHB |
235 | |
236 | class TestResponseLuaActionReturnSyntax(DNSDistTest): | |
237 | ||
238 | _config_template = """ | |
239 | newServer{address="127.0.0.1:%s"} | |
240 | function customDelay(dr) | |
241 | return DNSResponseAction.Delay, "1000" | |
242 | end | |
243 | function customDrop(dr) | |
244 | return DNSResponseAction.Drop | |
245 | end | |
246 | addResponseAction("drop.responses.tests.powerdns.com.", LuaResponseAction(customDrop)) | |
d3ec24f9 | 247 | addResponseAction(RCodeRule(DNSRCode.NXDOMAIN), LuaResponseAction(customDelay)) |
63cdc73d CHB |
248 | """ |
249 | ||
250 | def testResponseActionDelayed(self): | |
251 | """ | |
252 | Responses: Delayed via LuaResponseAction | |
253 | ||
254 | Send an A query to "delayed.responses.tests.powerdns.com.", | |
255 | check that the response delay is longer than 1000 ms | |
256 | for a NXDomain response over UDP, shorter for a NoError one. | |
257 | """ | |
258 | name = 'delayed.responses.tests.powerdns.com.' | |
259 | query = dns.message.make_query(name, 'A', 'IN') | |
260 | response = dns.message.make_response(query) | |
261 | ||
262 | # NX over UDP | |
263 | response.set_rcode(dns.rcode.NXDOMAIN) | |
264 | begin = datetime.now() | |
265 | (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response) | |
266 | end = datetime.now() | |
267 | receivedQuery.id = query.id | |
268 | self.assertEquals(query, receivedQuery) | |
269 | self.assertEquals(response, receivedResponse) | |
270 | self.assertTrue((end - begin) > timedelta(0, 1)) | |
271 | ||
272 | def testDropped(self): | |
273 | """ | |
274 | Responses: Dropped via user defined LuaResponseAction | |
275 | ||
276 | Send an A query to "drop.responses.tests.powerdns.com.", | |
277 | check that the response (not the query) is dropped. | |
278 | """ | |
279 | name = 'drop.responses.tests.powerdns.com.' | |
280 | query = dns.message.make_query(name, 'A', 'IN') | |
281 | response = dns.message.make_response(query) | |
282 | ||
6ca2e796 RG |
283 | for method in ("sendUDPQuery", "sendTCPQuery"): |
284 | sender = getattr(self, method) | |
285 | (receivedQuery, receivedResponse) = sender(query, response) | |
286 | receivedQuery.id = query.id | |
287 | self.assertEquals(query, receivedQuery) | |
288 | self.assertEquals(receivedResponse, None) |