# Source: git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_Tags.py
3 import clientsubnetoption
4 from dnsdisttests
import DNSDistTest
6 class TestBasics(DNSDistTest
):
9 newServer{address="127.0.0.1:%s"}
12 return DNSAction.None, ""
14 addAction(AllRule(), LuaAction(lol))
16 addAction("tag-me-dns-1.tags.tests.powerdns.com.", TagAction("dns", "value1"))
17 addAction("tag-me-dns-2.tags.tests.powerdns.com.", TagAction("dns", "value2"))
18 addAction("tag-me-response-1.tags.tests.powerdns.com.", TagAction("response", "value1"))
19 addAction("tag-me-response-2.tags.tests.powerdns.com.", TagAction("response", "value2"))
21 addAction(TagRule("not-dns"), SpoofAction("1.2.3.4"))
22 addAction(TagRule("dns", "value1"), SpoofAction("1.2.3.50"))
23 addAction(TagRule("dns"), SpoofAction("1.2.3.100"))
25 function responseHandlerSetTC(dr)
27 return DNSResponseAction.HeaderModify, ""
30 function responseHandlerUnsetQR(dr)
32 return DNSResponseAction.HeaderModify, ""
35 addResponseAction(TagRule("not-dns"), DropResponseAction())
36 addResponseAction(TagRule("response", "value1"), LuaResponseAction(responseHandlerSetTC))
37 addResponseAction(TagRule("response", "no-match-value"), DropResponseAction())
39 addResponseAction("tag-me-response-3.tags.tests.powerdns.com.", TagResponseAction("response-tag", "value"))
40 addResponseAction(TagRule("response-tag"), LuaResponseAction(responseHandlerUnsetQR))
def testQuestionNoTag(self):
    """
    Tag: No match

    The query name matches none of the name-based tagging rules in the
    configuration, so the query and the response are expected to be
    relayed unchanged over both UDP and TCP.
    """
    name = 'no-match.tags.tests.powerdns.com.'
    query = dns.message.make_query(name, 'A', 'IN')
    response = dns.message.make_response(query)
    # NOTE(review): the trailing arguments of this call (TTL, class, type,
    # address) were lost when the file was extracted. The values below are
    # a reconstruction of the canned backend answer - confirm upstream.
    rrset = dns.rrset.from_text(name, 3600, 'IN', 'A', '192.0.2.1')
    response.answer.append(rrset)

    for method in ("sendUDPQuery", "sendTCPQuery"):
        sender = getattr(self, method)
        (receivedQuery, receivedResponse) = sender(query, response)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        # Align the IDs so the comparison below ignores them
        # (presumably the ID is rewritten in transit - confirm).
        receivedQuery.id = query.id
        # assertEqual replaces the deprecated assertEquals alias.
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)
def testQuestionMatchTagAndValue(self):
    """
    Tag: Name and value match

    The name is tagged dns=value1 by the configuration, and the
    TagRule("dns", "value1") rule spoofs 1.2.3.50. No backend response
    is supplied (response=None, useQueue=False).
    """
    name = 'tag-me-dns-1.tags.tests.powerdns.com.'
    query = dns.message.make_query(name, 'A', 'IN')
    # dnsdist set RA = RD for spoofed responses
    query.flags &= ~dns.flags.RD
    expectedResponse = dns.message.make_response(query)
    # The address comes from the SpoofAction attached to
    # TagRule("dns", "value1") in the configuration.
    # NOTE(review): the TTL/class/type arguments were lost when the file
    # was extracted; 60 / IN / A are a reconstruction - confirm upstream.
    rrset = dns.rrset.from_text(name, 60, 'IN', 'A', '1.2.3.50')
    expectedResponse.answer.append(rrset)

    for method in ("sendUDPQuery", "sendTCPQuery"):
        sender = getattr(self, method)
        (_, receivedResponse) = sender(query, response=None, useQueue=False)
        self.assertTrue(receivedResponse)
        # assertEqual replaces the deprecated assertEquals alias.
        self.assertEqual(expectedResponse, receivedResponse)
def testQuestionMatchTagOnly(self):
    """
    Tag: Name matches, value does not

    The name is tagged dns=value2 by the configuration. That does not
    satisfy TagRule("dns", "value1"), but it does satisfy the value-less
    TagRule("dns"), which spoofs 1.2.3.100.
    """
    name = 'tag-me-dns-2.tags.tests.powerdns.com.'
    query = dns.message.make_query(name, 'A', 'IN')
    # dnsdist set RA = RD for spoofed responses
    query.flags &= ~dns.flags.RD
    expectedResponse = dns.message.make_response(query)
    # The address comes from the SpoofAction attached to TagRule("dns")
    # in the configuration.
    # NOTE(review): the TTL/class/type arguments were lost when the file
    # was extracted; 60 / IN / A are a reconstruction - confirm upstream.
    rrset = dns.rrset.from_text(name, 60, 'IN', 'A', '1.2.3.100')
    expectedResponse.answer.append(rrset)

    for method in ("sendUDPQuery", "sendTCPQuery"):
        sender = getattr(self, method)
        (_, receivedResponse) = sender(query, response=None, useQueue=False)
        self.assertTrue(receivedResponse)
        # assertEqual replaces the deprecated assertEquals alias.
        self.assertEqual(expectedResponse, receivedResponse)
