receivedQuery.id = query.id
self.assertEqual(query, receivedQuery)
self.assertEqual(response, receivedResponse)
+
+class TestDynBlockGroupQSuffixMatchYAML(DynBlocksTest):
+
+ _yaml_config_template = """---
+dynamic_rules:
+ - name: "Check Suffix Match visitor from YAML"
+ rules:
+ - type: "suffix-match"
+ seconds: %d
+ action_duration: %d
+ comment: "Suffix-match"
+ visitor_function_code: |
+ visitor_called = false
+ return function(parentStats, nodeStats)
+ visitor_called = true
+ return false
+ end
+
+query_rules:
+ - name: "check that the visitor function has been called"
+ selector:
+ type: "QNameSet"
+ qnames:
+ - "check-visitor.group.dynblocks.tests.powerdns.com."
+ action:
+ type: "Lua"
+ name: "Return 192.0.2.1 if the visitor function has been called, 192.0.2.2 otherwise"
+ function_code: |
+ return function(dq)
+ if visitor_called then
+ return DNSAction.Spoof, "192.0.2.1"
+ end
+ return DNSAction.Spoof, "192.0.2.2"
+ end
+
+backends:
+ - address: "127.0.0.1:%d"
+ protocol: Do53
+"""
+ _config_params = []
+ _yaml_config_params = ['_dynBlockPeriod', '_dynBlockDuration', '_testServerPort']
+
+ def testSuffixMatchVisitorCalled(self):
+ """
+ Dyn Blocks (Group / YAML): Visitor called
+ """
+ name = 'check-visitor.group.dynblocks.tests.powerdns.com.'
+ query = dns.message.make_query(name, 'A', 'IN')
+ query.flags &= ~dns.flags.RD
+ expectedResponse = dns.message.make_response(query)
+ expectedResponse.set_rcode(dns.rcode.NOERROR)
+ rrset = dns.rrset.from_text(name,
+ 3600,
+ dns.rdataclass.IN,
+ dns.rdatatype.A,
+ '192.0.2.1')
+ expectedResponse.answer.append(rrset)
+
+ method = "sendUDPQuery"
+ sender = getattr(self, method)
+ for _ in range(4):
+ (_, receivedResponse) = sender(query, response=None, useQueue=False)
+ if receivedResponse == expectedResponse:
+ break
+ time.sleep(1)
+
+ self.assertEqual(receivedResponse, expectedResponse)