]> git.ipfire.org Git - thirdparty/pdns.git/blame - regression-tests.dnsdist/test_Tags.py
Merge pull request #13423 from phonedph1/patch-3
[thirdparty/pdns.git] / regression-tests.dnsdist / test_Tags.py
CommitLineData
a76b0d63 1#!/usr/bin/env python
a76b0d63
RG
2import dns
3import clientsubnetoption
4from dnsdisttests import DNSDistTest
5
class TestTags(DNSDistTest):
    """Regression tests for tag-based rule matching.

    The configuration below tags queries by name with SetTagAction, tags
    responses with SetTagResponseAction, and then uses TagRule to either
    spoof an answer (query path) or alter/drop the response (response path).
    """

    # The single %s placeholder receives the backend port.
    _config_template = """
    newServer{address="127.0.0.1:%s"}

    function lol(dq)
      return DNSAction.None, ""
    end
    addAction(AllRule(), LuaAction(lol))

    addAction("tag-me-dns-1.tags.tests.powerdns.com.", SetTagAction("dns", "value1"))
    addAction("tag-me-dns-2.tags.tests.powerdns.com.", SetTagAction("dns", "value2"))
    addAction("tag-me-response-1.tags.tests.powerdns.com.", SetTagAction("response", "value1"))
    addAction("tag-me-response-2.tags.tests.powerdns.com.", SetTagAction("response", "value2"))

    addAction(TagRule("not-dns"), SpoofAction("1.2.3.4"))
    addAction(TagRule("dns", "value1"), SpoofAction("1.2.3.50"))
    addAction(TagRule("dns"), SpoofAction("1.2.3.100"))

    function responseHandlerSetTC(dr)
      dr.dh:setTC(true)
      return DNSResponseAction.HeaderModify, ""
    end

    function responseHandlerUnsetQR(dr)
      dr.dh:setQR(false)
      return DNSResponseAction.HeaderModify, ""
    end

    addResponseAction(TagRule("not-dns"), DropResponseAction())
    addResponseAction(TagRule("response", "value1"), LuaResponseAction(responseHandlerSetTC))
    addResponseAction(TagRule("response", "no-match-value"), DropResponseAction())

    addResponseAction("tag-me-response-3.tags.tests.powerdns.com.", SetTagResponseAction("response-tag", "value"))
    addResponseAction(TagRule("response-tag"), LuaResponseAction(responseHandlerUnsetQR))
    """

    def testQuestionNoTag(self):
        """
        Tag: No match
        """
        qname = 'no-match.tags.tests.powerdns.com.'
        query = dns.message.make_query(qname, 'A', 'IN')
        response = dns.message.make_response(query)
        answer = dns.rrset.from_text(qname, 3600, dns.rdataclass.IN, dns.rdatatype.A, '127.0.0.1')
        response.answer.append(answer)

        # No tag is set for this name, so the query and the supplied
        # response are both expected to come through unchanged.
        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            receivedQuery, receivedResponse = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            # align the IDs before comparing the two queries
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertEqual(response, receivedResponse)

    def testQuestionMatchTagAndValue(self):
        """
        Tag: Name and value match
        """
        qname = 'tag-me-dns-1.tags.tests.powerdns.com.'
        query = dns.message.make_query(qname, 'A', 'IN')
        # dnsdist set RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        # tag "dns" == "value1" is expected to hit the SpoofAction("1.2.3.50") rule
        spoofed = dns.rrset.from_text(qname, 60, dns.rdataclass.IN, dns.rdatatype.A, '1.2.3.50')
        expectedResponse.answer.append(spoofed)

        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            # no backend response is supplied: the answer must be the spoofed one
            _, receivedResponse = sender(query, response=None, useQueue=False)
            self.assertTrue(receivedResponse)
            self.assertEqual(expectedResponse, receivedResponse)

    def testQuestionMatchTagOnly(self):
        """
        Tag: Name matches
        """
        qname = 'tag-me-dns-2.tags.tests.powerdns.com.'
        query = dns.message.make_query(qname, 'A', 'IN')
        # dnsdist set RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        # tag "dns" == "value2" only matches the value-less TagRule("dns"),
        # which spoofs 1.2.3.100
        spoofed = dns.rrset.from_text(qname, 60, dns.rdataclass.IN, dns.rdatatype.A, '1.2.3.100')
        expectedResponse.answer.append(spoofed)

        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            # no backend response is supplied: the answer must be the spoofed one
            _, receivedResponse = sender(query, response=None, useQueue=False)
            self.assertTrue(receivedResponse)
            self.assertEqual(expectedResponse, receivedResponse)

    def testResponseNoMatch(self):
        """
        Tag: Tag set on query does not match anything
        """
        qname = 'tag-me-response-2.tags.tests.powerdns.com.'
        query = dns.message.make_query(qname, 'A', 'IN')
        response = dns.message.make_response(query)
        answer = dns.rrset.from_text(qname, 60, dns.rdataclass.IN, dns.rdatatype.A, '192.0.2.1')
        response.answer.append(answer)

        # tag "response" == "value2" matches none of the response rules,
        # so the response is expected to be returned unmodified
        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            receivedQuery, receivedResponse = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            # align the IDs before comparing the two queries
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertEqual(response, receivedResponse)

    def testResponseMatchTagAndValue(self):
        """
        Tag: Tag and value set on query matches on response
        """
        qname = 'tag-me-response-1.tags.tests.powerdns.com.'
        query = dns.message.make_query(qname, 'A', 'IN')
        # RD is cleared exactly as in the spoofing tests.
        # NOTE(review): nothing is spoofed in this test, so this may not be
        # strictly required here - confirm before removing.
        query.flags &= ~dns.flags.RD
        response = dns.message.make_response(query)
        answer = dns.rrset.from_text(qname, 60, dns.rdataclass.IN, dns.rdatatype.A, '1.2.3.100')
        response.answer.append(answer)
        expectedResponse = dns.message.make_response(query)
        expectedResponse.answer.append(answer)
        # we will set TC if the tag matches
        expectedResponse.flags |= dns.flags.TC

        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            receivedQuery, receivedResponse = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            # align the IDs before comparing the two queries
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertEqual(expectedResponse, receivedResponse)

    def testResponseMatchResponseTagMatches(self):
        """
        Tag: Tag set on response matches
        """
        qname = 'tag-me-response-3.tags.tests.powerdns.com.'
        query = dns.message.make_query(qname, 'A', 'IN')
        # RD is cleared exactly as in the spoofing tests.
        # NOTE(review): nothing is spoofed in this test, so this may not be
        # strictly required here - confirm before removing.
        query.flags &= ~dns.flags.RD
        response = dns.message.make_response(query)
        answer = dns.rrset.from_text(qname, 60, dns.rdataclass.IN, dns.rdatatype.A, '1.2.3.100')
        response.answer.append(answer)
        expectedResponse = dns.message.make_response(query)
        expectedResponse.answer.append(answer)
        # we will set QR=0 if the tag matches
        expectedResponse.flags &= ~dns.flags.QR

        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            receivedQuery, receivedResponse = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            # align the IDs before comparing the two queries
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertEqual(expectedResponse, receivedResponse)
4170d08a
CHB
190
class TestSetTagAction(DNSDistTest):
    """Regression tests for SetTagAction overwriting an already-set tag.

    Every query first gets tag "dns" = "value1" (AllRule). Queries for
    tag-me-dns-2 then get "dns" = "value2" from a second SetTagAction.
    The spoofed address in the answer shows which value was in effect when
    the TagRule rules were evaluated.
    """

    # The single %s placeholder receives the backend port.
    _config_template = """
    newServer{address="127.0.0.1:%s"}

    addAction(AllRule(), SetTagAction("dns", "value1"))
    addAction("tag-me-dns-2.tags.tests.powerdns.com.", SetTagAction("dns", "value2"))

    addAction(TagRule("dns", "value1"), SpoofAction("1.2.3.50"))
    addAction(TagRule("dns", "value2"), SpoofAction("1.2.3.4"))

    """

    def _checkSpoofedAnswer(self, name, expectedAddress):
        """Query `name` over UDP and TCP and expect a spoofed A record.

        name: fully-qualified query name.
        expectedAddress: IPv4 address the spoofed answer must contain.
        """
        query = dns.message.make_query(name, 'A', 'IN')
        # dnsdist set RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    expectedAddress)
        expectedResponse.answer.append(rrset)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            # no backend response is supplied: the answer must be the spoofed one
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertTrue(receivedResponse)
            self.assertEqual(expectedResponse, receivedResponse)

    def testSetTagDefault(self):
        """
        Tag: Test SetTagAction default value is used when not overwritten
        """
        self._checkSpoofedAnswer('tag-me-dns-1.tags.tests.powerdns.com.', '1.2.3.50')

    def testSetTagOverwritten(self):
        """
        Tag: Test SetTagAction overwrites existing value
        """
        self._checkSpoofedAnswer('tag-me-dns-2.tags.tests.powerdns.com.', '1.2.3.4')
