# Source: git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_Tags.py
import dns.flags
import dns.message
import dns.rrset

import clientsubnetoption
from dnsdisttests import DNSDistTest
class TestBasics(DNSDistTest):
    """Regression tests for dnsdist tags on queries and responses.

    The configuration below tags queries by name (``TagAction``), tags one
    response by name (``TagResponseAction``), and then selects on those tags
    with ``TagRule`` to either spoof an answer, rewrite the response header,
    or drop the response. Each test sends one query over UDP and then over
    TCP and checks what the client and, where relevant, the backend see.
    """

    # NOTE(review): this class was recovered from a damaged copy of the file
    # in which several lines were lost. The rule lines below are verbatim;
    # the following parts are reconstructed and must be confirmed against the
    # upstream file:
    #   * the attribute name `_config_template`,
    #   * the parameter name of `lol` (its only visible statement is the
    #     `return`),
    #   * the single body line of each response handler (`setTC(true)` /
    #     `setQR(false)`), inferred from the handler names, the
    #     `HeaderModify` return value and the expectations of the tests.
    _config_template = """
    newServer{address="127.0.0.1:%s"}

    function lol(dq)
      return DNSAction.None, ""
    end
    addAction(AllRule(), LuaAction(lol))

    addAction("tag-me-dns-1.tags.tests.powerdns.com.", TagAction("dns", "value1"))
    addAction("tag-me-dns-2.tags.tests.powerdns.com.", TagAction("dns", "value2"))
    addAction("tag-me-response-1.tags.tests.powerdns.com.", TagAction("response", "value1"))
    addAction("tag-me-response-2.tags.tests.powerdns.com.", TagAction("response", "value2"))

    addAction(TagRule("not-dns"), SpoofAction("1.2.3.4"))
    addAction(TagRule("dns", "value1"), SpoofAction("1.2.3.50"))
    addAction(TagRule("dns"), SpoofAction("1.2.3.100"))

    function responseHandlerSetTC(dr)
      dr.dh:setTC(true)
      return DNSResponseAction.HeaderModify, ""
    end

    function responseHandlerUnsetQR(dr)
      dr.dh:setQR(false)
      return DNSResponseAction.HeaderModify, ""
    end

    addResponseAction(TagRule("not-dns"), DropResponseAction())
    addResponseAction(TagRule("response", "value1"), LuaResponseAction(responseHandlerSetTC))
    addResponseAction(TagRule("response", "no-match-value"), DropResponseAction())

    addResponseAction("tag-me-response-3.tags.tests.powerdns.com.", TagResponseAction("response-tag", "value"))
    addResponseAction(TagRule("response-tag"), LuaResponseAction(responseHandlerUnsetQR))
    """

    @staticmethod
    def _makeARecord(name, ttl, address):
        """Return an IN/A rrset for *name* with the given TTL and address."""
        return dns.rrset.from_text(name, ttl, 'IN', 'A', address)

    def _makeBackendAnswer(self, name):
        """Return the A record the fake backend answers with.

        NOTE(review): the original TTL and address were lost; any value works
        here because the forwarded-response tests only compare what the
        backend sent with what the client received.
        """
        return self._makeARecord(name, 3600, '192.0.2.1')

    def _checkForwarded(self, query, response, expectedResponse):
        """Send *query* over UDP then TCP, with the backend answering *response*.

        Asserts that the backend received *query* and that the client
        received *expectedResponse*.
        """
        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            # Align the IDs first so that a different query ID on the backend
            # side is not reported as a mismatch.
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertEqual(expectedResponse, receivedResponse)

    def _checkAnsweredWithoutBackend(self, query, expectedResponse):
        """Send *query* over UDP then TCP without queueing a backend response.

        Asserts that the client nevertheless received *expectedResponse*.
        """
        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertTrue(receivedResponse)
            self.assertEqual(expectedResponse, receivedResponse)

    def testQuestionNoTag(self):
        """
        Tag: a name that no tagging rule matches is forwarded and answered unchanged
        """
        name = 'no-match.tags.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        response.answer.append(self._makeBackendAnswer(name))

        self._checkForwarded(query, response, response)

    def testQuestionMatchTagAndValue(self):
        """
        Tag: Name and value match
        """
        name = 'tag-me-dns-1.tags.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # dnsdist set RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        # The address is the one configured for TagRule("dns", "value1").
        # NOTE(review): the TTL was lost; 60 is assumed to be the TTL dnsdist
        # puts on spoofed records - confirm.
        expectedResponse.answer.append(self._makeARecord(name, 60, '1.2.3.50'))

        self._checkAnsweredWithoutBackend(query, expectedResponse)

    def testQuestionMatchTagOnly(self):
        """
        Tag: the tag name matches but its value does not match the value-specific rule
        """
        name = 'tag-me-dns-2.tags.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # dnsdist set RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        # "dns" is set to "value2" for this name, so the value-less
        # TagRule("dns") is the rule whose address is expected.
        # NOTE(review): the TTL was lost; 60 is assumed to be the TTL dnsdist
        # puts on spoofed records - confirm.
        expectedResponse.answer.append(self._makeARecord(name, 60, '1.2.3.100'))

        self._checkAnsweredWithoutBackend(query, expectedResponse)

    def testResponseNoMatch(self):
        """
        Tag: Tag set on query does not match anything
        """
        name = 'tag-me-response-2.tags.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        response.answer.append(self._makeBackendAnswer(name))

        # "response" is set to "value2", which none of the response rules
        # select on, so the backend's answer is expected back as-is.
        self._checkForwarded(query, response, response)

    def testResponseMatchTagAndValue(self):
        """
        Tag: Tag and value set on query matches on response
        """
        name = 'tag-me-response-1.tags.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # RD is cleared on the query, as in the spoofing tests.
        query.flags &= ~dns.flags.RD
        rrset = self._makeBackendAnswer(name)
        response = dns.message.make_response(query)
        response.answer.append(rrset)
        expectedResponse = dns.message.make_response(query)
        expectedResponse.answer.append(rrset)
        # we will set TC if the tag matches
        expectedResponse.flags |= dns.flags.TC

        self._checkForwarded(query, response, expectedResponse)

    def testResponseMatchResponseTagMatches(self):
        """
        Tag: Tag set on response matches
        """
        name = 'tag-me-response-3.tags.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # RD is cleared on the query, as in the spoofing tests.
        query.flags &= ~dns.flags.RD
        rrset = self._makeBackendAnswer(name)
        response = dns.message.make_response(query)
        response.answer.append(rrset)
        expectedResponse = dns.message.make_response(query)
        expectedResponse.answer.append(rrset)
        # we will set QR=0 if the tag matches
        expectedResponse.flags &= ~dns.flags.QR

        self._checkForwarded(query, response, expectedResponse)