self.assertTrue("8.8.8.8" in seen)
self.assertTrue("8.8.4.4" in seen)
+ @unittest.skipIf(not _ssl_available, "SSL not available")
+ def testQueryTLSWithContext(self):
+ for address in query_addresses:
+ qname = dns.name.from_text("dns.google.")
+
+ async def run():
+ ssl_context = ssl.create_default_context()
+ ssl_context.check_hostname = True
+ q = dns.message.make_query(qname, dns.rdatatype.A)
+ return await dns.asyncquery.tls(
+ q, address, timeout=2, ssl_context=ssl_context
+ )
+
+ response = self.async_run(run)
+ rrs = response.get_rrset(
+ response.answer, qname, dns.rdataclass.IN, dns.rdatatype.A
+ )
+ self.assertTrue(rrs is not None)
+ seen = set([rdata.address for rdata in rrs])
+ self.assertTrue("8.8.8.8" in seen)
+ self.assertTrue("8.8.4.4" in seen)
+
@unittest.skipIf(not _ssl_available, "SSL not available")
def testQueryTLSWithSocket(self):
for address in query_addresses:
try:
- import trio
import sniffio
+ import trio
class TrioAsyncDetectionTests(AsyncDetectionTests):
sniff_result = "trio"
import dns.inet
import dns.message
import dns.name
+import dns.query
import dns.rdataclass
import dns.rdatatype
-import dns.query
import dns.tsigkeyring
import dns.zone
import tests.util
self.assertTrue("8.8.8.8" in seen)
self.assertTrue("8.8.4.4" in seen)
+ @unittest.skipUnless(have_ssl, "No SSL support")
+ def testQueryTLSWithContext(self):
+ for address in query_addresses:
+ qname = dns.name.from_text("dns.google.")
+ q = dns.message.make_query(qname, dns.rdatatype.A)
+ ssl_context = ssl.create_default_context()
+ ssl_context.check_hostname = False
+ response = dns.query.tls(q, address, timeout=2, ssl_context=ssl_context)
+ rrs = response.get_rrset(
+ response.answer, qname, dns.rdataclass.IN, dns.rdatatype.A
+ )
+ self.assertTrue(rrs is not None)
+ seen = set([rdata.address for rdata in rrs])
+ self.assertTrue("8.8.8.8" in seen)
+ self.assertTrue("8.8.4.4" in seen)
+
@unittest.skipUnless(have_ssl, "No SSL support")
def testQueryTLSWithSocket(self):
for address in query_addresses: