]> git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_Tags.py
Merge pull request #14078 from rgacogne/ddist-harvest-quic
[thirdparty/pdns.git] / regression-tests.dnsdist / test_Tags.py
1 #!/usr/bin/env python
2 import unittest
3 import dns
4 import clientsubnetoption
5 from dnsdisttests import DNSDistTest
6
7 class TestBasics(DNSDistTest):
8
9 _config_template = """
10 newServer{address="127.0.0.1:%s"}
11
12 function lol(dq)
13 return DNSAction.None, ""
14 end
15 addAction(AllRule(), LuaAction(lol))
16
17 addAction("tag-me-dns-1.tags.tests.powerdns.com.", TagAction("dns", "value1"))
18 addAction("tag-me-dns-2.tags.tests.powerdns.com.", TagAction("dns", "value2"))
19 addAction("tag-me-response-1.tags.tests.powerdns.com.", TagAction("response", "value1"))
20 addAction("tag-me-response-2.tags.tests.powerdns.com.", TagAction("response", "value2"))
21
22 addAction(TagRule("not-dns"), SpoofAction("1.2.3.4"))
23 addAction(TagRule("dns", "value1"), SpoofAction("1.2.3.50"))
24 addAction(TagRule("dns"), SpoofAction("1.2.3.100"))
25
26 function responseHandlerSetTC(dr)
27 dr.dh:setTC(true)
28 return DNSResponseAction.HeaderModify, ""
29 end
30
31 function responseHandlerUnsetQR(dr)
32 dr.dh:setQR(false)
33 return DNSResponseAction.HeaderModify, ""
34 end
35
36 addResponseAction(TagRule("not-dns"), DropResponseAction())
37 addResponseAction(TagRule("response", "value1"), LuaResponseAction(responseHandlerSetTC))
38 addResponseAction(TagRule("response", "no-match-value"), DropResponseAction())
39
40 addResponseAction("tag-me-response-3.tags.tests.powerdns.com.", TagResponseAction("response-tag", "value"))
41 addResponseAction(TagRule("response-tag"), LuaResponseAction(responseHandlerUnsetQR))
42 """
43
44 def testQuestionNoTag(self):
45 """
46 Tag: No match
47 """
48 name = 'no-match.tags.tests.powerdns.com.'
49 query = dns.message.make_query(name, 'A', 'IN')
50 response = dns.message.make_response(query)
51 rrset = dns.rrset.from_text(name,
52 3600,
53 dns.rdataclass.IN,
54 dns.rdatatype.A,
55 '127.0.0.1')
56 response.answer.append(rrset)
57
58 for method in ("sendUDPQuery", "sendTCPQuery"):
59 sender = getattr(self, method)
60 (receivedQuery, receivedResponse) = sender(query, response)
61 self.assertTrue(receivedQuery)
62 self.assertTrue(receivedResponse)
63 receivedQuery.id = query.id
64 self.assertEquals(query, receivedQuery)
65 self.assertEquals(response, receivedResponse)
66
67 def testQuestionMatchTagAndValue(self):
68 """
69 Tag: Name and value match
70 """
71 name = 'tag-me-dns-1.tags.tests.powerdns.com.'
72 query = dns.message.make_query(name, 'A', 'IN')
73 # dnsdist set RA = RD for spoofed responses
74 query.flags &= ~dns.flags.RD
75 expectedResponse = dns.message.make_response(query)
76 rrset = dns.rrset.from_text(name,
77 60,
78 dns.rdataclass.IN,
79 dns.rdatatype.A,
80 '1.2.3.50')
81 expectedResponse.answer.append(rrset)
82
83 for method in ("sendUDPQuery", "sendTCPQuery"):
84 sender = getattr(self, method)
85 (_, receivedResponse) = sender(query, response=None, useQueue=False)
86 self.assertTrue(receivedResponse)
87 self.assertEquals(expectedResponse, receivedResponse)
88
89 def testQuestionMatchTagOnly(self):
90 """
91 Tag: Name matches
92 """
93 name = 'tag-me-dns-2.tags.tests.powerdns.com.'
94 query = dns.message.make_query(name, 'A', 'IN')
95 # dnsdist set RA = RD for spoofed responses
96 query.flags &= ~dns.flags.RD
97 expectedResponse = dns.message.make_response(query)
98 rrset = dns.rrset.from_text(name,
99 60,
100 dns.rdataclass.IN,
101 dns.rdatatype.A,
102 '1.2.3.100')
103 expectedResponse.answer.append(rrset)
104
105 for method in ("sendUDPQuery", "sendTCPQuery"):
106 sender = getattr(self, method)
107 (_, receivedResponse) = sender(query, response=None, useQueue=False)
108 self.assertTrue(receivedResponse)
109 self.assertEquals(expectedResponse, receivedResponse)
110
111 def testResponseNoMatch(self):
112 """
113 Tag: Tag set on query does not match anything
114 """
115 name = 'tag-me-response-2.tags.tests.powerdns.com.'
116 query = dns.message.make_query(name, 'A', 'IN')
117 response = dns.message.make_response(query)
118 rrset = dns.rrset.from_text(name,
119 60,
120 dns.rdataclass.IN,
121 dns.rdatatype.A,
122 '192.0.2.1')
123 response.answer.append(rrset)
124
125 for method in ("sendUDPQuery", "sendTCPQuery"):
126 sender = getattr(self, method)
127 (receivedQuery, receivedResponse) = sender(query, response)
128 self.assertTrue(receivedQuery)
129 self.assertTrue(receivedResponse)
130 receivedQuery.id = query.id
131 self.assertEquals(query, receivedQuery)
132 self.assertEquals(response, receivedResponse)
133
134 def testResponseMatchTagAndValue(self):
135 """
136 Tag: Tag and value set on query matches on response
137 """
138 name = 'tag-me-response-1.tags.tests.powerdns.com.'
139 query = dns.message.make_query(name, 'A', 'IN')
140 # dnsdist set RA = RD for spoofed responses
141 query.flags &= ~dns.flags.RD
142 response = dns.message.make_response(query)
143 rrset = dns.rrset.from_text(name,
144 60,
145 dns.rdataclass.IN,
146 dns.rdatatype.A,
147 '1.2.3.100')
148 response.answer.append(rrset)
149 expectedResponse = dns.message.make_response(query)
150 expectedResponse.answer.append(rrset)
151 # we will set TC if the tag matches
152 expectedResponse.flags |= dns.flags.TC
153
154 for method in ("sendUDPQuery", "sendTCPQuery"):
155 sender = getattr(self, method)
156 (receivedQuery, receivedResponse) = sender(query, response)
157 self.assertTrue(receivedQuery)
158 self.assertTrue(receivedResponse)
159 receivedQuery.id = query.id
160 self.assertEquals(query, receivedQuery)
161 self.assertEquals(expectedResponse, receivedResponse)
162
163 def testResponseMatchResponseTagMatches(self):
164 """
165 Tag: Tag set on response matches
166 """
167 name = 'tag-me-response-3.tags.tests.powerdns.com.'
168 query = dns.message.make_query(name, 'A', 'IN')
169 # dnsdist set RA = RD for spoofed responses
170 query.flags &= ~dns.flags.RD
171 response = dns.message.make_response(query)
172 rrset = dns.rrset.from_text(name,
173 60,
174 dns.rdataclass.IN,
175 dns.rdatatype.A,
176 '1.2.3.100')
177 response.answer.append(rrset)
178 expectedResponse = dns.message.make_response(query)
179 expectedResponse.answer.append(rrset)
180 # we will set QR=0 if the tag matches
181 expectedResponse.flags &= ~dns.flags.QR
182
183 for method in ("sendUDPQuery", "sendTCPQuery"):
184 sender = getattr(self, method)
185 (receivedQuery, receivedResponse) = sender(query, response)
186 self.assertTrue(receivedQuery)
187 self.assertTrue(receivedResponse)
188 receivedQuery.id = query.id
189 self.assertEquals(query, receivedQuery)
190 self.assertEquals(expectedResponse, receivedResponse)