@ivar expiration: The time when the answer expires
@type expiration: float (seconds since the epoch)
"""
- def __init__(self, qname, rdtype, rdclass, response):
+ def __init__(self, qname, rdtype, rdclass, response,
+ raise_on_no_answer=True):
self.qname = qname
self.rdtype = rdtype
self.rdclass = rdclass
break
continue
except KeyError:
- raise NoAnswer
- raise NoAnswer
- if rrset is None:
+ if raise_on_no_answer:
+ raise NoAnswer
+ if raise_on_no_answer:
+ raise NoAnswer
+ if rrset is None and raise_on_no_answer:
raise NoAnswer
self.rrset = rrset
+ if rrset is None:
+ while 1:
+ # Look for a SOA RR whose owner name is a superdomain
+ # of qname.
+ try:
+ srrset = response.find_rrset(response.authority, qname,
+ rdclass, dns.rdatatype.SOA)
+ if min_ttl == -1 or srrset.ttl < min_ttl:
+ min_ttl = srrset.ttl
+ if srrset[0].minimum < min_ttl:
+ min_ttl = srrset[0].minimum
+ break
+ except KeyError:
+ try:
+ qname = qname.parent()
+ except dns.name.NoParent:
+ break
self.expiration = time.time() + min_ttl
def __getattr__(self, attr):
return min(self.lifetime - duration, self.timeout)
def query(self, qname, rdtype=dns.rdatatype.A, rdclass=dns.rdataclass.IN,
- tcp=False, source=None):
+ tcp=False, source=None, raise_on_no_answer=True):
"""Query nameservers to find the answer to the question.
The I{qname}, I{rdtype}, and I{rdclass} parameters may be objects
@type tcp: bool
@param source: bind to this IP address (defaults to machine default IP).
@type source: IP address in dotted quad notation
+ @param raise_on_no_answer: raise NoAnswer if there's no answer
+ (defaults is True).
+ @type raise_on_no_answer: bool
@rtype: dns.resolver.Answer instance
@raises Timeout: no answers could be found in the specified lifetime
@raises NXDOMAIN: the query name does not exist
- @raises NoAnswer: the response did not contain an answer
+ @raises NoAnswer: the response did not contain an answer and
+ raise_on_no_answer is True.
@raises NoNameservers: no non-broken nameservers are available to
answer the question."""
break
if all_nxdomain:
raise NXDOMAIN
- answer = Answer(qname, rdtype, rdclass, response)
+ answer = Answer(qname, rdtype, rdclass, response,
+ raise_on_no_answer)
if self.cache:
self.cache.put((qname, rdtype, rdclass), answer)
return answer
return default_resolver
def query(qname, rdtype=dns.rdatatype.A, rdclass=dns.rdataclass.IN,
- tcp=False, source=None):
+ tcp=False, source=None, raise_on_no_answer=True):
"""Query nameservers to find the answer to the question.
This is a convenience function that uses the default resolver
object to make the query.
@see: L{dns.resolver.Resolver.query} for more information on the
parameters."""
- return get_default_resolver().query(qname, rdtype, rdclass, tcp, source)
+ return get_default_resolver().query(qname, rdtype, rdclass, tcp, source,
+ raise_on_no_answer)
def zone_for_name(name, rdclass=dns.rdataclass.IN, tcp=False, resolver=None):
"""Find the name of the zone which contains the specified name.