sender = getattr(self, method)
(_, receivedResponse) = sender(query, response=None, useQueue=False)
self.assertEqual(receivedResponse, expectedResponse)
+
+class TestYamlNMGRule(DNSDistTest):
+
+ _yaml_config_template = """---
+binds:
+ - listen_address: "127.0.0.1:%d"
+ protocol: Do53
+
+backends:
+ - address: "127.0.0.1:%d"
+ protocol: Do53
+
+query_rules:
+ - name: "refuse queries from non-allowed netmasks"
+ selector:
+ type: "Not"
+ selector:
+ type: "NetmaskGroup"
+ netmasks:
+ - "192.0.2.1/32"
+ action:
+ type: "RCode"
+ rcode: "5"
+"""
+ _yaml_config_params = ['_dnsDistPort', '_testServerPort']
+ _config_params = []
+
+ def testYamlNMGRule(self):
+ """
+ YAML: NMGRule should refuse our queries
+ """
+ name = 'nmgrule.yaml.tests.powerdns.com.'
+ query = dns.message.make_query(name, 'A', 'IN')
+ query.flags &= ~dns.flags.RD
+ expectedResponse = dns.message.make_response(query)
+ expectedResponse.set_rcode(dns.rcode.REFUSED)
+
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ (_, receivedResponse) = sender(query, response=None, useQueue=False)
+ self.assertEqual(receivedResponse, expectedResponse)