]>
git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_Tags.py
3 import clientsubnetoption
4 from dnsdisttests
import DNSDistTest
6 class TestTags(DNSDistTest
):
9 newServer{address="127.0.0.1:%s"}
12 return DNSAction.None, ""
14 addAction(AllRule(), LuaAction(lol))
16 addAction("tag-me-dns-1.tags.tests.powerdns.com.", SetTagAction("dns", "value1"))
17 addAction("tag-me-dns-2.tags.tests.powerdns.com.", SetTagAction("dns", "value2"))
18 addAction("tag-me-response-1.tags.tests.powerdns.com.", SetTagAction("response", "value1"))
19 addAction("tag-me-response-2.tags.tests.powerdns.com.", SetTagAction("response", "value2"))
21 addAction(TagRule("not-dns"), SpoofAction("1.2.3.4"))
22 addAction(TagRule("dns", "value1"), SpoofAction("1.2.3.50"))
23 addAction(TagRule("dns"), SpoofAction("1.2.3.100"))
25 function responseHandlerSetTC(dr)
27 return DNSResponseAction.HeaderModify, ""
30 function responseHandlerUnsetQR(dr)
32 return DNSResponseAction.HeaderModify, ""
35 addResponseAction(TagRule("not-dns"), DropResponseAction())
36 addResponseAction(TagRule("response", "value1"), LuaResponseAction(responseHandlerSetTC))
37 addResponseAction(TagRule("response", "no-match-value"), DropResponseAction())
39 addResponseAction("tag-me-response-3.tags.tests.powerdns.com.", SetTagResponseAction("response-tag", "value"))
40 addResponseAction(TagRule("response-tag"), LuaResponseAction(responseHandlerUnsetQR))
43 def testQuestionNoTag(self
):
47 name
= 'no-match.tags.tests.powerdns.com.'
48 query
= dns
.message
.make_query(name
, 'A', 'IN')
49 response
= dns
.message
.make_response(query
)
50 rrset
= dns
.rrset
.from_text(name
,
55 response
.answer
.append(rrset
)
57 for method
in ("sendUDPQuery", "sendTCPQuery"):
58 sender
= getattr(self
, method
)
59 (receivedQuery
, receivedResponse
) = sender(query
, response
)
60 self
.assertTrue(receivedQuery
)
61 self
.assertTrue(receivedResponse
)
62 receivedQuery
.id = query
.id
63 self
.assertEqual(query
, receivedQuery
)
64 self
.assertEqual(response
, receivedResponse
)
66 def testQuestionMatchTagAndValue(self
):
68 Tag: Name and value match
70 name
= 'tag-me-dns-1.tags.tests.powerdns.com.'
71 query
= dns
.message
.make_query(name
, 'A', 'IN')
72 # dnsdist set RA = RD for spoofed responses
73 query
.flags
&= ~dns
.flags
.RD
74 expectedResponse
= dns
.message
.make_response(query
)
75 rrset
= dns
.rrset
.from_text(name
,
80 expectedResponse
.answer
.append(rrset
)
82 for method
in ("sendUDPQuery", "sendTCPQuery"):
83 sender
= getattr(self
, method
)
84 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
85 self
.assertTrue(receivedResponse
)
86 self
.assertEqual(expectedResponse
, receivedResponse
)
88 def testQuestionMatchTagOnly(self
):
92 name
= 'tag-me-dns-2.tags.tests.powerdns.com.'
93 query
= dns
.message
.make_query(name
, 'A', 'IN')
94 # dnsdist set RA = RD for spoofed responses
95 query
.flags
&= ~dns
.flags
.RD
96 expectedResponse
= dns
.message
.make_response(query
)
97 rrset
= dns
.rrset
.from_text(name
,
102 expectedResponse
.answer
.append(rrset
)
104 for method
in ("sendUDPQuery", "sendTCPQuery"):
105 sender
= getattr(self
, method
)
106 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
107 self
.assertTrue(receivedResponse
)
108 self
.assertEqual(expectedResponse
, receivedResponse
)
110 def testResponseNoMatch(self
):
112 Tag: Tag set on query does not match anything
114 name
= 'tag-me-response-2.tags.tests.powerdns.com.'
115 query
= dns
.message
.make_query(name
, 'A', 'IN')
116 response
= dns
.message
.make_response(query
)
117 rrset
= dns
.rrset
.from_text(name
,
122 response
.answer
.append(rrset
)
124 for method
in ("sendUDPQuery", "sendTCPQuery"):
125 sender
= getattr(self
, method
)
126 (receivedQuery
, receivedResponse
) = sender(query
, response
)
127 self
.assertTrue(receivedQuery
)
128 self
.assertTrue(receivedResponse
)
129 receivedQuery
.id = query
.id
130 self
.assertEqual(query
, receivedQuery
)
131 self
.assertEqual(response
, receivedResponse
)
133 def testResponseMatchTagAndValue(self
):
135 Tag: Tag and value set on query matches on response
137 name
= 'tag-me-response-1.tags.tests.powerdns.com.'
138 query
= dns
.message
.make_query(name
, 'A', 'IN')
139 # dnsdist set RA = RD for spoofed responses
140 query
.flags
&= ~dns
.flags
.RD
141 response
= dns
.message
.make_response(query
)
142 rrset
= dns
.rrset
.from_text(name
,
147 response
.answer
.append(rrset
)
148 expectedResponse
= dns
.message
.make_response(query
)
149 expectedResponse
.answer
.append(rrset
)
150 # we will set TC if the tag matches
151 expectedResponse
.flags |
= dns
.flags
.TC
153 for method
in ("sendUDPQuery", "sendTCPQuery"):
154 sender
= getattr(self
, method
)
155 (receivedQuery
, receivedResponse
) = sender(query
, response
)
156 self
.assertTrue(receivedQuery
)
157 self
.assertTrue(receivedResponse
)
158 receivedQuery
.id = query
.id
159 self
.assertEqual(query
, receivedQuery
)
160 self
.assertEqual(expectedResponse
, receivedResponse
)
162 def testResponseMatchResponseTagMatches(self
):
164 Tag: Tag set on response matches
166 name
= 'tag-me-response-3.tags.tests.powerdns.com.'
167 query
= dns
.message
.make_query(name
, 'A', 'IN')
168 # dnsdist set RA = RD for spoofed responses
169 query
.flags
&= ~dns
.flags
.RD
170 response
= dns
.message
.make_response(query
)
171 rrset
= dns
.rrset
.from_text(name
,
176 response
.answer
.append(rrset
)
177 expectedResponse
= dns
.message
.make_response(query
)
178 expectedResponse
.answer
.append(rrset
)
179 # we will set QR=0 if the tag matches
180 expectedResponse
.flags
&= ~dns
.flags
.QR
182 for method
in ("sendUDPQuery", "sendTCPQuery"):
183 sender
= getattr(self
, method
)
184 (receivedQuery
, receivedResponse
) = sender(query
, response
)
185 self
.assertTrue(receivedQuery
)
186 self
.assertTrue(receivedResponse
)
187 receivedQuery
.id = query
.id
188 self
.assertEqual(query
, receivedQuery
)
189 self
.assertEqual(expectedResponse
, receivedResponse
)
191 class TestSetTagAction(DNSDistTest
):
193 _config_template
= """
194 newServer{address="127.0.0.1:%s"}
196 addAction(AllRule(), SetTagAction("dns", "value1"))
197 addAction("tag-me-dns-2.tags.tests.powerdns.com.", SetTagAction("dns", "value2"))
199 addAction(TagRule("dns", "value1"), SpoofAction("1.2.3.50"))
200 addAction(TagRule("dns", "value2"), SpoofAction("1.2.3.4"))
204 def testSetTagDefault(self
):
207 Tag: Test setTag overwrites existing value
209 name
= 'tag-me-dns-1.tags.tests.powerdns.com.'
210 query
= dns
.message
.make_query(name
, 'A', 'IN')
211 # dnsdist set RA = RD for spoofed responses
212 query
.flags
&= ~dns
.flags
.RD
213 expectedResponse
= dns
.message
.make_response(query
)
214 rrset
= dns
.rrset
.from_text(name
,
219 expectedResponse
.answer
.append(rrset
)
221 for method
in ("sendUDPQuery", "sendTCPQuery"):
222 sender
= getattr(self
, method
)
223 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
224 self
.assertTrue(receivedResponse
)
225 self
.assertEqual(expectedResponse
, receivedResponse
)
227 def testSetTagOverwritten(self
):
230 Tag: Test setTag overwrites existing value
232 name
= 'tag-me-dns-2.tags.tests.powerdns.com.'
233 query
= dns
.message
.make_query(name
, 'A', 'IN')
234 # dnsdist set RA = RD for spoofed responses
235 query
.flags
&= ~dns
.flags
.RD
236 expectedResponse
= dns
.message
.make_response(query
)
237 rrset
= dns
.rrset
.from_text(name
,
242 expectedResponse
.answer
.append(rrset
)
244 for method
in ("sendUDPQuery", "sendTCPQuery"):
245 sender
= getattr(self
, method
)
246 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
247 self
.assertTrue(receivedResponse
)
248 self
.assertEqual(expectedResponse
, receivedResponse
)
250 class TestSetTag(DNSDistTest
):
252 _config_template
= """
253 newServer{address="127.0.0.1:%s"}
256 dq:setTag("dns", "value1")
257 if tostring(dq.qname) == 'tag-me-dns-2.tags.tests.powerdns.com.' then
258 dq:setTag("dns", "value2")
260 return DNSAction.None, ""
263 addAction(AllRule(), LuaAction(dqset))
265 addAction(TagRule("dns", "value1"), SpoofAction("1.2.3.50"))
266 addAction(TagRule("dns", "value2"), SpoofAction("1.2.3.4"))
270 def testSetTagDefault(self
):
273 Tag: Test setTag overwrites existing value
275 name
= 'tag-me-dns-1.tags.tests.powerdns.com.'
276 query
= dns
.message
.make_query(name
, 'A', 'IN')
277 # dnsdist set RA = RD for spoofed responses
278 query
.flags
&= ~dns
.flags
.RD
279 expectedResponse
= dns
.message
.make_response(query
)
280 rrset
= dns
.rrset
.from_text(name
,
285 expectedResponse
.answer
.append(rrset
)
287 for method
in ("sendUDPQuery", "sendTCPQuery"):
288 sender
= getattr(self
, method
)
289 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
290 self
.assertTrue(receivedResponse
)
291 self
.assertEqual(expectedResponse
, receivedResponse
)
293 def testSetTagOverwritten(self
):
296 Tag: Test setTag overwrites existing value
298 name
= 'tag-me-dns-2.tags.tests.powerdns.com.'
299 query
= dns
.message
.make_query(name
, 'A', 'IN')
300 # dnsdist set RA = RD for spoofed responses
301 query
.flags
&= ~dns
.flags
.RD
302 expectedResponse
= dns
.message
.make_response(query
)
303 rrset
= dns
.rrset
.from_text(name
,
308 expectedResponse
.answer
.append(rrset
)
310 for method
in ("sendUDPQuery", "sendTCPQuery"):
311 sender
= getattr(self
, method
)
312 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
313 self
.assertTrue(receivedResponse
)
314 self
.assertEqual(expectedResponse
, receivedResponse
)