def async_run(self, afunc):
return asyncio.run(afunc())
+ @tests.util.retry_on_timeout
def testResolve(self):
async def run():
answer = await dns.asyncresolver.resolve("dns.google.", "A")
self.assertTrue("8.8.8.8" in seen)
self.assertTrue("8.8.4.4" in seen)
+ @tests.util.retry_on_timeout
def testResolveAddress(self):
async def run():
return await dns.asyncresolver.resolve_address("8.8.8.8")
dnsgoogle = dns.name.from_text("dns.google.")
self.assertEqual(answer[0].target, dnsgoogle)
+ @tests.util.retry_on_timeout
def testResolveName(self):
async def run1():
return await dns.asyncresolver.resolve_name("dns.google.")
with self.assertRaises(dns.resolver.NoAnswer):
self.async_run(run5)
+ @tests.util.retry_on_timeout
def testCanonicalNameNoCNAME(self):
cname = dns.name.from_text("www.google.com")
self.assertEqual(self.async_run(run), cname)
+ @tests.util.retry_on_timeout
def testCanonicalNameCNAME(self):
name = dns.name.from_text("www.dnspython.org")
cname = dns.name.from_text("dmfrjf4ips8xa.cloudfront.net")
@unittest.skipIf(
_systemd_resolved_present or _is_docker, "systemd-resolved or docker in use"
)
+ @tests.util.retry_on_timeout
def testCanonicalNameDangling(self):
name = dns.name.from_text("dangling-cname.dnspython.org")
cname = dns.name.from_text("dangling-target.dnspython.org")
self.assertEqual(self.async_run(run), cname)
+ @tests.util.retry_on_timeout
def testZoneForName1(self):
async def run():
name = dns.name.from_text("www.dnspython.org.")
zname = self.async_run(run)
self.assertEqual(zname, ezname)
+ @tests.util.retry_on_timeout
def testZoneForName2(self):
async def run():
name = dns.name.from_text("a.b.www.dnspython.org.")
zname = self.async_run(run)
self.assertEqual(zname, ezname)
+ @tests.util.retry_on_timeout
def testZoneForName3(self):
async def run():
name = dns.name.from_text("dnspython.org.")
self.assertRaises(dns.resolver.NotAbsolute, bad)
+ @tests.util.retry_on_timeout
def testQueryUDP(self):
for address in query_addresses:
qname = dns.name.from_text("dns.google.")
self.assertTrue("8.8.8.8" in seen)
self.assertTrue("8.8.4.4" in seen)
+ @tests.util.retry_on_timeout
def testQueryUDPWithSocket(self):
for address in query_addresses:
qname = dns.name.from_text("dns.google.")
self.assertTrue("8.8.8.8" in seen)
self.assertTrue("8.8.4.4" in seen)
+ @tests.util.retry_on_timeout
def testQueryTCP(self):
for address in query_addresses:
qname = dns.name.from_text("dns.google.")
self.assertTrue("8.8.8.8" in seen)
self.assertTrue("8.8.4.4" in seen)
+ @tests.util.retry_on_timeout
def testQueryTCPWithSocket(self):
for address in query_addresses:
qname = dns.name.from_text("dns.google.")
self.assertTrue("8.8.4.4" in seen)
@unittest.skipIf(not _ssl_available, "SSL not available")
+ @tests.util.retry_on_timeout
def testQueryTLS(self):
for address in query_addresses:
qname = dns.name.from_text("dns.google.")
self.assertTrue("8.8.4.4" in seen)
@unittest.skipIf(not _ssl_available, "SSL not available")
+ @tests.util.retry_on_timeout
def testQueryTLSWithContext(self):
for address in query_addresses:
qname = dns.name.from_text("dns.google.")
self.assertTrue("8.8.4.4" in seen)
@unittest.skipIf(not _ssl_available, "SSL not available")
+ @tests.util.retry_on_timeout
def testQueryTLSWithSocket(self):
for address in query_addresses:
qname = dns.name.from_text("dns.google.")
self.assertTrue("8.8.8.8" in seen)
self.assertTrue("8.8.4.4" in seen)
+ @tests.util.retry_on_timeout
def testQueryUDPFallback(self):
for address in query_addresses:
qname = dns.name.from_text(".")
(_, tcp) = self.async_run(run)
self.assertTrue(tcp)
+ @tests.util.retry_on_timeout
def testQueryUDPFallbackNoFallback(self):
for address in query_addresses:
qname = dns.name.from_text("dns.google.")
(_, tcp) = self.async_run(run)
self.assertFalse(tcp)
+ @tests.util.retry_on_timeout
def testUDPReceiveQuery(self):
async def run():
async with await self.backend.make_socket(
self.assertRaises(dns.exception.Timeout, run)
@unittest.skipIf(not dns.query._have_httpx, "httpx not available")
+ @tests.util.retry_on_timeout
def testDOHGetRequest(self):
async def run():
nameserver_url = random.choice(KNOWN_ANYCAST_DOH_RESOLVER_URLS)
self.async_run(run)
@unittest.skipIf(not dns.query._have_httpx, "httpx not available")
+ @tests.util.retry_on_timeout
def testDOHPostRequest(self):
async def run():
nameserver_url = random.choice(KNOWN_ANYCAST_DOH_RESOLVER_URLS)
self.async_run(run)
@unittest.skipIf(not dns.quic.have_quic, "aioquic not available")
+ @tests.util.retry_on_timeout
def testDoH3GetRequest(self):
async def run():
nameserver_url = random.choice(KNOWN_ANYCAST_DOH3_RESOLVER_URLS)
self.async_run(run)
@unittest.skipIf(not dns.quic.have_quic, "aioquic not available")
+ @tests.util.retry_on_timeout
def TestDoH3PostRequest(self):
async def run():
nameserver_url = random.choice(KNOWN_ANYCAST_DOH3_RESOLVER_URLS)
self.async_run(run)
@unittest.skipIf(not dns.quic.have_quic, "aioquic not available")
+ @tests.util.retry_on_timeout
def TestDoH3QueryIP(self):
async def run():
nameserver_ip = "8.8.8.8"
self.async_run(run)
@unittest.skipIf(not dns.query._have_httpx, "httpx not available")
+ @tests.util.retry_on_timeout
def testResolverDOH(self):
async def run():
res = dns.asyncresolver.Resolver(configure=False)
self.async_run(run)
@unittest.skipIf(not tests.util.have_ipv4(), "IPv4 not reachable")
+ @tests.util.retry_on_timeout
def testResolveAtAddress(self):
async def run():
answer = await dns.asyncresolver.resolve_at("8.8.8.8", "dns.google.", "A")
self.async_run(run)
@unittest.skipIf(not tests.util.have_ipv4(), "IPv4 not reachable")
+ @tests.util.retry_on_timeout
def testResolveAtName(self):
async def run():
answer = await dns.asyncresolver.resolve_at(
def async_run(self, afunc):
return asyncio.run(afunc())
+ @tests.util.retry_on_timeout
def testUseAfterTimeout(self):
# Test #843 fix.
async def run():
# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
import asyncio
-import time
import pytest
import dns.asyncbackend
import dns.asyncresolver
-import dns.resolver
import dns.nameserver
-
+import dns.resolver
import tests.util
@pytest.mark.skipif(
not tests.util.is_internet_reachable(), reason="Internet not reachable"
)
+@tests.util.retry_on_timeout
def test_basic_ddr_sync():
for nameserver in ["1.1.1.1", "8.8.8.8"]:
res = dns.resolver.Resolver(configure=False)
@pytest.mark.skipif(
not tests.util.is_internet_reachable(), reason="Internet not reachable"
)
+@tests.util.retry_on_timeout
def test_basic_ddr_async():
async def run():
dns.asyncbackend._default_backend = None
import socket
import unittest
+import dns.exception
+
try:
import ssl
def tearDown(self):
self.session.close()
+ @tests.util.retry_on_timeout
def test_get_request(self):
nameserver_url = random.choice(KNOWN_ANYCAST_DOH_RESOLVER_URLS)
q = dns.message.make_query("example.com.", dns.rdatatype.A)
)
self.assertTrue(q.is_response(r))
+ @tests.util.retry_on_timeout
def test_post_request(self):
nameserver_url = random.choice(KNOWN_ANYCAST_DOH_RESOLVER_URLS)
q = dns.message.make_query("example.com.", dns.rdatatype.A)
)
self.assertTrue(q.is_response(r))
+ @tests.util.retry_on_timeout
def test_build_url_from_ip(self):
self.assertTrue(resolver_v4_addresses or resolver_v6_addresses)
if resolver_v4_addresses:
# )
# self.assertTrue(q.is_response(r))
+ @tests.util.retry_on_timeout
def test_new_session(self):
nameserver_url = random.choice(KNOWN_ANYCAST_DOH_RESOLVER_URLS)
q = dns.message.make_query("example.com.", dns.rdatatype.A)
r = dns.query.https(q, nameserver_url, timeout=4)
self.assertTrue(q.is_response(r))
+ @tests.util.retry_on_timeout
def test_resolver(self):
res = dns.resolver.Resolver(configure=False)
res.nameservers = ["https://dns.google/dns-query"]
self.assertTrue("8.8.8.8" in seen)
self.assertTrue("8.8.4.4" in seen)
+ @tests.util.retry_on_timeout
def test_padded_get(self):
nameserver_url = random.choice(KNOWN_PAD_AWARE_DOH_RESOLVER_URLS)
q = dns.message.make_query("example.com.", dns.rdatatype.A, use_edns=0, pad=128)
"Aioquic cannot be imported; no DNS over HTTP3 (DOH3)",
)
class DNSOverHTTP3TestCase(unittest.TestCase):
+ @tests.util.retry_on_timeout
def testDoH3GetRequest(self):
nameserver_url = random.choice(KNOWN_ANYCAST_DOH3_RESOLVER_URLS)
q = dns.message.make_query("dns.google.", dns.rdatatype.A)
)
self.assertTrue(q.is_response(r))
+ @tests.util.retry_on_timeout
def testDoH3PostRequest(self):
nameserver_url = random.choice(KNOWN_ANYCAST_DOH3_RESOLVER_URLS)
q = dns.message.make_query("dns.google.", dns.rdatatype.A)
)
self.assertTrue(q.is_response(r))
+ @tests.util.retry_on_timeout
def test_build_url_from_ip(self):
self.assertTrue(resolver_v4_addresses or resolver_v6_addresses)
if resolver_v4_addresses:
@unittest.skipIf(not tests.util.is_internet_reachable(), "Internet not reachable")
class QueryTests(unittest.TestCase):
+ @tests.util.retry_on_timeout
def testQueryUDP(self):
for address in query_addresses:
qname = dns.name.from_text("dns.google.")
self.assertTrue("8.8.8.8" in seen)
self.assertTrue("8.8.4.4" in seen)
+ @tests.util.retry_on_timeout
def testQueryUDPWithSocket(self):
for address in query_addresses:
with socket.socket(
self.assertTrue("8.8.8.8" in seen)
self.assertTrue("8.8.4.4" in seen)
+ @tests.util.retry_on_timeout
def testQueryTCP(self):
for address in query_addresses:
qname = dns.name.from_text("dns.google.")
self.assertTrue("8.8.8.8" in seen)
self.assertTrue("8.8.4.4" in seen)
+ @tests.util.retry_on_timeout
def testQueryTCPWithSocket(self):
for address in query_addresses:
with socket.socket(
self.assertTrue("8.8.4.4" in seen)
@unittest.skipUnless(have_ssl, "No SSL support")
+ @tests.util.retry_on_timeout
def testQueryTLS(self):
for address in query_addresses:
qname = dns.name.from_text("dns.google.")
self.assertTrue("8.8.4.4" in seen)
@unittest.skipUnless(have_ssl, "No SSL support")
+ @tests.util.retry_on_timeout
def testQueryTLSWithContext(self):
for address in query_addresses:
qname = dns.name.from_text("dns.google.")
self.assertTrue("8.8.4.4" in seen)
@unittest.skipUnless(have_ssl, "No SSL support")
+ @tests.util.retry_on_timeout
def testQueryTLSWithSocket(self):
for address in query_addresses:
with socket.socket(
self.assertTrue("8.8.4.4" in seen)
@unittest.skipUnless(have_ssl, "No SSL support")
+ @tests.util.retry_on_timeout
def testQueryTLSwithPadding(self):
for address in query_addresses:
qname = dns.name.from_text("dns.google.")
has_pad = True
self.assertTrue(has_pad)
+ @tests.util.retry_on_timeout
def testQueryUDPFallback(self):
for address in query_addresses:
qname = dns.name.from_text(".")
(_, tcp) = dns.query.udp_with_fallback(q, address, timeout=4)
self.assertTrue(tcp)
+ @tests.util.retry_on_timeout
def testQueryUDPFallbackWithSocket(self):
for address in query_addresses:
af = dns.inet.af_for_address(address)
)
self.assertTrue(tcp)
+ @tests.util.retry_on_timeout
def testQueryUDPFallbackNoFallback(self):
for address in query_addresses:
qname = dns.name.from_text("dns.google.")
(_, tcp) = dns.query.udp_with_fallback(q, address, timeout=2)
self.assertFalse(tcp)
+ @tests.util.retry_on_timeout
def testUDPReceiveQuery(self):
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as listener:
listener.bind(("127.0.0.1", 0))
@unittest.skipIf(not tests.util.is_internet_reachable(), "Internet not reachable")
class LiveResolverTests(unittest.TestCase):
+ @tests.util.retry_on_timeout
def testZoneForName1(self):
name = dns.name.from_text("www.dnspython.org.")
ezname = dns.name.from_text("dnspython.org.")
zname = dns.resolver.zone_for_name(name)
self.assertEqual(zname, ezname)
+ @tests.util.retry_on_timeout
def testZoneForName2(self):
name = dns.name.from_text("a.b.www.dnspython.org.")
ezname = dns.name.from_text("dnspython.org.")
zname = dns.resolver.zone_for_name(name)
self.assertEqual(zname, ezname)
+ @tests.util.retry_on_timeout
def testZoneForName3(self):
ezname = dns.name.from_text("dnspython.org.")
zname = dns.resolver.zone_for_name("dnspython.org.")
self.assertRaises(dns.resolver.NotAbsolute, bad)
+ @tests.util.retry_on_timeout
def testResolve(self):
answer = dns.resolver.resolve("dns.google.", "A")
seen = set([rdata.address for rdata in answer])
self.assertTrue("8.8.8.8" in seen)
self.assertTrue("8.8.4.4" in seen)
+ @tests.util.retry_on_timeout
def testResolveTCP(self):
answer = dns.resolver.resolve("dns.google.", "A", tcp=True)
seen = set([rdata.address for rdata in answer])
self.assertTrue("8.8.8.8" in seen)
self.assertTrue("8.8.4.4" in seen)
+ @tests.util.retry_on_timeout
def testResolveAddress(self):
answer = dns.resolver.resolve_address("8.8.8.8")
dnsgoogle = dns.name.from_text("dns.google.")
self.assertEqual(answer[0].target, dnsgoogle)
+ @tests.util.retry_on_timeout
def testResolveName(self):
answers = dns.resolver.resolve_name("dns.google.")
seen = set(answers.addresses())
with self.assertRaises(dns.resolver.NoAnswer):
dns.resolver.resolve_name(dns.reversename.from_address("8.8.8.8"))
+ @tests.util.retry_on_timeout
@patch.object(dns.message.Message, "use_edns")
def testResolveEdnsOptions(self, message_use_edns_mock):
resolver = dns.resolver.Resolver()
resolver.resolve("dns.google.", "A")
assert {"options": options} in message_use_edns_mock.call_args
+ @tests.util.retry_on_timeout
def testResolveNodataException(self):
def bad():
dns.resolver.resolve("dnspython.org.", "SRV")
self.assertRaises(dns.resolver.NoAnswer, bad)
+ @tests.util.retry_on_timeout
def testResolveNodataAnswer(self):
qname = dns.name.from_text("dnspython.org")
qclass = dns.rdataclass.from_text("IN")
),
)
+ @tests.util.retry_on_timeout
def testResolveNXDOMAIN(self):
qname = dns.name.from_text("nxdomain.dnspython.org")
qclass = dns.rdataclass.from_text("IN")
self.assertGreaterEqual(len(nx.responses()), 1)
@unittest.skipIf(not tests.util.have_ipv4(), "IPv4 not reachable")
+ @tests.util.retry_on_timeout
def testResolveCacheHit(self):
res = dns.resolver.Resolver(configure=False)
res.nameservers = ["8.8.8.8"]
self.assertIs(answer2, answer1)
@unittest.skipIf(not tests.util.have_ipv4(), "IPv4 not reachable")
+ @tests.util.retry_on_timeout
def testTLSNameserver(self):
res = dns.resolver.Resolver(configure=False)
res.nameservers = [dns.nameserver.DoTNameserver("8.8.8.8", 853)]
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
import enum
+import functools
import inspect
import os
+import dns.exception
import dns.message
import dns.name
import dns.query
return os.path.isfile("/.dockerenv")
except Exception:
return False
+
+
+def retry_on_timeout(f):
+ @functools.wraps(f)
+ def wrapper(*args, **kwargs):
+ bad = True
+ for i in range(3):
+ try:
+ print("TRY", i, "for", f.__name__)
+ f(*args, **kwargs)
+ bad = False
+ break
+ except dns.exception.Timeout:
+ pass
+ if bad:
+ raise dns.exception.Timeout
+
+ return wrapper