def testResponseNoMatch(self):
    """
    Tag: Tag set on query does not match anything

    The name is tagged response=value2 by the configuration. The response
    rules look for response=value1 or response=no-match-value, so the
    response is expected to come back unchanged.
    """
    name = 'tag-me-response-2.tags.tests.powerdns.com.'
    query = dns.message.make_query(name, 'A', 'IN')
    response = dns.message.make_response(query)
    # NOTE(review): the trailing arguments of this call (TTL, class, type,
    # address) were lost when the file was extracted. The values below are
    # a reconstruction of the canned backend answer - confirm upstream.
    rrset = dns.rrset.from_text(name, 3600, 'IN', 'A', '192.0.2.1')
    response.answer.append(rrset)

    for method in ("sendUDPQuery", "sendTCPQuery"):
        sender = getattr(self, method)
        (receivedQuery, receivedResponse) = sender(query, response)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        # Align the IDs so the comparison below ignores them
        # (presumably the ID is rewritten in transit - confirm).
        receivedQuery.id = query.id
        # assertEqual replaces the deprecated assertEquals alias.
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)
def testResponseMatchTagAndValue(self):
    """
    Tag: Tag and value set on query matches on response

    The name is tagged response=value1 by the configuration, so the
    TagRule("response", "value1") response rule runs
    responseHandlerSetTC. The response is expected to equal the backend's
    answer with the TC flag set.
    """
    name = 'tag-me-response-1.tags.tests.powerdns.com.'
    query = dns.message.make_query(name, 'A', 'IN')
    # dnsdist set RA = RD for spoofed responses
    query.flags &= ~dns.flags.RD
    response = dns.message.make_response(query)
    # NOTE(review): the trailing arguments of this call (TTL, class, type,
    # address) were lost when the file was extracted. The values below are
    # a reconstruction of the canned backend answer - confirm upstream.
    rrset = dns.rrset.from_text(name, 3600, 'IN', 'A', '192.0.2.1')
    response.answer.append(rrset)
    expectedResponse = dns.message.make_response(query)
    expectedResponse.answer.append(rrset)
    # we will set TC if the tag matches
    expectedResponse.flags |= dns.flags.TC

    for method in ("sendUDPQuery", "sendTCPQuery"):
        sender = getattr(self, method)
        (receivedQuery, receivedResponse) = sender(query, response)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        # Align the IDs so the comparison below ignores them
        # (presumably the ID is rewritten in transit - confirm).
        receivedQuery.id = query.id
        # assertEqual replaces the deprecated assertEquals alias.
        self.assertEqual(query, receivedQuery)
        self.assertEqual(expectedResponse, receivedResponse)
def testResponseMatchResponseTagMatches(self):
    """
    Tag: Tag set on response matches

    The configuration applies TagResponseAction("response-tag", "value")
    to this name. The later TagRule("response-tag") response rule then
    runs responseHandlerUnsetQR. The response is expected to equal the
    backend's answer with the QR flag cleared.
    """
    name = 'tag-me-response-3.tags.tests.powerdns.com.'
    query = dns.message.make_query(name, 'A', 'IN')
    # dnsdist set RA = RD for spoofed responses
    query.flags &= ~dns.flags.RD
    response = dns.message.make_response(query)
    # NOTE(review): the trailing arguments of this call (TTL, class, type,
    # address) were lost when the file was extracted. The values below are
    # a reconstruction of the canned backend answer - confirm upstream.
    rrset = dns.rrset.from_text(name, 3600, 'IN', 'A', '192.0.2.1')
    response.answer.append(rrset)
    expectedResponse = dns.message.make_response(query)
    expectedResponse.answer.append(rrset)
    # we will set QR=0 if the tag matches
    expectedResponse.flags &= ~dns.flags.QR

    for method in ("sendUDPQuery", "sendTCPQuery"):
        sender = getattr(self, method)
        (receivedQuery, receivedResponse) = sender(query, response)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        # Align the IDs so the comparison below ignores them
        # (presumably the ID is rewritten in transit - confirm).
        receivedQuery.id = query.id
        # assertEqual replaces the deprecated assertEquals alias.
        self.assertEqual(query, receivedQuery)
        self.assertEqual(expectedResponse, receivedResponse)