import sys
import time
import random
+import warnings
try:
import threading as _threading
except ImportError:
raise Timeout(timeout=duration)
return min(lifetime - duration, self.timeout)
- def query(self, qname, rdtype=dns.rdatatype.A, rdclass=dns.rdataclass.IN,
- tcp=False, source=None, raise_on_no_answer=True, source_port=0,
- lifetime=None):
+ def _get_qnames_to_try(self, qname, search):
+ # This is a separate method so we can unit test the search
+ # rules without requiring the Internet.
+ qnames_to_try = []
+ if qname.is_absolute():
+ qnames_to_try.append(qname)
+ else:
+ if len(qname) > 1:
+ qnames_to_try.append(qname.concatenate(dns.name.root))
+ if search and self.search:
+ for suffix in self.search:
+ if self.ndots is None or len(qname.labels) >= self.ndots:
+ qnames_to_try.append(qname.concatenate(suffix))
+ else:
+ qnames_to_try.append(qname.concatenate(self.domain))
+ return qnames_to_try
+
+ def resolve(self, qname, rdtype=dns.rdatatype.A, rdclass=dns.rdataclass.IN,
+ tcp=False, source=None, raise_on_no_answer=True, source_port=0,
+ lifetime=None, search=False):
"""Query nameservers to find the answer to the question.
The *qname*, *rdtype*, and *rdclass* parameters may be objects
*source_port*, an ``int``, the port from which to send the message.
- *lifetime*, a ``float``, how many seconds a query should run before timing out.
+ *lifetime*, a ``float``, how many seconds a query should run
+ before timing out.
+
+ *search*, a ``bool``, determines whether search lists configured
+ in the system's resolver configuration are used. The default is
+ ``False``.
Raises ``dns.exception.Timeout`` if no answers could be found
in the specified lifetime.
nameservers are available to answer the question.
Returns a ``dns.resolver.Answer`` instance.
+
"""
if isinstance(qname, str):
rdclass = dns.rdataclass.from_text(rdclass)
if dns.rdataclass.is_metaclass(rdclass):
raise NoMetaqueries
- qnames_to_try = []
- if qname.is_absolute():
- qnames_to_try.append(qname)
- else:
- if len(qname) > 1:
- qnames_to_try.append(qname.concatenate(dns.name.root))
- if self.search:
- for suffix in self.search:
- if self.ndots is None or len(qname.labels) >= self.ndots:
- qnames_to_try.append(qname.concatenate(suffix))
- else:
- qnames_to_try.append(qname.concatenate(self.domain))
+ qnames_to_try = self._get_qnames_to_try(qname, search)
all_nxdomain = True
nxdomain_responses = {}
start = time.time()
self.cache.put((_qname, rdtype, rdclass), answer)
return answer
- def reverse_query(self, ipaddr, *args, **kwargs):
- """Use a resolver to run a Reverse IP Query for PTR records.
+ def query(self, qname, rdtype=dns.rdatatype.A, rdclass=dns.rdataclass.IN,
+ tcp=False, source=None, raise_on_no_answer=True, source_port=0,
+ lifetime=None):
+ """Query nameservers to find the answer to the question.
+
+ This method calls resolve() with ``search=True``, and is
+ provided for backwards compatbility with prior versions of
+ dnspython. See the documentation for the resolve() method for
+ further details.
+ """
+ warnings.warn('please use dns.resolver.Resolver.resolve() instead',
+ DeprecationWarning, stacklevel=2)
+ return self.resolve(qname, rdtype, rdclass, tcp, source,
+ raise_on_no_answer, source_port, lifetime,
+ True)
+
+ def resolve_address(self, ipaddr, *args, **kwargs):
+ """Use a resolver to run a reverse query for PTR records.
- This utilizes the in-built query function to perform a PTR lookup on the
+ This utilizes the resolve() method to perform a PTR lookup on the
specified IP address.
- *ipaddr*, a ``str``, the IP address you want to get the PTR record for.
+ *ipaddr*, a ``str``, the IPv4 or IPv6 address you want to get
+ the PTR record for.
- All other arguments that can be passed to the query function except for
- rdtype and rdclass are also supported by this function.
+ All other arguments that can be passed to the resolve() function
+ except for rdtype and rdclass are also supported by this
+ function.
"""
- return self.query(dns.reversename.from_address(ipaddr),
- rdtype=dns.rdatatype.PTR,
- rdclass=dns.rdataclass.IN,
- *args, **kwargs)
+ return self.resolve(dns.reversename.from_address(ipaddr),
+ rdtype=dns.rdatatype.PTR,
+ rdclass=dns.rdataclass.IN,
+ *args, **kwargs)
def use_tsig(self, keyring, keyname=None,
algorithm=dns.tsig.default_algorithm):
default_resolver = Resolver()
-def query(qname, rdtype=dns.rdatatype.A, rdclass=dns.rdataclass.IN,
- tcp=False, source=None, raise_on_no_answer=True,
- source_port=0, lifetime=None):
+def resolve(qname, rdtype=dns.rdatatype.A, rdclass=dns.rdataclass.IN,
+ tcp=False, source=None, raise_on_no_answer=True,
+ source_port=0, lifetime=None, search=False):
"""Query nameservers to find the answer to the question.
This is a convenience function that uses the default resolver
object to make the query.
- See ``dns.resolver.Resolver.query`` for more information on the
+ See ``dns.resolver.Resolver.resolve`` for more information on the
parameters.
"""
- return get_default_resolver().query(qname, rdtype, rdclass, tcp, source,
- raise_on_no_answer, source_port,
- lifetime)
+ return get_default_resolver().resolve(qname, rdtype, rdclass, tcp, source,
+ raise_on_no_answer, source_port,
+ lifetime, search)
+
+def query(qname, rdtype=dns.rdatatype.A, rdclass=dns.rdataclass.IN,
+ tcp=False, source=None, raise_on_no_answer=True,
+ source_port=0, lifetime=None):
+ """Query nameservers to find the answer to the question.
+
+ This method calls resolve() with ``search=True``, and is
+ provided for backwards compatbility with prior versions of
+ dnspython. See the documentation for the resolve() method for
+ further details.
+ """
+ warnings.warn('please use dns.resolver.resolve() instead',
+ DeprecationWarning, stacklevel=2)
+ return resolve(qname, rdtype, rdclass, tcp, source,
+ raise_on_no_answer, source_port, lifetime,
+ True)
def zone_for_name(name, rdclass=dns.rdataclass.IN, tcp=False, resolver=None):
raise NotAbsolute(name)
while 1:
try:
- answer = resolver.query(name, dns.rdatatype.SOA, rdclass, tcp)
+ answer = resolver.resolve(name, dns.rdatatype.SOA, rdclass, tcp)
if answer.rrset.name == name:
return name
# otherwise we were CNAMEd or DNAMEd and need to look higher
class NoResolverConfiguration(exception.DNSException): ...
Timeout = exception.Timeout
-def query(qname : str, rdtype : Union[int,str] = 0, rdclass : Union[int,str] = 0,
+def resolve(qname : str, rdtype : Union[int,str] = 0,
+ rdclass : Union[int,str] = 0,
+ tcp=False, source=None, raise_on_no_answer=True,
+ source_port=0, lifetime : Optional[float]=None,
+ search : bool = False):
+ ...
+def query(qname : str, rdtype : Union[int,str] = 0,
+ rdclass : Union[int,str] = 0,
tcp=False, source=None, raise_on_no_answer=True,
- source_port=0):
+ source_port=0, lifetime : Optional[float]=None):
...
-def reverse_query(self, ipaddr: str, *args: Any, **kwargs: Optional[Dict]):
+def resolve_address(self, ipaddr: str, *args: Any, **kwargs: Optional[Dict]):
...
class LRUCache:
def __init__(self, max_size=1000):
def __init__(self, qname, rdtype, rdclass, response,
raise_on_no_answer=True):
...
-def zone_for_name(name, rdclass : int = rdataclass.IN, tcp=False, resolver : Optional[Resolver] = None):
+def zone_for_name(name, rdclass : int = rdataclass.IN, tcp=False,
+ resolver : Optional[Resolver] = None):
...
class Resolver:
- def __init__(self, filename : Optional[str] = '/etc/resolv.conf', configure : Optional[bool] = True):
+ def __init__(self, filename : Optional[str] = '/etc/resolv.conf',
+ configure : Optional[bool] = True):
self.nameservers : List[str]
- def query(self, qname : str, rdtype : Union[int,str] = rdatatype.A, rdclass : Union[int,str] = rdataclass.IN,
- tcp : bool = False, source : Optional[str] = None, raise_on_no_answer=True, source_port : int = 0):
+ def resolve(self, qname : str, rdtype : Union[int,str] = rdatatype.A,
+ rdclass : Union[int,str] = rdataclass.IN,
+ tcp : bool = False, source : Optional[str] = None,
+ raise_on_no_answer=True, source_port : int = 0,
+ lifetime : Optional[float]=None, search : bool = False):
+ ...
+ def query(self, qname : str, rdtype : Union[int,str] = rdatatype.A,
+ rdclass : Union[int,str] = rdataclass.IN,
+ tcp : bool = False, source : Optional[str] = None,
+ raise_on_no_answer=True, source_port : int = 0,
+ lifetime : Optional[float]=None):
...