]>
git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_Tags.py
4 import clientsubnetoption
5 from dnsdisttests
import DNSDistTest
7 class TestBasics(DNSDistTest
):
10 newServer{address="127.0.0.1:%s"}
13 return DNSAction.None, ""
15 addAction(AllRule(), LuaAction(lol))
17 addAction("tag-me-dns-1.tags.tests.powerdns.com.", TagAction("dns", "value1"))
18 addAction("tag-me-dns-2.tags.tests.powerdns.com.", TagAction("dns", "value2"))
19 addAction("tag-me-response-1.tags.tests.powerdns.com.", TagAction("response", "value1"))
20 addAction("tag-me-response-2.tags.tests.powerdns.com.", TagAction("response", "value2"))
22 addAction(TagRule("not-dns"), SpoofAction("1.2.3.4"))
23 addAction(TagRule("dns", "value1"), SpoofAction("1.2.3.50"))
24 addAction(TagRule("dns"), SpoofAction("1.2.3.100"))
26 function responseHandlerSetTC(dr)
28 return DNSResponseAction.HeaderModify, ""
31 function responseHandlerUnsetQR(dr)
33 return DNSResponseAction.HeaderModify, ""
36 addResponseAction(TagRule("not-dns"), DropResponseAction())
37 addResponseAction(TagRule("response", "value1"), LuaResponseAction(responseHandlerSetTC))
38 addResponseAction(TagRule("response", "no-match-value"), DropResponseAction())
40 addResponseAction("tag-me-response-3.tags.tests.powerdns.com.", TagResponseAction("response-tag", "value"))
41 addResponseAction(TagRule("response-tag"), LuaResponseAction(responseHandlerUnsetQR))
44 def testQuestionNoTag(self
):
48 name
= 'no-match.tags.tests.powerdns.com.'
49 query
= dns
.message
.make_query(name
, 'A', 'IN')
50 response
= dns
.message
.make_response(query
)
51 rrset
= dns
.rrset
.from_text(name
,
56 response
.answer
.append(rrset
)
58 for method
in ("sendUDPQuery", "sendTCPQuery"):
59 sender
= getattr(self
, method
)
60 (receivedQuery
, receivedResponse
) = sender(query
, response
)
61 self
.assertTrue(receivedQuery
)
62 self
.assertTrue(receivedResponse
)
63 receivedQuery
.id = query
.id
64 self
.assertEquals(query
, receivedQuery
)
65 self
.assertEquals(response
, receivedResponse
)
67 def testQuestionMatchTagAndValue(self
):
69 Tag: Name and value match
71 name
= 'tag-me-dns-1.tags.tests.powerdns.com.'
72 query
= dns
.message
.make_query(name
, 'A', 'IN')
73 # dnsdist set RA = RD for spoofed responses
74 query
.flags
&= ~dns
.flags
.RD
75 expectedResponse
= dns
.message
.make_response(query
)
76 rrset
= dns
.rrset
.from_text(name
,
81 expectedResponse
.answer
.append(rrset
)
83 for method
in ("sendUDPQuery", "sendTCPQuery"):
84 sender
= getattr(self
, method
)
85 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
86 self
.assertTrue(receivedResponse
)
87 self
.assertEquals(expectedResponse
, receivedResponse
)
89 def testQuestionMatchTagOnly(self
):
93 name
= 'tag-me-dns-2.tags.tests.powerdns.com.'
94 query
= dns
.message
.make_query(name
, 'A', 'IN')
95 # dnsdist set RA = RD for spoofed responses
96 query
.flags
&= ~dns
.flags
.RD
97 expectedResponse
= dns
.message
.make_response(query
)
98 rrset
= dns
.rrset
.from_text(name
,
103 expectedResponse
.answer
.append(rrset
)
105 for method
in ("sendUDPQuery", "sendTCPQuery"):
106 sender
= getattr(self
, method
)
107 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
108 self
.assertTrue(receivedResponse
)
109 self
.assertEquals(expectedResponse
, receivedResponse
)
111 def testResponseNoMatch(self
):
113 Tag: Tag set on query does not match anything
115 name
= 'tag-me-response-2.tags.tests.powerdns.com.'
116 query
= dns
.message
.make_query(name
, 'A', 'IN')
117 response
= dns
.message
.make_response(query
)
118 rrset
= dns
.rrset
.from_text(name
,
123 response
.answer
.append(rrset
)
125 for method
in ("sendUDPQuery", "sendTCPQuery"):
126 sender
= getattr(self
, method
)
127 (receivedQuery
, receivedResponse
) = sender(query
, response
)
128 self
.assertTrue(receivedQuery
)
129 self
.assertTrue(receivedResponse
)
130 receivedQuery
.id = query
.id
131 self
.assertEquals(query
, receivedQuery
)
132 self
.assertEquals(response
, receivedResponse
)
134 def testResponseMatchTagAndValue(self
):
136 Tag: Tag and value set on query matches on response
138 name
= 'tag-me-response-1.tags.tests.powerdns.com.'
139 query
= dns
.message
.make_query(name
, 'A', 'IN')
140 # dnsdist set RA = RD for spoofed responses
141 query
.flags
&= ~dns
.flags
.RD
142 response
= dns
.message
.make_response(query
)
143 rrset
= dns
.rrset
.from_text(name
,
148 response
.answer
.append(rrset
)
149 expectedResponse
= dns
.message
.make_response(query
)
150 expectedResponse
.answer
.append(rrset
)
151 # we will set TC if the tag matches
152 expectedResponse
.flags |
= dns
.flags
.TC
154 for method
in ("sendUDPQuery", "sendTCPQuery"):
155 sender
= getattr(self
, method
)
156 (receivedQuery
, receivedResponse
) = sender(query
, response
)
157 self
.assertTrue(receivedQuery
)
158 self
.assertTrue(receivedResponse
)
159 receivedQuery
.id = query
.id
160 self
.assertEquals(query
, receivedQuery
)
161 self
.assertEquals(expectedResponse
, receivedResponse
)
163 def testResponseMatchResponseTagMatches(self
):
165 Tag: Tag set on response matches
167 name
= 'tag-me-response-3.tags.tests.powerdns.com.'
168 query
= dns
.message
.make_query(name
, 'A', 'IN')
169 # dnsdist set RA = RD for spoofed responses
170 query
.flags
&= ~dns
.flags
.RD
171 response
= dns
.message
.make_response(query
)
172 rrset
= dns
.rrset
.from_text(name
,
177 response
.answer
.append(rrset
)
178 expectedResponse
= dns
.message
.make_response(query
)
179 expectedResponse
.answer
.append(rrset
)
180 # we will set QR=0 if the tag matches
181 expectedResponse
.flags
&= ~dns
.flags
.QR
183 for method
in ("sendUDPQuery", "sendTCPQuery"):
184 sender
= getattr(self
, method
)
185 (receivedQuery
, receivedResponse
) = sender(query
, response
)
186 self
.assertTrue(receivedQuery
)
187 self
.assertTrue(receivedResponse
)
188 receivedQuery
.id = query
.id
189 self
.assertEquals(query
, receivedQuery
)
190 self
.assertEquals(expectedResponse
, receivedResponse
)