except WindowsError: # pylint: disable=undefined-variable
return False
- def _compute_timeout(self, start):
+ def _compute_timeout(self, start, lifetime=None):
+ lifetime = self.lifetime if lifetime is None else lifetime
now = time.time()
duration = now - start
if duration < 0:
# happen, e.g. under vmware with older linux kernels.
# Pretend it didn't happen.
now = start
- if duration >= self.lifetime:
+ if duration >= lifetime:
raise Timeout(timeout=duration)
- return min(self.lifetime - duration, self.timeout)
+ return min(lifetime - duration, self.timeout)
def query(self, qname, rdtype=dns.rdatatype.A, rdclass=dns.rdataclass.IN,
- tcp=False, source=None, raise_on_no_answer=True, source_port=0):
+ tcp=False, source=None, raise_on_no_answer=True, source_port=0,
+ lifetime=None):
"""Query nameservers to find the answer to the question.
The *qname*, *rdtype*, and *rdclass* parameters may be objects
*source_port*, an ``int``, the port from which to send the message.
+ *lifetime*, a ``float``, how long query should run before timing out.
+
Raises ``dns.exception.Timeout`` if no answers could be found
in the specified lifetime.
if len(nameservers) == 0:
raise NoNameservers(request=request, errors=errors)
for nameserver in nameservers[:]:
- timeout = self._compute_timeout(start)
+ timeout = self._compute_timeout(start, lifetime)
port = self.nameserver_ports.get(nameserver, self.port)
try:
tcp_attempt = tcp
if response.flags & dns.flags.TC:
# Response truncated; retry with TCP.
tcp_attempt = True
- timeout = self._compute_timeout(start)
+ timeout = self._compute_timeout(start, lifetime)
response = \
dns.query.tcp(request, nameserver,
timeout, port,
# But we still have servers to try. Sleep a bit
# so we don't pound them!
#
- timeout = self._compute_timeout(start)
+ timeout = self._compute_timeout(start, lifetime)
sleep_time = min(timeout, backoff)
backoff *= 2
time.sleep(sleep_time)
def query(qname, rdtype=dns.rdatatype.A, rdclass=dns.rdataclass.IN,
tcp=False, source=None, raise_on_no_answer=True,
- source_port=0):
+ source_port=0, lifetime=None):
"""Query nameservers to find the answer to the question.
This is a convenience function that uses the default resolver
"""
return get_default_resolver().query(qname, rdtype, rdclass, tcp, source,
- raise_on_no_answer, source_port)
+ raise_on_no_answer, source_port,
+ lifetime)
def zone_for_name(name, rdclass=dns.rdataclass.IN, tcp=False, resolver=None):