msg.ParseFromString(data)
return msg
- def checkProtobufBase(self, msg, protocol, query, initiator, normalQueryResponse=True, v6=False):
+ def checkProtobufBase(self, msg, protocol, query, initiator, normalQueryResponse=True, v6=False, flags=None):
self.assertTrue(msg)
self.assertTrue(msg.HasField('timeSec'))
self.assertTrue(msg.HasField('socketFamily'))
self.assertTrue(msg.HasField('id'))
self.assertEqual(msg.id, query.id)
self.assertTrue(msg.HasField('inBytes'))
+ self.assertTrue(msg.HasField('headerFlags'))
+ queryFlags = flags or int.from_bytes(query.to_wire()[2:4], byteorder=sys.byteorder)
+ self.assertEqual(msg.headerFlags, queryFlags)
self.assertTrue(msg.HasField('serverIdentity'))
self.assertEqual(msg.serverIdentity, self._protobufServerID.encode('utf-8'))
# exclusive or of lists should be empty
self.assertEqual(len(listx), 0, "Protobuf tags don't match")
- def checkProtobufQueryConvertedToResponse(self, msg, protocol, response, initiator='127.0.0.0'):
+ def checkProtobufQueryConvertedToResponse(self, msg, protocol, response, initiator='127.0.0.0', flags=None):
self.assertEqual(msg.type, dnsmessage_pb2.PBDNSMessage.DNSResponseType)
# skip comparing inBytes (size of the query) with the length of the generated response
- self.checkProtobufBase(msg, protocol, response, initiator, False)
+ self.checkProtobufBase(msg, protocol, response, initiator, False, flags=flags)
self.assertTrue(msg.HasField('response'))
self.assertTrue(msg.response.HasField('queryTimeSec'))
# check the protobuf message corresponding to the UDP query
msg = self.getFirstProtobufMessage()
-
- self.checkProtobufQueryConvertedToResponse(msg, dnsmessage_pb2.PBDNSMessage.UDP, response, '127.0.0.0')
+ flags = int.from_bytes(query.to_wire()[2:4], byteorder=sys.byteorder)
+ self.checkProtobufQueryConvertedToResponse(msg, dnsmessage_pb2.PBDNSMessage.UDP, response, '127.0.0.0', flags=flags)
self.checkProtobufTags(msg.response.tags, [ u"TestLabel1,TestData1", u"TestLabel2,TestData2", u"TestLabel3,TestData3", u"Query,123"])
# check the protobuf message corresponding to the UDP response
# check the protobuf message corresponding to the TCP query
msg = self.getFirstProtobufMessage()
- self.checkProtobufQueryConvertedToResponse(msg, dnsmessage_pb2.PBDNSMessage.TCP, response, '127.0.0.0')
+ flags = int.from_bytes(query.to_wire()[2:4], byteorder=sys.byteorder)
+ self.checkProtobufQueryConvertedToResponse(msg, dnsmessage_pb2.PBDNSMessage.TCP, response, '127.0.0.0', flags=flags)
self.checkProtobufTags(msg.response.tags, [ u"TestLabel1,TestData1", u"TestLabel2,TestData2", u"TestLabel3,TestData3", u"Query,123"])
# check the protobuf message corresponding to the TCP response