n2 = dns.inet.inet_pton(af, a2[0])
return n1 == n2 and a1[1:] == a2[1:]
+def _destination_and_source(af, where, port, source, source_port):
+ # Apply defaults and compute destination and source tuples
+ # suitable for use in connect(), sendto(), or bind().
+ if af is None:
+ try:
+ af = dns.inet.af_for_address(where)
+ except:
+ af = dns.inet.AF_INET
+ if af == dns.inet.AF_INET:
+ destination = (where, port)
+ if source is not None or source_port != 0:
+ if source is None:
+ source = '0.0.0.0'
+ source = (source, source_port)
+ elif af == dns.inet.AF_INET6:
+ destination = (where, port, 0, 0)
+ if source is not None or source_port != 0:
+ if source is None:
+ source = '::'
+ source = (source, source_port, 0, 0)
+ return (af, destination, source)
+
def udp(q, where, timeout=None, port=53, af=None, source=None, source_port=0,
ignore_unexpected=False, one_rr_per_rrset=False):
"""Return the response obtained after sending a query via UDP.
If the inference attempt fails, AF_INET is used.
@type af: int
@rtype: dns.message.Message object
- @param source: source address. The default is the IPv4 wildcard address.
+ @param source: source address. The default is the wildcard address.
@type source: string
@param source_port: The port from which to send the message.
The default is 0.
"""
wire = q.to_wire()
- if af is None:
- try:
- af = dns.inet.af_for_address(where)
- except:
- af = dns.inet.AF_INET
- if af == dns.inet.AF_INET:
- destination = (where, port)
- if source is not None:
- source = (source, source_port)
- elif af == dns.inet.AF_INET6:
- destination = (where, port, 0, 0)
- if source is not None:
- source = (source, source_port, 0, 0)
+ (af, destination, source) = _destination_and_source(af, where, port, source,
+ source_port)
s = socket.socket(af, socket.SOCK_DGRAM, 0)
try:
expiration = _compute_expiration(timeout)
If the inference attempt fails, AF_INET is used.
@type af: int
@rtype: dns.message.Message object
- @param source: source address. The default is the IPv4 wildcard address.
+ @param source: source address. The default is the wildcard address.
@type source: string
@param source_port: The port from which to send the message.
The default is 0.
"""
wire = q.to_wire()
- if af is None:
- try:
- af = dns.inet.af_for_address(where)
- except:
- af = dns.inet.AF_INET
- if af == dns.inet.AF_INET:
- destination = (where, port)
- if source is not None:
- source = (source, source_port)
- elif af == dns.inet.AF_INET6:
- destination = (where, port, 0, 0)
- if source is not None:
- source = (source, source_port, 0, 0)
+ (af, destination, source) = _destination_and_source(af, where, port, source,
+ source_port)
s = socket.socket(af, socket.SOCK_STREAM, 0)
try:
expiration = _compute_expiration(timeout)
take.
@type lifetime: float
@rtype: generator of dns.message.Message objects.
- @param source: source address. The default is the IPv4 wildcard address.
+ @param source: source address. The default is the wildcard address.
@type source: string
@param source_port: The port from which to send the message.
The default is 0.
if not keyring is None:
q.use_tsig(keyring, keyname, algorithm=keyalgorithm)
wire = q.to_wire()
- if af is None:
- try:
- af = dns.inet.af_for_address(where)
- except:
- af = dns.inet.AF_INET
- if af == dns.inet.AF_INET:
- destination = (where, port)
- if source is not None:
- source = (source, source_port)
- elif af == dns.inet.AF_INET6:
- destination = (where, port, 0, 0)
- if source is not None:
- source = (source, source_port, 0, 0)
+ (af, destination, source) = _destination_and_source(af, where, port, source,
+ source_port)
if use_udp:
if rdtype != dns.rdatatype.IXFR:
raise ValueError('cannot do a UDP AXFR')
return min(self.lifetime - duration, self.timeout)
def query(self, qname, rdtype=dns.rdatatype.A, rdclass=dns.rdataclass.IN,
- tcp=False, source=None, raise_on_no_answer=True):
+ tcp=False, source=None, raise_on_no_answer=True, source_port=0):
"""Query nameservers to find the answer to the question.
The I{qname}, I{rdtype}, and I{rdclass} parameters may be objects
@param raise_on_no_answer: raise NoAnswer if there's no answer
(defaults is True).
@type raise_on_no_answer: bool
+ @param source_port: The port from which to send the message.
+ The default is 0.
+ @type source_port: int
@rtype: dns.resolver.Answer instance
@raises Timeout: no answers could be found in the specified lifetime
@raises NXDOMAIN: the query name does not exist
if tcp:
response = dns.query.tcp(request, nameserver,
timeout, self.port,
- source=source)
+ source=source,
+ source_port=source_port)
else:
response = dns.query.udp(request, nameserver,
timeout, self.port,
- source=source)
+ source=source,
+ source_port=source_port)
if response.flags & dns.flags.TC:
# Response truncated; retry with TCP.
timeout = self._compute_timeout(start)
response = dns.query.tcp(request, nameserver,
- timeout, self.port,
- source=source)
+ timeout, self.port,
+ source=source,
+ source_port=source_port)
except (socket.error, dns.exception.Timeout):
#
return default_resolver
def query(qname, rdtype=dns.rdatatype.A, rdclass=dns.rdataclass.IN,
- tcp=False, source=None, raise_on_no_answer=True):
+ tcp=False, source=None, raise_on_no_answer=True,
+ source_port=0):
"""Query nameservers to find the answer to the question.
This is a convenience function that uses the default resolver
@see: L{dns.resolver.Resolver.query} for more information on the
parameters."""
return get_default_resolver().query(qname, rdtype, rdclass, tcp, source,
- raise_on_no_answer)
+ raise_on_no_answer, source_port)
def zone_for_name(name, rdclass=dns.rdataclass.IN, tcp=False, resolver=None):
"""Find the name of the zone which contains the specified name.