raise AssertionError("RRSIG found in answers for:\n%s" % ret)
def assertAnswerEmpty(self, msg):
- self.assertTrue(len(msg.answer) == 0, "Data found in the the answer section for %s:\n%s" % (msg.question[0].to_text(), '\n'.join([i.to_text() for i in msg.answer])))
+ self.assertEqual(len(msg.answer), 0, "Data found in the the answer section for %s:\n%s" % (msg.question[0].to_text(), '\n'.join([i.to_text() for i in msg.answer])))
def assertAnswerNotEmpty(self, msg):
self.assertTrue(len(msg.answer) > 0, "Answer is empty")
(_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
self.assertEqual(receivedResponse, response)
for an in receivedResponse.answer:
- self.assertTrue(an.ttl == ttl)
+ self.assertEqual(an.ttl, ttl)
# now we wait a bit for the TTL to decrease
time.sleep(1)
(_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
self.assertEqual(receivedResponse, response)
for an in receivedResponse.answer:
- self.assertTrue(an.ttl == ttl)
+ self.assertEqual(an.ttl, ttl)
total = 0
for key in self._responsesCounter:
self.assertEqual(query, receivedQuery)
self.assertEqual(receivedResponse.question, response.question)
self.assertTrue(receivedResponse.flags & ~dns.flags.TC)
- self.assertTrue(len(receivedResponse.answer) == 0)
- self.assertTrue(len(receivedResponse.authority) == 0)
- self.assertTrue(len(receivedResponse.additional) == 0)
+ self.assertEqual(len(receivedResponse.answer), 0)
+ self.assertEqual(len(receivedResponse.authority), 0)
+ self.assertEqual(len(receivedResponse.additional), 0)
def testCertRotation(self):
"""
raise AssertionError("RRSIG found in answers for:\n%s" % ret)
def assertAnswerEmpty(self, msg):
- self.assertTrue(len(msg.answer) == 0, "Data found in the the answer section for %s:\n%s" % (msg.question[0].to_text(), '\n'.join([i.to_text() for i in msg.answer])))
+ self.assertEqual(len(msg.answer), 0, "Data found in the the answer section for %s:\n%s" % (msg.question[0].to_text(), '\n'.join([i.to_text() for i in msg.answer])))
def assertAdditionalEmpty(self, msg):
- self.assertTrue(len(msg.additional) == 0, "Data found in the the additional section for %s:\n%s" % (msg.question[0].to_text(), '\n'.join([i.to_text() for i in msg.additional])))
+ self.assertEqual(len(msg.additional), 0, "Data found in the the additional section for %s:\n%s" % (msg.question[0].to_text(), '\n'.join([i.to_text() for i in msg.additional])))
def assertRcodeEqual(self, msg, rcode):
if not isinstance(msg, dns.message.Message):
def checkProtobufIdentity(self, msg, requestorId, deviceId, deviceName):
#print(msg)
- self.assertTrue((requestorId == '') == (not msg.HasField('requestorId')))
- self.assertTrue((deviceId == b'') == (not msg.HasField('deviceId')))
- self.assertTrue((deviceName == '') == (not msg.HasField('deviceName')))
+ self.assertEqual(requestorId == '', not msg.HasField('requestorId'))
+ self.assertEqual(deviceId == b'', not msg.HasField('deviceId'))
+ self.assertEqual(deviceName == '', not msg.HasField('deviceName'))
self.assertEqual(msg.requestorId, requestorId)
self.assertEqual(msg.deviceId, deviceId)
self.assertEqual(msg.deviceName, deviceName)
def checkProtobufEDE(self, msg, ede, edeText):
#print(msg)
- self.assertTrue((ede == 0) == (not msg.HasField('ede')))
- self.assertTrue((edeText == '') == (not msg.HasField('edeText')))
+ self.assertEqual(ede == 0, not msg.HasField('ede'))
+ self.assertEqual(edeText == '', not msg.HasField('edeText'))
self.assertEqual(msg.ede, ede)
self.assertEqual(msg.edeText, edeText)
return opt
def checkProtobufOT(self, msg, openTelemetryData, openTelemetryTraceID):
- self.assertTrue(openTelemetryData == msg.HasField('openTelemetryData'))
- self.assertTrue(openTelemetryTraceID == msg.HasField('openTelemetryTraceID'))
+ self.assertEqual(openTelemetryData, msg.HasField('openTelemetryData'))
+ self.assertEqual(openTelemetryTraceID, msg.HasField('openTelemetryTraceID'))
def setUp(self):
super(TestRecursorProtobuf, self).setUp()