expected = dns.rrset.from_text(name, 0, dns.rdataclass.IN, 'A', '192.0.2.42')
query = dns.message.make_query(name, 'A', want_dnssec=True)
query.flags |= dns.flags.CD
- res = self.sendUDPQuery(query)
+ for method in ('sendUDPQuery', 'sendTCPQuery'):
+ sender = getattr(self, method)
+ res = sender(query)
- self.assertRRsetInAnswer(res, expected)
+ self.assertRRsetInAnswer(res, expected)
- # check the protobuf messages corresponding to the UDP query and answer
- msg = self.getFirstProtobufMessage()
- self.checkProtobufQuery(msg, dnsmessage_pb2.PBDNSMessage.UDP, query, dns.rdataclass.IN, dns.rdatatype.A, name)
- # wire format, RD and CD set in headerflags, plus DO bit in flags part of EDNS Version
- self.checkProtobufHeaderFlagsAndEDNSVersion(msg, 0x0110, 0x00008000)
- # then the response
- msg = self.getFirstProtobufMessage()
- self.checkProtobufResponse(msg, dnsmessage_pb2.PBDNSMessage.UDP, res, '127.0.0.1')
- self.assertEqual(len(msg.response.rrs), 1)
- rr = msg.response.rrs[0]
- # we have max-cache-ttl set to 15
- self.checkProtobufResponseRecord(rr, dns.rdataclass.IN, dns.rdatatype.A, name, 15)
- self.assertEqual(socket.inet_ntop(socket.AF_INET, rr.rdata), '192.0.2.42')
- self.checkNoRemainingMessage()
+ # check the protobuf messages corresponding to the query and answer
+ msg = self.getFirstProtobufMessage()
+ if method == 'sendUDPQuery':
+ protocol = dnsmessage_pb2.PBDNSMessage.UDP
+ else:
+ protocol = dnsmessage_pb2.PBDNSMessage.TCP
+ self.checkProtobufQuery(msg, protocol, query, dns.rdataclass.IN, dns.rdatatype.A, name)
+ # wire format, RD and CD set in headerflags, plus DO bit in flags part of EDNS Version
+ self.checkProtobufHeaderFlagsAndEDNSVersion(msg, 0x0110, 0x00008000)
+ # then the response
+ msg = self.getFirstProtobufMessage()
+ self.checkProtobufResponse(msg, protocol, res, '127.0.0.1')
+ self.assertEqual(len(msg.response.rrs), 1)
+ rr = msg.response.rrs[0]
+ # we have max-cache-ttl set to 15
+ if method == 'sendUDPQuery':
+ self.checkProtobufResponseRecord(rr, dns.rdataclass.IN, dns.rdatatype.A, name, 15)
+ self.assertEqual(socket.inet_ntop(socket.AF_INET, rr.rdata), '192.0.2.42')
+ self.checkNoRemainingMessage()
#
# again, for a PC cache hit
#
- res = self.sendUDPQuery(query)
+ for method in ('sendUDPQuery', 'sendTCPQuery'):
+ sender = getattr(self, method)
+ res = sender(query)
- self.assertRRsetInAnswer(res, expected)
+ self.assertRRsetInAnswer(res, expected)
- # check the protobuf messages corresponding to the UDP query and answer
- msg = self.getFirstProtobufMessage()
- self.checkProtobufQuery(msg, dnsmessage_pb2.PBDNSMessage.UDP, query, dns.rdataclass.IN, dns.rdatatype.A, name)
- # wire format, RD and CD set in headerflags, plus DO bit in flags part of EDNS Version
- self.checkProtobufHeaderFlagsAndEDNSVersion(msg, 0x0110, 0x00008000)
- # then the response
- msg = self.getFirstProtobufMessage()
- self.checkProtobufResponse(msg, dnsmessage_pb2.PBDNSMessage.UDP, res, '127.0.0.1')
- self.assertEqual(len(msg.response.rrs), 1)
- rr = msg.response.rrs[0]
- # we have max-cache-ttl set to 15
- self.checkProtobufResponseRecord(rr, dns.rdataclass.IN, dns.rdatatype.A, name, 15)
- self.assertEqual(socket.inet_ntop(socket.AF_INET, rr.rdata), '192.0.2.42')
- self.checkProtobufOT(msg, True, True)
- self.checkProtobufEDE(msg, 0, '')
- self.checkNoRemainingMessage()
+ # check the protobuf messages corresponding to the UDP query and answer
+ msg = self.getFirstProtobufMessage()
+ if method == 'sendUDPQuery':
+ protocol = dnsmessage_pb2.PBDNSMessage.UDP
+ else:
+ protocol = dnsmessage_pb2.PBDNSMessage.TCP
+ self.checkProtobufQuery(msg, protocol, query, dns.rdataclass.IN, dns.rdatatype.A, name)
+ # wire format, RD and CD set in headerflags, plus DO bit in flags part of EDNS Version
+ self.checkProtobufHeaderFlagsAndEDNSVersion(msg, 0x0110, 0x00008000)
+ # then the response
+ msg = self.getFirstProtobufMessage()
+ self.checkProtobufResponse(msg, protocol, res, '127.0.0.1')
+ self.assertEqual(len(msg.response.rrs), 1)
+ rr = msg.response.rrs[0]
+ # we have max-cache-ttl set to 15
+ if method == 'sendUDPQuery':
+ self.checkProtobufResponseRecord(rr, dns.rdataclass.IN, dns.rdatatype.A, name, 15)
+ self.assertEqual(socket.inet_ntop(socket.AF_INET, rr.rdata), '192.0.2.42')
+ self.checkProtobufOT(msg, True, True)
+ self.checkProtobufEDE(msg, 0, '')
+ self.checkNoRemainingMessage()
def testCNAME(self):
name = 'cname.example.'