class NXDOMAIN(dns.exception.DNSException):
"""The DNS query name does not exist."""
- supp_kwargs = set(['qname'])
+ supp_kwargs = set(['qnames', 'responses'])
+ fmt = None # we have our own __str__ implementation
+
+ def _check_kwargs(self, qnames, responses=None):
+ if not isinstance(qnames, (list, tuple, set)):
+ raise AttributeError("qnames must be a list, tuple or set")
+ if len(qnames) == 0:
+ raise AttributeError("qnames must contain at least one element")
+ if responses is None:
+ responses = {}
+ elif not isinstance(responses, dict):
+ raise AttributeError("responses must be a dict(qname=response)")
+ kwargs = dict(qnames=qnames, responses=responses)
+ return kwargs
def __str__(self):
- if 'qname' not in self.kwargs:
+ if 'qnames' not in self.kwargs:
return super(NXDOMAIN, self).__str__()
-
- qname = self.kwargs['qname']
- msg = self.__doc__[:-1]
- if isinstance(qname, (list, set)):
- if len(qname) > 1:
- msg = 'None of DNS query names exist'
- qname = list(map(str, qname))
- else:
- qname = qname[0]
- return "%s: %s" % (msg, (str(qname)))
+ qnames = self.kwargs['qnames']
+ if len(qnames) > 1:
+ msg = 'None of DNS query names exist'
+ else:
+ msg = self.__doc__[:-1]
+ qnames = ', '.join(map(str, qnames))
+ return "%s: %s" % (msg, qnames)
+
+ def canonical_name(self):
+ if not 'qnames' in self.kwargs:
+ raise TypeError("parametrized exception required")
+ IN = dns.rdataclass.IN
+ CNAME = dns.rdatatype.CNAME
+ cname = None
+ for qname in self.kwargs['qnames']:
+ response = self.kwargs['responses'][qname]
+ for answer in response.answer:
+ if answer.rdtype != CNAME or answer.rdclass != IN:
+ continue
+ cname = answer.items[0].target.to_text()
+ if cname is not None:
+ return dns.name.from_text(cname)
+ return self.kwargs['qnames'][0]
+ canonical_name = property(canonical_name, doc=(
+ "Return the unresolved canonical name."))
+
+ def __add__(self, e_nx):
+ """Augment by results from another NXDOMAIN exception."""
+ qnames0 = list(self.kwargs.get('qnames', []))
+ responses0 = dict(self.kwargs.get('responses', {}))
+ responses1 = e_nx.kwargs.get('responses', {})
+ for qname1 in e_nx.kwargs.get('qnames', []):
+ if qname1 not in qnames0:
+ qnames0.append(qname1)
+ if qname1 in responses1:
+ responses0[qname1] = responses1[qname1]
+ return NXDOMAIN(qnames=qnames0, responses=responses0)
class YXDOMAIN(dns.exception.DNSException):
else:
qnames_to_try.append(qname.concatenate(self.domain))
all_nxdomain = True
+ nxdomain_responses = {}
start = time.time()
for qname in qnames_to_try:
if self.cache:
backoff *= 2
time.sleep(sleep_time)
if response.rcode() == dns.rcode.NXDOMAIN:
+ nxdomain_responses[qname] = response
continue
all_nxdomain = False
break
if all_nxdomain:
- raise NXDOMAIN(qname=qnames_to_try)
+ raise NXDOMAIN(qnames=qnames_to_try, responses=nxdomain_responses)
answer = Answer(qname, rdtype, rdclass, response,
raise_on_no_answer)
if self.cache:
;ADDITIONAL
"""
+dangling_cname_0_message_text = """id 10000
+opcode QUERY
+rcode NOERROR
+flags QR AA RD RA
+;QUESTION
+91.11.17.172.in-addr.arpa.none. IN PTR
+;ANSWER
+;AUTHORITY
+;ADDITIONAL
+"""
+
+dangling_cname_1_message_text = """id 10001
+opcode QUERY
+rcode NOERROR
+flags QR AA RD RA
+;QUESTION
+91.11.17.172.in-addr.arpa. IN PTR
+;ANSWER
+11.17.172.in-addr.arpa. 86400 IN DNAME 11.8-22.17.172.in-addr.arpa.
+91.11.17.172.in-addr.arpa. 86400 IN CNAME 91.11.8-22.17.172.in-addr.arpa.
+;AUTHORITY
+;ADDITIONAL
+"""
+
+dangling_cname_2_message_text = """id 10002
+opcode QUERY
+rcode NOERROR
+flags QR AA RD RA
+;QUESTION
+91.11.17.172.in-addr.arpa.example. IN PTR
+;ANSWER
+91.11.17.172.in-addr.arpa.example. 86400 IN CNAME 91.11.17.172.in-addr.arpa.base.
+91.11.17.172.in-addr.arpa.base. 86400 IN CNAME 91.11.17.172.clients.example.
+91.11.17.172.clients.example. 86400 IN CNAME 91-11-17-172.dynamic.example.
+;AUTHORITY
+;ADDITIONAL
+"""
+
+
class FakeAnswer(object):
def __init__(self, expiration):
self.expiration = expiration
def polling_backend(self):
return dns.query._poll_for
+class NXDOMAINExceptionTestCase(unittest.TestCase):
+
+ def test_nxdomain_compatible(self):
+ n1 = dns.name.Name(('a', 'b', ''))
+ n2 = dns.name.Name(('a', 'b', 's', ''))
+ py3 = (sys.version_info[0] > 2)
+
+ try:
+ raise dns.resolver.NXDOMAIN
+ except Exception as e:
+ if not py3: self.assertTrue((e.message == e.__doc__))
+ self.assertTrue((e.args == (e.__doc__,)))
+ self.assertTrue(('kwargs' in dir(e)))
+ self.assertTrue((str(e) == e.__doc__), str(e))
+ self.assertTrue(('qnames' not in e.kwargs))
+ self.assertTrue(('responses' not in e.kwargs))
+
+ try:
+ raise dns.resolver.NXDOMAIN("errmsg")
+ except Exception as e:
+ if not py3: self.assertTrue((e.message == "errmsg"))
+ self.assertTrue((e.args == ("errmsg",)))
+ self.assertTrue(('kwargs' in dir(e)))
+ self.assertTrue((str(e) == "errmsg"), str(e))
+ self.assertTrue(('qnames' not in e.kwargs))
+ self.assertTrue(('responses' not in e.kwargs))
+
+ try:
+ raise dns.resolver.NXDOMAIN("errmsg", -1)
+ except Exception as e:
+ if not py3: self.assertTrue((e.message == ""))
+ self.assertTrue((e.args == ("errmsg", -1)))
+ self.assertTrue(('kwargs' in dir(e)))
+ self.assertTrue((str(e) == "('errmsg', -1)"), str(e))
+ self.assertTrue(('qnames' not in e.kwargs))
+ self.assertTrue(('responses' not in e.kwargs))
+
+ try:
+ raise dns.resolver.NXDOMAIN(qnames=None)
+ except Exception as e:
+ self.assertTrue((isinstance(e, AttributeError)))
+
+ try:
+ raise dns.resolver.NXDOMAIN(qnames=n1)
+ except Exception as e:
+ self.assertTrue((isinstance(e, AttributeError)))
+
+ try:
+ raise dns.resolver.NXDOMAIN(qnames=[])
+ except Exception as e:
+ self.assertTrue((isinstance(e, AttributeError)))
+
+ try:
+ raise dns.resolver.NXDOMAIN(qnames=[n1])
+ except Exception as e:
+ MSG = "The DNS query name does not exist: a.b."
+ if not py3: self.assertTrue((e.message == MSG), e.message)
+ self.assertTrue((e.args == (MSG,)), repr(e.args))
+ self.assertTrue(('kwargs' in dir(e)))
+ self.assertTrue((str(e) == MSG), str(e))
+ self.assertTrue(('qnames' in e.kwargs))
+ self.assertTrue((e.kwargs['qnames'] == [n1]))
+ self.assertTrue(('responses' in e.kwargs))
+ self.assertTrue((e.kwargs['responses'] == {}))
+
+ try:
+ raise dns.resolver.NXDOMAIN(qnames=[n2, n1])
+ except Exception as e:
+ e0 = dns.resolver.NXDOMAIN("errmsg")
+ e = e0 + e
+ MSG = "None of DNS query names exist: a.b.s., a.b."
+ if not py3: self.assertTrue((e.message == MSG), e.message)
+ self.assertTrue((e.args == (MSG,)), repr(e.args))
+ self.assertTrue(('kwargs' in dir(e)))
+ self.assertTrue((str(e) == MSG), str(e))
+ self.assertTrue(('qnames' in e.kwargs))
+ self.assertTrue((e.kwargs['qnames'] == [n2, n1]))
+ self.assertTrue(('responses' in e.kwargs))
+ self.assertTrue((e.kwargs['responses'] == {}))
+
+ try:
+ raise dns.resolver.NXDOMAIN(qnames=[n1], responses=['r1.1'])
+ except Exception as e:
+ self.assertTrue((isinstance(e, AttributeError)))
+
+ try:
+ raise dns.resolver.NXDOMAIN(qnames=[n1], responses={n1: 'r1.1'})
+ except Exception as e:
+ MSG = "The DNS query name does not exist: a.b."
+ if not py3: self.assertTrue((e.message == MSG), e.message)
+ self.assertTrue((e.args == (MSG,)), repr(e.args))
+ self.assertTrue(('kwargs' in dir(e)))
+ self.assertTrue((str(e) == MSG), str(e))
+ self.assertTrue(('qnames' in e.kwargs))
+ self.assertTrue((e.kwargs['qnames'] == [n1]))
+ self.assertTrue(('responses' in e.kwargs))
+ self.assertTrue((e.kwargs['responses'] == {n1: 'r1.1'}))
+
+ def test_nxdomain_merge(self):
+ n1 = dns.name.Name(('a', 'b', ''))
+ n2 = dns.name.Name(('a', 'b', ''))
+ n3 = dns.name.Name(('a', 'b', 'c', ''))
+ n4 = dns.name.Name(('a', 'b', 'd', ''))
+ responses1 = {n1: 'r1.1', n2: 'r1.2', n4: 'r1.4'}
+ qnames1 = [n1, n4] # n2 == n1
+ responses2 = {n2: 'r2.2', n3: 'r2.3'}
+ qnames2 = [n2, n3]
+ e0 = dns.resolver.NXDOMAIN()
+ e1 = dns.resolver.NXDOMAIN(qnames=qnames1, responses=responses1)
+ e2 = dns.resolver.NXDOMAIN(qnames=qnames2, responses=responses2)
+ e = e1 + e0 + e2
+ self.assertRaises(AttributeError, lambda : e0 + e0)
+ self.assertTrue(e.kwargs['qnames'] == [n1, n4, n3], repr(e.kwargs['qnames']))
+ self.assertTrue(e.kwargs['responses'][n1].startswith('r2.'))
+ self.assertTrue(e.kwargs['responses'][n2].startswith('r2.'))
+ self.assertTrue(e.kwargs['responses'][n3].startswith('r2.'))
+ self.assertTrue(e.kwargs['responses'][n4].startswith('r1.'))
+
+ def test_nxdomain_canonical_name(self):
+ cname0 = "91.11.8-22.17.172.in-addr.arpa.none."
+ cname1 = "91.11.8-22.17.172.in-addr.arpa."
+ cname2 = "91-11-17-172.dynamic.example."
+ message0 = dns.message.from_text(dangling_cname_0_message_text)
+ message1 = dns.message.from_text(dangling_cname_1_message_text)
+ message2 = dns.message.from_text(dangling_cname_2_message_text)
+ qname0 = message0.question[0].name
+ qname1 = message1.question[0].name
+ qname2 = message2.question[0].name
+ responses = {qname0: message0, qname1: message1, qname2: message2}
+ eX = dns.resolver.NXDOMAIN()
+ e0 = dns.resolver.NXDOMAIN(qnames=[qname0], responses=responses)
+ e1 = dns.resolver.NXDOMAIN(qnames=[qname0, qname1, qname2], responses=responses)
+ e2 = dns.resolver.NXDOMAIN(qnames=[qname0, qname2, qname1], responses=responses)
+ self.assertRaises(TypeError, lambda : eX.canonical_name)
+ self.assertTrue(e0.canonical_name == qname0)
+ self.assertTrue(e1.canonical_name == dns.name.from_text(cname1))
+ self.assertTrue(e2.canonical_name == dns.name.from_text(cname2))
+
+
if __name__ == '__main__':
unittest.main()