(_, receivedResponse) = sender(query, response=None, useQueue=False)
self.checkMessageEDNSWithoutOptions(expectedResponse, receivedResponse)
+
+class TestAdvancedNegativeAndSOAAuthSection(DNSDistTest):
+
+ _selfGeneratedPayloadSize = 1232
+ _config_template = """
+ addAction("nxd.negativeandsoa.advanced.tests.powerdns.com.", NegativeAndSOAAction(true, "auth.", 42, "mname", "rname", 5, 4, 3, 2, 1, { soaInAuthoritySection=true }))
+ addAction("nodata.negativeandsoa.advanced.tests.powerdns.com.", NegativeAndSOAAction(false, "another-auth.", 42, "mname", "rname", 1, 2, 3, 4, 5, { soaInAuthoritySection=true }))
+ setPayloadSizeOnSelfGeneratedAnswers(%d)
+ newServer{address="127.0.0.1:%s"}
+ """
+ _config_params = ['_selfGeneratedPayloadSize', '_testServerPort']
+
+
+ def testAdvancedNegativeAndSOANXD(self):
+ """
+ Advanced: NegativeAndSOAAction NXD
+ """
+ name = 'nxd.negativeandsoa.advanced.tests.powerdns.com.'
+ # no EDNS
+ query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
+ query.flags &= ~dns.flags.RD
+ expectedResponse = dns.message.make_response(query)
+ expectedResponse.set_rcode(dns.rcode.NXDOMAIN)
+ soa = dns.rrset.from_text("auth.",
+ 42,
+ dns.rdataclass.IN,
+ dns.rdatatype.SOA,
+ 'mname. rname. 5 4 3 2 1')
+ expectedResponse.authority.append(soa)
+
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ (_, receivedResponse) = sender(query, response=None, useQueue=False)
+ self.checkMessageNoEDNS(expectedResponse, receivedResponse)
+
+ # withEDNS
+ query = dns.message.make_query(name, 'A', 'IN', use_edns=True)
+ query.flags &= ~dns.flags.RD
+ expectedResponse = dns.message.make_response(query, our_payload=self._selfGeneratedPayloadSize)
+ expectedResponse.set_rcode(dns.rcode.NXDOMAIN)
+ soa = dns.rrset.from_text("auth.",
+ 42,
+ dns.rdataclass.IN,
+ dns.rdatatype.SOA,
+ 'mname. rname. 5 4 3 2 1')
+ expectedResponse.authority.append(soa)
+
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ (_, receivedResponse) = sender(query, response=None, useQueue=False)
+ self.checkMessageEDNSWithoutOptions(expectedResponse, receivedResponse)
+
+ def testAdvancedNegativeAndSOANoData(self):
+ """
+ Advanced: NegativeAndSOAAction NoData
+ """
+ name = 'nodata.negativeandsoa.advanced.tests.powerdns.com.'
+ # no EDNS
+ query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
+ query.flags &= ~dns.flags.RD
+ expectedResponse = dns.message.make_response(query)
+ expectedResponse.set_rcode(dns.rcode.NOERROR)
+ soa = dns.rrset.from_text("another-auth.",
+ 42,
+ dns.rdataclass.IN,
+ dns.rdatatype.SOA,
+ 'mname. rname. 1 2 3 4 5')
+ expectedResponse.authority.append(soa)
+
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ (_, receivedResponse) = sender(query, response=None, useQueue=False)
+ self.checkMessageNoEDNS(expectedResponse, receivedResponse)
+
+ # with EDNS
+ query = dns.message.make_query(name, 'A', 'IN', use_edns=True)
+ query.flags &= ~dns.flags.RD
+ expectedResponse = dns.message.make_response(query, our_payload=self._selfGeneratedPayloadSize)
+ expectedResponse.set_rcode(dns.rcode.NOERROR)
+ soa = dns.rrset.from_text("another-auth.",
+ 42,
+ dns.rdataclass.IN,
+ dns.rdatatype.SOA,
+ 'mname. rname. 1 2 3 4 5')
+ expectedResponse.authority.append(soa)
+
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ (_, receivedResponse) = sender(query, response=None, useQueue=False)
+ self.checkMessageEDNSWithoutOptions(expectedResponse, receivedResponse)
+
+
class TestAdvancedLuaRule(DNSDistTest):
_config_template = """