"""
name = 'opcodeupdate.advanced.tests.powerdns.com.'
- query = dns.message.make_query(name, 'A', 'IN')
+ query = dns.message.make_query(name, 'SOA', 'IN')
query.set_opcode(dns.opcode.UPDATE)
response = dns.message.make_response(query)
rrset = dns.rrset.from_text(name,
(receivedQuery, receivedResponse) = sender(query, response)
expectedQuery.id = receivedQuery.id
self.assertEquals(receivedQuery, expectedQuery)
- print(receivedResponse)
- print(expectedResponse)
self.assertEquals(receivedResponse, expectedResponse)
class TestAdvancedSetNegativeAndSOA(DNSDistTest):
+ _selfGeneratedPayloadSize = 1232
_config_template = """
addAction("nxd.setnegativeandsoa.advanced.tests.powerdns.com.", SetNegativeAndSOAAction(true, "auth.", 42, "mname", "rname", 5, 4, 3, 2, 1))
addAction("nodata.setnegativeandsoa.advanced.tests.powerdns.com.", SetNegativeAndSOAAction(false, "another-auth.", 42, "mname", "rname", 1, 2, 3, 4, 5))
+ setPayloadSizeOnSelfGeneratedAnswers(%d)
newServer{address="127.0.0.1:%s"}
"""
+ _config_params = ['_selfGeneratedPayloadSize', '_testServerPort']
+
def testAdvancedNegativeAndSOANXD(self):
"""
Advanced: SetNegativeAndSOAAction NXD
"""
name = 'nxd.setnegativeandsoa.advanced.tests.powerdns.com.'
- query = dns.message.make_query(name, 'A', 'IN')
+ # no EDNS
+ query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
query.flags &= ~dns.flags.RD
expectedResponse = dns.message.make_response(query)
expectedResponse.set_rcode(dns.rcode.NXDOMAIN)
- soa = dns.rrset.from_text("auth",
+ soa = dns.rrset.from_text("auth.",
42,
dns.rdataclass.IN,
dns.rdatatype.SOA,
for method in ("sendUDPQuery", "sendTCPQuery"):
sender = getattr(self, method)
(_, receivedResponse) = sender(query, response=None, useQueue=False)
- self.assertEquals(receivedResponse, expectedResponse)
+ self.checkMessageNoEDNS(expectedResponse, receivedResponse)
+
+ # withEDNS
+ query = dns.message.make_query(name, 'A', 'IN', use_edns=True)
+ query.flags &= ~dns.flags.RD
+ expectedResponse = dns.message.make_response(query, our_payload=self._selfGeneratedPayloadSize)
+ expectedResponse.set_rcode(dns.rcode.NXDOMAIN)
+ soa = dns.rrset.from_text("auth.",
+ 42,
+ dns.rdataclass.IN,
+ dns.rdatatype.SOA,
+ 'mname. rname. 5 4 3 2 1')
+ expectedResponse.additional.append(soa)
+
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ (_, receivedResponse) = sender(query, response=None, useQueue=False)
+ self.checkMessageEDNSWithoutOptions(expectedResponse, receivedResponse)
def testAdvancedNegativeAndSOANoData(self):
"""
Advanced: SetNegativeAndSOAAction NoData
"""
name = 'nodata.setnegativeandsoa.advanced.tests.powerdns.com.'
- query = dns.message.make_query(name, 'A', 'IN')
+ # no EDNS
+ query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
query.flags &= ~dns.flags.RD
expectedResponse = dns.message.make_response(query)
expectedResponse.set_rcode(dns.rcode.NOERROR)
- soa = dns.rrset.from_text("another-auth",
+ soa = dns.rrset.from_text("another-auth.",
42,
dns.rdataclass.IN,
dns.rdatatype.SOA,
for method in ("sendUDPQuery", "sendTCPQuery"):
sender = getattr(self, method)
(_, receivedResponse) = sender(query, response=None, useQueue=False)
- self.assertEquals(receivedResponse, expectedResponse)
+ self.checkMessageNoEDNS(expectedResponse, receivedResponse)
+
+ # with EDNS
+ query = dns.message.make_query(name, 'A', 'IN', use_edns=True)
+ query.flags &= ~dns.flags.RD
+ expectedResponse = dns.message.make_response(query, our_payload=self._selfGeneratedPayloadSize)
+ expectedResponse.set_rcode(dns.rcode.NOERROR)
+ soa = dns.rrset.from_text("another-auth.",
+ 42,
+ dns.rdataclass.IN,
+ dns.rdatatype.SOA,
+ 'mname. rname. 1 2 3 4 5')
+ expectedResponse.additional.append(soa)
+
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ (_, receivedResponse) = sender(query, response=None, useQueue=False)
+ self.checkMessageEDNSWithoutOptions(expectedResponse, receivedResponse)
class TestAdvancedLuaRule(DNSDistTest):
# and finally the ports, zeroed because we have no way to know them beforehand
xpfData = "\# 14 04117f0000017f00000100000000"
rdata = dns.rdata.from_text(dns.rdataclass.IN, self._xpfCode, xpfData)
- rrset = dns.rrset.from_rdata(name, 60, rdata)
+ rrset = dns.rrset.from_rdata(".", 0, rdata)
expectedQuery.additional.append(rrset)
response = dns.message.make_response(expectedQuery)
receivedQuery.id = expectedQuery.id
receivedResponse.id = response.id
- self.assertEquals(receivedQuery, expectedQuery)
self.checkMessageHasXPF(receivedQuery, xpfData)
self.assertEquals(response, receivedResponse)
# and finally the ports, zeroed because we have no way to know them beforehand
xpfData = "\# 14 04067f0000017f00000100000000"
rdata = dns.rdata.from_text(dns.rdataclass.IN, self._xpfCode, xpfData)
- rrset = dns.rrset.from_rdata(name, 60, rdata)
+ rrset = dns.rrset.from_rdata(".", 0, rdata)
expectedQuery.additional.append(rrset)
response = dns.message.make_response(expectedQuery)
receivedQuery.id = expectedQuery.id
receivedResponse.id = response.id
- self.assertEquals(receivedQuery, expectedQuery)
self.checkMessageHasXPF(receivedQuery, xpfData)
self.assertEquals(response, receivedResponse)