]> git.ipfire.org Git - thirdparty/pdns.git/blame - regression-tests.dnsdist/test_Tags.py
Merge pull request #8795 from omoerbeek/rec-lua-docs-policytag
[thirdparty/pdns.git] / regression-tests.dnsdist / test_Tags.py
CommitLineData
a76b0d63 1#!/usr/bin/env python
a76b0d63
RG
2import dns
3import clientsubnetoption
4from dnsdisttests import DNSDistTest
5
class TestBasics(DNSDistTest):
    """
    Regression tests for dnsdist tag handling.

    Covers TagAction / TagResponseAction (setting a tag) and TagRule
    (matching on a tag, optionally with a specific value), both on the
    query path and on the response path. Every test is run once over UDP
    and once over TCP.
    """

    # dnsdist configuration used by these tests. The single "%s" is the
    # backend port placeholder; it is presumably filled in by DNSDistTest
    # before dnsdist is started (the substitution is not visible here).
    _config_template = """
    newServer{address="127.0.0.1:%s"}

    function lol(dq)
      return DNSAction.None, ""
    end
    addAction(AllRule(), LuaAction(lol))

    addAction("tag-me-dns-1.tags.tests.powerdns.com.", TagAction("dns", "value1"))
    addAction("tag-me-dns-2.tags.tests.powerdns.com.", TagAction("dns", "value2"))
    addAction("tag-me-response-1.tags.tests.powerdns.com.", TagAction("response", "value1"))
    addAction("tag-me-response-2.tags.tests.powerdns.com.", TagAction("response", "value2"))

    addAction(TagRule("not-dns"), SpoofAction("1.2.3.4"))
    addAction(TagRule("dns", "value1"), SpoofAction("1.2.3.50"))
    addAction(TagRule("dns"), SpoofAction("1.2.3.100"))

    function responseHandlerSetTC(dr)
      dr.dh:setTC(true)
      return DNSResponseAction.HeaderModify, ""
    end

    function responseHandlerUnsetQR(dr)
      dr.dh:setQR(false)
      return DNSResponseAction.HeaderModify, ""
    end

    addResponseAction(TagRule("not-dns"), DropResponseAction())
    addResponseAction(TagRule("response", "value1"), LuaResponseAction(responseHandlerSetTC))
    addResponseAction(TagRule("response", "no-match-value"), DropResponseAction())

    addResponseAction("tag-me-response-3.tags.tests.powerdns.com.", TagResponseAction("response-tag", "value"))
    addResponseAction(TagRule("response-tag"), LuaResponseAction(responseHandlerUnsetQR))
    """

    def testQuestionNoTag(self):
        """
        Tag: No match

        The queried name is not covered by any of the tagging rules, so the
        query must reach the backend unchanged and the backend's response
        must be relayed unchanged.
        """
        name = 'no-match.tags.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            # align the ID before comparing, the one seen by the backend
            # is not expected to match the one we sent
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertEqual(response, receivedResponse)

    def testQuestionMatchTagAndValue(self):
        """
        Tag: Name and value match

        The name is tagged "dns" = "value1" by a TagAction, so the
        TagRule("dns", "value1") rule must spoof 1.2.3.50.
        """
        name = 'tag-me-dns-1.tags.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # dnsdist sets RA = RD for spoofed responses, so clear RD to keep
        # the expected response (built without RA) identical to the spoofed one
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '1.2.3.50')
        expectedResponse.answer.append(rrset)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            # no backend response is supplied: the answer has to be spoofed
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertTrue(receivedResponse)
            self.assertEqual(expectedResponse, receivedResponse)

    def testQuestionMatchTagOnly(self):
        """
        Tag: Name matches

        The name is tagged "dns" = "value2", which does not satisfy
        TagRule("dns", "value1") but does satisfy the value-less
        TagRule("dns"), so 1.2.3.100 must be spoofed.
        """
        name = 'tag-me-dns-2.tags.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # dnsdist sets RA = RD for spoofed responses, so clear RD to keep
        # the expected response (built without RA) identical to the spoofed one
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '1.2.3.100')
        expectedResponse.answer.append(rrset)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            # no backend response is supplied: the answer has to be spoofed
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertTrue(receivedResponse)
            self.assertEqual(expectedResponse, receivedResponse)

    def testResponseNoMatch(self):
        """
        Tag: Tag set on query does not match anything

        The name is tagged "response" = "value2" on the query path. None of
        the response rules match that tag/value pair, so the backend's
        response must be relayed unchanged.
        """
        name = 'tag-me-response-2.tags.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '192.0.2.1')
        response.answer.append(rrset)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            # align the ID before comparing, the one seen by the backend
            # is not expected to match the one we sent
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertEqual(response, receivedResponse)

    def testResponseMatchTagAndValue(self):
        """
        Tag: Tag and value set on query matches on response

        The name is tagged "response" = "value1" on the query path, so the
        TagRule("response", "value1") response rule must run
        responseHandlerSetTC and the relayed response must have TC set.
        """
        name = 'tag-me-response-1.tags.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # RD is cleared as in the spoofed-answer tests.
        # NOTE(review): the answer here comes from the backend rather than
        # from SpoofAction, so this may not be required - confirm.
        query.flags &= ~dns.flags.RD
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '1.2.3.100')
        response.answer.append(rrset)
        expectedResponse = dns.message.make_response(query)
        expectedResponse.answer.append(rrset)
        # we will set TC if the tag matches
        expectedResponse.flags |= dns.flags.TC

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            # align the ID before comparing, the one seen by the backend
            # is not expected to match the one we sent
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertEqual(expectedResponse, receivedResponse)

    def testResponseMatchResponseTagMatches(self):
        """
        Tag: Tag set on response matches

        The name is tagged "response-tag" = "value" by a TagResponseAction
        on the response path, so the following TagRule("response-tag")
        response rule must run responseHandlerUnsetQR and the relayed
        response must have QR cleared.
        """
        name = 'tag-me-response-3.tags.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # RD is cleared as in the spoofed-answer tests.
        # NOTE(review): the answer here comes from the backend rather than
        # from SpoofAction, so this may not be required - confirm.
        query.flags &= ~dns.flags.RD
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '1.2.3.100')
        response.answer.append(rrset)
        expectedResponse = dns.message.make_response(query)
        expectedResponse.answer.append(rrset)
        # we will set QR=0 if the tag matches
        expectedResponse.flags &= ~dns.flags.QR

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            # align the ID before comparing, the one seen by the backend
            # is not expected to match the one we sent
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertEqual(expectedResponse, receivedResponse)