"""Talk to a DNS server."""
-import os
import socket
import struct
import time
-import base64
-import ipaddress
import dns.asyncbackend
import dns.exception
import dns.rdataclass
import dns.rdatatype
-from dns.query import _addresses_equal, _destination_and_source, \
- _compute_times, UnexpectedSource
+from dns.query import _addresses_equal, _compute_times, UnexpectedSource, \
+ BadResponse
# for brevity
async def udp_with_fallback(q, where, timeout=None, port=53, source=None,
source_port=0, ignore_unexpected=False,
one_rr_per_rrset=False, ignore_trailing=False,
- udp_sock=None, tcp_sock=None):
+ udp_sock=None, tcp_sock=None, backend=None):
"""Return the response to the query, trying UDP first and falling back
to TCP if UDP results in a truncated response.
socket is created. Note that if a socket is provided *where*,
*source* and *source_port* are ignored for the TCP query.
+ *backend*, a ``dns.asyncbackend.Backend``, or ``None``. If ``None``,
+ the default, then dnspython will use the default backend.
+
Returns a (``dns.message.Message``, tcp) tuple where tcp is ``True``
if and only if TCP was used.
"""
try:
response = await udp(q, where, timeout, port, source, source_port,
ignore_unexpected, one_rr_per_rrset,
- ignore_trailing, True, udp_sock)
+ ignore_trailing, True, udp_sock, backend)
return (response, False)
except dns.message.Truncated:
response = await tcp(q, where, timeout, port, source, source_port,
- one_rr_per_rrset, ignore_trailing, tcp_sock)
+ one_rr_per_rrset, ignore_trailing, tcp_sock,
+ backend)
return (response, True)
-
async def send_tcp(sock, what, expiration=None):
"""Send a DNS message to the specified TCP socket.
if answer is not None:
# cache hit!
return answer
- loops = 1
done = False
while not done:
(nameserver, port, tcp, backoff) = resolution.next_nameserver()
async def resolve(qname, rdtype=dns.rdatatype.A, rdclass=dns.rdataclass.IN,
tcp=False, source=None, raise_on_no_answer=True,
- source_port=0, search=None):
+ source_port=0, search=None, backend=None):
"""Query nameservers asynchronously to find the answer to the question.
This is a convenience function that uses the default resolver
return await get_default_resolver().resolve(qname, rdtype, rdclass, tcp,
source, raise_on_no_answer,
- source_port, search)
+ source_port, search, backend)
async def resolve_address(ipaddr, *args, **kwargs):
async def zone_for_name(name, rdclass=dns.rdataclass.IN, tcp=False,
- resolver=None):
+ resolver=None, backend=None):
"""Find the name of the zone which contains the specified name.
*name*, an absolute ``dns.name.Name`` or ``str``, the query name.
*resolver*, a ``dns.asyncresolver.Resolver`` or ``None``, the
resolver to use. If ``None``, the default resolver is used.
+ *backend*, a ``dns.asyncbackend.Backend``, or ``None``. If ``None``,
+ the default, then dnspython will use the default backend.
+
Raises ``dns.resolver.NoRootSOA`` if there is no SOA RR at the DNS
root. (This is only likely to happen if you're using non-default
root servers in your network and they are misconfigured.)
while True:
try:
answer = await resolver.resolve(name, dns.rdatatype.SOA, rdclass,
- tcp)
+ tcp, backend=backend)
if answer.rrset.name == name:
return name
# otherwise we were CNAMEd or DNAMEd and need to look higher