/* addOrReplaceEDNSOption will set it to false if there is already an existing option */
optionAdded = true;
addOrReplaceEDNSOption(options, optionToReplace, optionAdded, overrideExisting, newOptionContent);
- packetWriter.addOpt(recordHeader.d_class, edns0.extRCode, edns0.extFlags, options, edns0.version);
+ packetWriter.addOpt(recordHeader.d_class, edns0.extRCode, ntohs(edns0.extFlags), options, edns0.version);
}
}
BOOST_REQUIRE(getEDNS0Record(dnsQuestion.getData(), edns0));
BOOST_CHECK_EQUAL(edns0.version, 0U);
BOOST_CHECK_EQUAL(edns0.extRCode, 0U);
- BOOST_CHECK_EQUAL(edns0.extFlags, EDNS_HEADER_FLAG_DO);
+ BOOST_CHECK_EQUAL(ntohs(edns0.extFlags), EDNS_HEADER_FLAG_DO);
BOOST_REQUIRE(parseEDNSOptions(dnsQuestion));
BOOST_REQUIRE(dnsQuestion.ednsOptions != nullptr);
def checkMessageEDNSWithoutOptions(self, expected, received):
self.assertEqual(expected, received)
self.assertEqual(received.edns, 0)
+ self.assertEqual(expected.ednsflags, received.ednsflags)
self.assertEqual(expected.payload, received.payload)
def checkMessageEDNSWithoutECS(self, expected, received, withCookies=0):
self.assertEqual(expected, received)
self.assertEqual(received.edns, 0)
+ self.assertEqual(expected.ednsflags, received.ednsflags)
self.assertEqual(expected.payload, received.payload)
self.assertEqual(len(received.options), withCookies)
if withCookies:
def checkMessageEDNSWithECS(self, expected, received, additionalOptions=0):
self.assertEqual(expected, received)
self.assertEqual(received.edns, 0)
+ self.assertEqual(expected.ednsflags, received.ednsflags)
self.assertEqual(expected.payload, received.payload)
self.assertEqual(len(received.options), 1 + additionalOptions)
hasECS = False
def checkMessageEDNS(self, expected, received):
self.assertEqual(expected, received)
self.assertEqual(received.edns, 0)
+ self.assertEqual(expected.ednsflags, received.ednsflags)
self.assertEqual(expected.payload, received.payload)
self.assertEqual(len(expected.options), len(received.options))
self.compareOptions(expected.options, received.options)
query.id = 0
response = dns.message.make_response(query)
response.use_edns(edns=True, payload=4096, options=[rewrittenEcso])
+ response.want_dnssec(True)
rrset = dns.rrset.from_text(name,
3600,
dns.rdataclass.IN,
rewrittenEcso = clientsubnetoption.ClientSubnetOption('127.0.0.0', 24)
query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=512, options=[ecso], want_dnssec=True)
query.id = 0
- expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=512, options=[rewrittenEcso])
+ expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=512, options=[rewrittenEcso], want_dnssec=True)
response = dns.message.make_response(query)
response.use_edns(edns=True, payload=4096, options=[rewrittenEcso])
+ response.want_dnssec(True)
rrset = dns.rrset.from_text(name,
3600,
dns.rdataclass.IN,
(_, receivedResponse) = sender(query, response=None, useQueue=False)
self.checkMessageEDNS(expectedResponse, receivedResponse)
+ def testExtendedErrorBackendResponse(self):
+ """
+ EDE: Backend response (DO)
+ """
+ name = 'backend-response-do.ede.tests.powerdns.com.'
+ ede = extendederrors.ExtendedErrorOption(16, b'my extended error status')
+ query = dns.message.make_query(name, 'A', 'IN', use_edns=True, want_dnssec=True)
+
+ backendResponse = dns.message.make_response(query)
+ backendResponse.use_edns(edns=True, payload=4096, options=[])
+ backendResponse.want_dnssec(True)
+ rrset = dns.rrset.from_text(name,
+ 60,
+ dns.rdataclass.IN,
+ dns.rdatatype.A,
+ '127.0.0.1')
+
+ backendResponse.answer.append(rrset)
+ expectedResponse = dns.message.make_response(query)
+ expectedResponse.use_edns(edns=True, payload=4096, options=[ede])
+ expectedResponse.want_dnssec(True)
+ rrset = dns.rrset.from_text(name,
+ 60,
+ dns.rdataclass.IN,
+ dns.rdatatype.A,
+ '127.0.0.1')
+ expectedResponse.answer.append(rrset)
+
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ (receivedQuery, receivedResponse) = sender(query, backendResponse)
+ receivedQuery.id = query.id
+ self.assertEqual(query, receivedQuery)
+ self.checkMessageEDNS(expectedResponse, receivedResponse)
+
+ # testing the cache
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ (_, receivedResponse) = sender(query, response=None, useQueue=False)
+ self.checkMessageEDNS(expectedResponse, receivedResponse)
+
def testExtendedErrorBackendResponseWithExistingEDE(self):
"""
EDE: Backend response with existing EDE
query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=True)
query.flags &= ~dns.flags.RD
expectedResponse = dns.message.make_response(query, our_payload=1042)
+ expectedResponse.want_dnssec(True)
expectedResponse.set_rcode(dns.rcode.REFUSED)
for method in ("sendUDPQuery", "sendTCPQuery"):
# dnsdist sets RA = RD for TC responses
query.flags &= ~dns.flags.RD
expectedResponse = dns.message.make_response(query, our_payload=1042)
+ expectedResponse.want_dnssec(True)
expectedResponse.flags |= dns.flags.TC
(_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
name = 'edns-do.lua.edns-self.tests.powerdns.com.'
query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=True)
expectedResponse = dns.message.make_response(query, our_payload=1042)
+ expectedResponse.want_dnssec(True)
expectedResponse.set_rcode(dns.rcode.NXDOMAIN)
for method in ("sendUDPQuery", "sendTCPQuery"):
# dnsdist set RA = RD for spoofed responses
query.flags &= ~dns.flags.RD
expectedResponse = dns.message.make_response(query, our_payload=1042)
+ expectedResponse.want_dnssec(True)
expectedResponse.answer.append(dns.rrset.from_text(name,
60,
dns.rdataclass.IN,
query.flags &= ~dns.flags.RD
expectedResponse = dns.message.make_response(query, our_payload=1042)
expectedResponse.set_rcode(dns.rcode.REFUSED)
+ expectedResponse.want_dnssec(True)
for method in ("sendUDPQuery", "sendTCPQuery"):
sender = getattr(self, method)
# dnsdist sets RA = RD for TC responses
query.flags &= ~dns.flags.RD
expectedResponse = dns.message.make_response(query, our_payload=1042)
+ expectedResponse.want_dnssec(True)
expectedResponse.flags |= dns.flags.TC
(_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
name = 'edns-options.lua.edns-self.tests.powerdns.com.'
query = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[ecso], payload=512, want_dnssec=True)
expectedResponse = dns.message.make_response(query, our_payload=1042)
+ expectedResponse.want_dnssec(True)
expectedResponse.set_rcode(dns.rcode.NXDOMAIN)
for method in ("sendUDPQuery", "sendTCPQuery"):
# dnsdist set RA = RD for spoofed responses
query.flags &= ~dns.flags.RD
expectedResponse = dns.message.make_response(query, our_payload=1042)
+ expectedResponse.want_dnssec(True)
expectedResponse.answer.append(dns.rrset.from_text(name,
60,
dns.rdataclass.IN,