raise TypeError("msg is not a dns.message.Message")
msgFlags = dns.flags.to_text(msg.flags)
- self.assertTrue('AD' in msgFlags, "No AD flag found in the message for %s" % msg.question[0].name)
+ self.assertIn('AD', msgFlags, "No AD flag found in the message for %s" % msg.question[0].name)
def assertRRsetInAnswer(self, msg, rrset):
"""Asserts the rrset (without comparing TTL) exists in the
'qps', 'queries', 'order', 'tcpLatency', 'responses', 'nonCompliantResponses']:
self.assertGreaterEqual(server[key], 0)
- self.assertTrue(server['state'] in ['up', 'down', 'UP', 'DOWN'])
+ self.assertIn(server['state'], ['up', 'down', 'UP', 'DOWN'])
for frontend in content['frontends']:
for key in ['id', 'address', 'udp', 'tcp', 'type', 'queries', 'nonCompliantQueries']:
'qps', 'queries', 'order', 'tcpLatency', 'responses', 'nonCompliantResponses']:
self.assertGreaterEqual(server[key], 0)
- self.assertTrue(server['state'] in ['up', 'down', 'UP', 'DOWN'])
+ self.assertIn(server['state'], ['up', 'down', 'UP', 'DOWN'])
def testServersIDontExist(self):
"""
self.assertTrue(r)
self.assertEqual(r.status_code, 200)
self.assertEqual(r.headers.get('x-powered-by'), "dnsdist")
- self.assertTrue("x-frame-options" in r.headers)
+ self.assertIn("x-frame-options", r.headers)
class TestStatsWithoutAuthentication(APITestsBase):
__test__ = True
self.assertTrue(receivedResponse)
receivedQuery.id = expectedQuery.id
self.assertEqual(expectedQuery, receivedQuery)
- self.assertTrue((self._customResponseHeader1) in self._response_headers.decode())
- self.assertTrue((self._customResponseHeader2) in self._response_headers.decode())
+ self.assertIn(self._customResponseHeader1, self._response_headers.decode())
+ self.assertIn(self._customResponseHeader2, self._response_headers.decode())
self.assertFalse(('UPPERCASE: VaLuE' in self._response_headers.decode()))
- self.assertTrue(('uppercase: VaLuE' in self._response_headers.decode()))
- self.assertTrue(('cache-control: max-age=3600' in self._response_headers.decode()))
+ self.assertIn('uppercase: VaLuE', self._response_headers.decode())
+ self.assertIn('cache-control: max-age=3600', self._response_headers.decode())
self.checkQueryEDNSWithoutECS(expectedQuery, receivedQuery)
self.assertEqual(response, receivedResponse)
self.checkHasHeader('cache-control', 'max-age=3600')
self.assertTrue(receivedResponse)
self.assertEqual(receivedResponse, b'Plaintext answer')
self.assertEqual(self._rcode, 200)
- self.assertTrue('content-type: text/plain' in self._response_headers.decode())
+ self.assertIn('content-type: text/plain', self._response_headers.decode())
def testHTTPStatusAction307(self):
"""
(_, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, caFile=self._caCert, useQueue=False, rawResponse=True)
self.assertTrue(receivedResponse)
self.assertEqual(self._rcode, 307)
- self.assertTrue('location: https://doh.powerdns.org' in self._response_headers.decode())
+ self.assertIn('location: https://doh.powerdns.org', self._response_headers.decode())
def testHTTPLuaResponse(self):
"""
self.assertTrue(receivedResponse)
self.assertEqual(receivedResponse, b'It works!')
self.assertEqual(self._rcode, 200)
- self.assertTrue('content-type: text/plain' in self._response_headers.decode())
+ self.assertIn('content-type: text/plain', self._response_headers.decode())
def testHTTPEarlyResponse(self):
"""
self.assertTrue(receivedResponse)
self.assertEqual(receivedResponse, b'It works!')
self.assertEqual(self._rcode, 200)
- self.assertTrue('content-type: text/plain' in self._response_headers.decode())
+ self.assertIn('content-type: text/plain', self._response_headers.decode())
class TestDOHFFINGHTTP2(DOHFFI, DNSDistDOHTest):
_dohLibrary = 'nghttp2'
# Ensure all attributes exist
for field in ot_data["resource_spans"][0]["resource"]["attributes"]:
- self.assertTrue(field["key"] in ["service.name"])
+ self.assertIn(field["key"], ["service.name"])
# Ensure the values are correct
# TODO: query.remote with port
def _checkStatsValues(self, results, queriesCountersValue):
for i in list(range(1, 5)) + list(range(6, 20)) + list(range(24, 35)) + [ 35 ] :
oid = self._snmpOID + '.1.' + str(i) + '.0'
- self.assertTrue(oid in results)
+ self.assertIn(oid, results)
self.assertTrue(isinstance(results[oid], Counter64))
for i in range(20, 23):
raise TypeError("msg is not a dns.message.Message")
msgFlags = dns.flags.to_text(msg.flags)
- self.assertTrue('AD' in msgFlags, "No AD flag found in the message for %s" % msg.question[0].name)
+ self.assertIn('AD', msgFlags, "No AD flag found in the message for %s" % msg.question[0].name)
def assertRRsetInAnswer(self, msg, rrset):
"""Asserts the rrset (without comparing TTL) exists in the
#print(msg.response.tags)
self.assertEqual(len(msg.response.tags), len(tags))
for tag in msg.response.tags:
- self.assertTrue(tag in tags)
+ self.assertIn(tag, tags)
def checkProtobufMetas(self, msg, metas):
#print(metas)
for m in msg.meta:
self.assertTrue(m.HasField('key'))
self.assertTrue(m.HasField('value'))
- self.assertTrue(m.key in metas)
+ self.assertIn(m.key, metas)
for i in m.value.intVal :
- self.assertTrue(i in metas[m.key]['intVal'])
+ self.assertIn(i, metas[m.key]['intVal'])
for s in m.value.stringVal :
- self.assertTrue(s in metas[m.key]['stringVal'])
+ self.assertIn(s, metas[m.key]['stringVal'])
def checkProtobufOutgoingQuery(self, msg, protocol, query, qclass, qtype, qname, initiator='127.0.0.1', length=None, expectedECS=None):
self.assertEqual(msg.type, dnsmessage_pb2.PBDNSMessage.DNSOutgoingQueryType)
self.checkProtobufResponse(msg, dnsmessage_pb2.PBDNSMessage.UDP, res, '127.0.0.1', receivedSize=len(raw))
self.assertEqual(len(msg.response.rrs), 5)
for rr in msg.response.rrs:
- self.assertTrue(rr.type in [dns.rdatatype.AAAA, dns.rdatatype.TXT, dns.rdatatype.MX, dns.rdatatype.SPF, dns.rdatatype.SRV])
+ self.assertIn(rr.type, [dns.rdatatype.AAAA, dns.rdatatype.TXT, dns.rdatatype.MX, dns.rdatatype.SPF, dns.rdatatype.SRV])
if rr.type == dns.rdatatype.AAAA:
self.checkProtobufResponseRecord(rr, dns.rdataclass.IN, dns.rdatatype.AAAA, name, 15)
count = 161
for i in list(range(1, count)):
oid = self._snmpOID + '.1.' + str(i) + '.0'
- self.assertTrue(oid in results)
+ self.assertIn(oid, results)
self.assertTrue(isinstance(results[oid], Counter64))
oid = self._snmpOID + '.1.' + str(count + 1) + '.0'