249
class TestSetTag(DNSDistTest):
    """Regression tests for the Lua dq:setTag() binding overwriting a tag.

    The Lua action sets tag "dns" = "value1" on every query and, for
    tag-me-dns-2 only, sets it again to "value2". The spoofed address in
    the answer shows which value was in effect when the TagRule rules were
    evaluated.
    """

    # The single %s placeholder receives the backend port.
    _config_template = """
    newServer{address="127.0.0.1:%s"}

    function dqset(dq)
      dq:setTag("dns", "value1")
      if tostring(dq.qname) == 'tag-me-dns-2.tags.tests.powerdns.com.' then
        dq:setTag("dns", "value2")
      end
      return DNSAction.None, ""
    end

    addAction(AllRule(), LuaAction(dqset))

    addAction(TagRule("dns", "value1"), SpoofAction("1.2.3.50"))
    addAction(TagRule("dns", "value2"), SpoofAction("1.2.3.4"))

    """

    def _checkSpoofedAnswer(self, name, expectedAddress):
        """Query `name` over UDP and TCP and expect a spoofed A record.

        name: fully-qualified query name.
        expectedAddress: IPv4 address the spoofed answer must contain.
        """
        query = dns.message.make_query(name, 'A', 'IN')
        # dnsdist set RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    expectedAddress)
        expectedResponse.answer.append(rrset)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            # no backend response is supplied: the answer must be the spoofed one
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertTrue(receivedResponse)
            self.assertEqual(expectedResponse, receivedResponse)

    def testSetTagDefault(self):
        """
        Tag: Test setTag default value is used when not overwritten
        """
        self._checkSpoofedAnswer('tag-me-dns-1.tags.tests.powerdns.com.', '1.2.3.50')

    def testSetTagOverwritten(self):
        """
        Tag: Test setTag overwrites existing value
        """
        self._checkSpoofedAnswer('tag-me-dns-2.tags.tests.powerdns.com.', '1.2.3.4')