]> git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_Tags.py
Merge pull request #5770 from rgacogne/remote-logger-first-packet
[thirdparty/pdns.git] / regression-tests.dnsdist / test_Tags.py
1 #!/usr/bin/env python
2 import unittest
3 import dns
4 import clientsubnetoption
5 from dnsdisttests import DNSDistTest
6
class TestBasics(DNSDistTest):
    """
    Regression tests for dnsdist tag handling.

    The configuration below sets tags with TagAction / TagResponseAction and
    matches them with TagRule, both on the query path (spoofed answers) and
    on the response path (header modifications made from Lua).
    """

    _config_template = """
    newServer{address="127.0.0.1:%s"}

    function lol(dq)
      return DNSAction.None, ""
    end
    addAction(AllRule(), LuaAction(lol))

    addAction("tag-me-dns-1.tags.tests.powerdns.com.", TagAction("dns", "value1"))
    addAction("tag-me-dns-2.tags.tests.powerdns.com.", TagAction("dns", "value2"))
    addAction("tag-me-response-1.tags.tests.powerdns.com.", TagAction("response", "value1"))
    addAction("tag-me-response-2.tags.tests.powerdns.com.", TagAction("response", "value2"))

    addAction(TagRule("not-dns"), SpoofAction("1.2.3.4"))
    addAction(TagRule("dns", "value1"), SpoofAction("1.2.3.50"))
    addAction(TagRule("dns"), SpoofAction("1.2.3.100"))

    function responseHandlerSetTC(dr)
      dr.dh:setTC(true)
      return DNSResponseAction.HeaderModify, ""
    end

    function responseHandlerUnsetQR(dr)
      dr.dh:setQR(false)
      return DNSResponseAction.HeaderModify, ""
    end

    addResponseAction(TagRule("not-dns"), DropResponseAction())
    addResponseAction(TagRule("response", "value1"), LuaResponseAction(responseHandlerSetTC))
    addResponseAction(TagRule("response", "no-match-value"), DropResponseAction())

    addResponseAction("tag-me-response-3.tags.tests.powerdns.com.", TagResponseAction("response-tag", "value"))
    addResponseAction(TagRule("response-tag"), LuaResponseAction(responseHandlerUnsetQR))
    """

    def _checkForwarded(self, query, response, expectedResponse):
        """
        Send `query` over UDP, then TCP, with `response` as the backend's
        answer, and check that the backend saw `query` and that the client
        got `expectedResponse`.
        """
        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            # align the IDs before comparing (presumably the ID seen by the
            # backend differs from the one we sent -- verify in dnsdisttests)
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertEqual(expectedResponse, receivedResponse)

    def _checkSpoofed(self, query, expectedResponse):
        """
        Send `query` over UDP, then TCP, without providing a backend
        response, and check that the client got `expectedResponse`.
        """
        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertTrue(receivedResponse)
            self.assertEqual(expectedResponse, receivedResponse)

    def testQuestionNoTag(self):
        """
        Tag: No match
        """
        name = 'no-match.tags.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)

        # no tag is set for this name, so the backend answer is expected
        # to come back unmodified
        self._checkForwarded(query, response, response)

    def testQuestionMatchTagAndValue(self):
        """
        Tag: Name and value match
        """
        name = 'tag-me-dns-1.tags.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # dnsdist set RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        # address spoofed by the TagRule("dns", "value1") rule
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '1.2.3.50')
        expectedResponse.answer.append(rrset)

        self._checkSpoofed(query, expectedResponse)

    def testQuestionMatchTagOnly(self):
        """
        Tag: Name matches
        """
        name = 'tag-me-dns-2.tags.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # dnsdist set RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        # the tag value is "value2" here, so only the name-only
        # TagRule("dns") rule applies
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '1.2.3.100')
        expectedResponse.answer.append(rrset)

        self._checkSpoofed(query, expectedResponse)

    def testResponseNoMatch(self):
        """
        Tag: Tag set on query does not match anything
        """
        name = 'tag-me-response-2.tags.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '192.0.2.1')
        response.answer.append(rrset)

        # the "response" tag is set to "value2", which none of the
        # response rules in the configuration match on
        self._checkForwarded(query, response, response)

    def testResponseMatchTagAndValue(self):
        """
        Tag: Tag and value set on query matches on response
        """
        name = 'tag-me-response-1.tags.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # NOTE(review): RD is cleared as in the spoofed-response tests, but
        # this answer comes from the backend -- confirm it is still needed
        query.flags &= ~dns.flags.RD
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '1.2.3.100')
        response.answer.append(rrset)
        expectedResponse = dns.message.make_response(query)
        expectedResponse.answer.append(rrset)
        # we will set TC if the tag matches
        expectedResponse.flags |= dns.flags.TC

        self._checkForwarded(query, response, expectedResponse)

    def testResponseMatchResponseTagMatches(self):
        """
        Tag: Tag set on response matches
        """
        name = 'tag-me-response-3.tags.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # NOTE(review): RD is cleared as in the spoofed-response tests, but
        # this answer comes from the backend -- confirm it is still needed
        query.flags &= ~dns.flags.RD
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '1.2.3.100')
        response.answer.append(rrset)
        expectedResponse = dns.message.make_response(query)
        expectedResponse.answer.append(rrset)
        # we will set QR=0 if the tag matches
        expectedResponse.flags &= ~dns.flags.QR

        self._checkForwarded(query, response, expectedResponse)