# This shouldn't happen, but we check to make code analysis software
# happier.
raise ValueError("destination required for stream sockets")
- (r, w) = await _maybe_wait_for(
+ r, w = await _maybe_wait_for(
asyncio.open_connection(
destination[0],
destination[1],
wire = b""
while True:
- (wire, from_address) = await sock.recvfrom(65535, _timeout(expiration))
+ wire, from_address = await sock.recvfrom(65535, _timeout(expiration))
if not _matches_destination(
sock.family, from_address, destination, ignore_unexpected
):
parameters, exceptions, and return type of this method.
"""
wire = q.to_wire()
- (begin_time, expiration) = _compute_times(timeout)
+ begin_time, expiration = _compute_times(timeout)
af = dns.inet.af_for_address(where)
destination = _lltuple((where, port), af)
if sock:
cm = await backend.make_socket(af, socket.SOCK_DGRAM, 0, stuple, dtuple)
async with cm as s:
await send_udp(s, wire, destination, expiration) # pyright: ignore
- (r, received_time, _) = await receive_udp(
+ r, received_time, _ = await receive_udp(
s, # pyright: ignore
destination,
expiration,
"""
wire = q.to_wire()
- (begin_time, expiration) = _compute_times(timeout)
+ begin_time, expiration = _compute_times(timeout)
if sock:
# Verify that the socket is connected, as if it's not connected,
# it's not writable, and the polling in send_tcp() will time out or
)
async with cm as s:
await send_tcp(s, wire, expiration) # pyright: ignore
- (r, received_time) = await receive_tcp(
+ r, received_time = await receive_tcp(
s, # pyright: ignore
expiration,
one_rr_per_rrset,
See :py:func:`dns.query.tls()` for the documentation of the other
parameters, exceptions, and return type of this method.
"""
- (begin_time, expiration) = _compute_times(timeout)
+ begin_time, expiration = _compute_times(timeout)
if sock:
cm: contextlib.AbstractAsyncContextManager = NullContext(sock)
else:
cfactory = dns.quic.null_factory # type: ignore
mfactory = dns.quic.null_factory # type: ignore
else:
- (cfactory, mfactory) = dns.quic.factories_for_backend(backend) # type: ignore
+ cfactory, mfactory = dns.quic.factories_for_backend(backend) # type: ignore
async with cfactory() as context:
async with mfactory(
the_connection = the_manager.connect( # pyright: ignore
where, port, source, source_port
)
- (start, expiration) = _compute_times(timeout)
+ start, expiration = _compute_times(timeout)
stream = await the_connection.make_stream(timeout) # type: ignore
async with stream:
# note that send_h3() does not need await
mfactory = dns.quic.null_factory # type: ignore
the_connection = connection
else:
- (cfactory, mfactory) = dns.quic.factories_for_backend(backend) # type: ignore
+ cfactory, mfactory = dns.quic.factories_for_backend(backend) # type: ignore
async with cfactory() as context:
async with mfactory(
the_connection = the_manager.connect( # pyright: ignore
where, port, source, source_port
)
- (start, expiration) = _compute_times(timeout)
+ start, expiration = _compute_times(timeout)
stream = await the_connection.make_stream(timeout) # type: ignore
async with stream:
await stream.send(wire, True)
tsig_ctx = None
r: dns.message.Message | None = None
while not done:
- (_, mexpiration) = _compute_times(timeout)
+ _, mexpiration = _compute_times(timeout)
if mexpiration is None or (
expiration is not None and mexpiration > expiration
):
mexpiration = expiration
if is_udp:
timeout = _timeout(mexpiration)
- (rwire, _) = await udp_sock.recvfrom(65535, timeout) # pyright: ignore
+ rwire, _ = await udp_sock.recvfrom(65535, timeout) # pyright: ignore
else:
ldata = await _read_exactly(tcp_sock, 2, mexpiration) # pyright: ignore
(l,) = struct.unpack("!H", ldata)
the other parameters, exceptions, and return type of this method.
"""
if query is None:
- (query, serial) = dns.xfr.make_query(txn_manager)
+ query, serial = dns.xfr.make_query(txn_manager)
else:
serial = dns.xfr.extract_serial_from_query(query)
af = dns.inet.af_for_address(where)
dtuple = (where, port)
if not backend:
backend = dns.asyncbackend.get_default_backend()
- (_, expiration) = _compute_times(lifetime)
+ _, expiration = _compute_times(lifetime)
if query.question[0].rdtype == dns.rdatatype.IXFR and udp_mode != UDPMode.NEVER:
s = await backend.make_socket(
af, socket.SOCK_DGRAM, 0, stuple, dtuple, _timeout(expiration)
backend = dns.asyncbackend.get_default_backend()
start = time.time()
while True:
- (request, answer) = resolution.next_request()
+ request, answer = resolution.next_request()
# Note we need to say "if answer is not None" and not just
# "if answer" because answer implements __len__, and python
# will call that. We want to return if we have an answer
assert request is not None # needed for type checking
done = False
while not done:
- (nameserver, tcp, backoff) = resolution.next_nameserver()
+ nameserver, tcp, backoff = resolution.next_nameserver()
if backoff:
await backend.sleep(backoff)
timeout = self._compute_timeout(start, lifetime, resolution.errors)
backend=backend,
)
except Exception as ex:
- (_, done) = resolution.query_result(None, ex)
+ _, done = resolution.query_result(None, ex)
continue
- (answer, done) = resolution.query_result(response, None)
+ answer, done = resolution.query_result(response, None)
# Note we need to say "if answer is not None" and not just
# "if answer" because answer implements __len__, and python
# will call that. We want to return if we have an answer
def _maybe_cow_with_name(
self, name: dns.name.Name
) -> tuple[dns.node.Node, dns.name.Name]:
- (node, name) = super()._maybe_cow_with_name(name)
+ node, name = super()._maybe_cow_with_name(name)
node = cast(Node, node)
if self._is_origin(name):
node.flags |= NodeFlags.ORIGIN
def put_rdataset(
self, name: dns.name.Name, rdataset: dns.rdataset.Rdataset
) -> None:
- (node, name) = self._maybe_cow_with_name(name)
+ node, name = self._maybe_cow_with_name(name)
if (
rdataset.rdtype == dns.rdatatype.NS
and not node.is_origin_or_glue() # type: ignore
rdtype: dns.rdatatype.RdataType,
covers: dns.rdatatype.RdataType,
) -> None:
- (node, name) = self._maybe_cow_with_name(name)
+ node, name = self._maybe_cow_with_name(name)
if rdtype == dns.rdatatype.NS and name in self.delegations: # pyright: ignore
node.flags &= ~NodeFlags.DELEGATION # type: ignore
self.delegations.discard(name) # pyright: ignore
return (addrpart, port, 0, socket.if_nametoindex(scope))
except AttributeError: # pragma: no cover (we can't really test this)
ai_flags = socket.AI_NUMERICHOST
- ((*_, tup), *_) = socket.getaddrinfo(address, port, flags=ai_flags)
+ (*_, tup), *_ = socket.getaddrinfo(address, port, flags=ai_flags)
return tup
else:
raise NotImplementedError(f"unknown address family {af}")
r.write_header()
if self.tsig is not None:
if self.want_tsig_sign:
- (new_tsig, ctx) = dns.tsig.sign(
+ new_tsig, ctx = dns.tsig.sign(
r.get_wire(),
self.keyring,
self.tsig[0],
*rcode*, a ``dns.rcode.Rcode``, is the rcode to set.
"""
- (value, evalue) = dns.rcode.to_flags(rcode)
+ value, evalue = dns.rcode.to_flags(rcode)
self.flags &= 0xFFF0
self.flags |= value
self.ednsflags &= 0x00FFFFFF
section = self.message.sections[section_number]
for _ in range(qcount):
qname = self.parser.get_name(self.message.origin)
- (rdtype, rdclass) = self.parser.get_struct("!HH")
- (rdclass, rdtype, _, _) = self.message._parse_rr_header(
+ rdtype, rdclass = self.parser.get_struct("!HH")
+ rdclass, rdtype, _, _ = self.message._parse_rr_header(
section_number, qname, rdclass, rdtype
)
self.message.find_rrset(
name = absolute_name.relativize(self.message.origin)
else:
name = absolute_name
- (rdtype, rdclass, ttl, rdlen) = self.parser.get_struct("!HHIH")
+ rdtype, rdclass, ttl, rdlen = self.parser.get_struct("!HHIH")
if rdtype in (dns.rdatatype.OPT, dns.rdatatype.TSIG):
(
rdclass,
section_number, count, i, name, rdclass, rdtype
)
else:
- (rdclass, rdtype, deleting, empty) = self.message._parse_rr_header(
+ rdclass, rdtype, deleting, empty = self.message._parse_rr_header(
section_number, name, rdclass, rdtype
)
rdata_start = self.parser.current
if self.parser.remaining() < 12:
raise ShortHeader
- (id, flags, qcount, ancount, aucount, adcount) = self.parser.get_struct(
- "!HHHHHH"
- )
+ id, flags, qcount, ancount, aucount, adcount = self.parser.get_struct("!HHHHHH")
factory = _message_factory_from_opcode(dns.opcode.from_flags(flags))
self.message = factory(id=id)
self.message.flags = dns.flags.Flag(flags)
rdclass = dns.rdataclass.IN
# Type
rdtype = dns.rdatatype.from_text(token.value)
- (rdclass, rdtype, _, _) = self.message._parse_rr_header(
+ rdclass, rdtype, _, _ = self.message._parse_rr_header(
section_number, name, rdclass, rdtype
)
self.message.find_rrset(
rdclass = dns.rdataclass.IN
# Type
rdtype = dns.rdatatype.from_text(token.value)
- (rdclass, rdtype, deleting, empty) = self.message._parse_rr_header(
+ rdclass, rdtype, deleting, empty = self.message._parse_rr_header(
section_number, name, rdclass, rdtype
)
token = self.tok.get()
Returns a ``bool``.
"""
- (nr, _, _) = self.fullcompare(other)
+ nr, _, _ = self.fullcompare(other)
if nr == NameRelation.SUBDOMAIN or nr == NameRelation.EQUAL:
return True
return False
Returns a ``bool``.
"""
- (nr, _, _) = self.fullcompare(other)
+ nr, _, _ = self.fullcompare(other)
if nr == NameRelation.SUPERDOMAIN or nr == NameRelation.EQUAL:
return True
return False
Returns a ``dns.message.Message``.
"""
- (af, _, the_source) = _destination_and_source(
- where, port, source, source_port, False
- )
+ af, _, the_source = _destination_and_source(where, port, source, source_port, False)
# we bind url and then override as pyright can't figure out all paths bind.
url = where
if af is not None and dns.inet.is_address(where):
the_connection = the_manager.connect( # pyright: ignore
where, port, source, source_port
)
- (start, expiration) = _compute_times(timeout)
+ start, expiration = _compute_times(timeout)
with the_connection.make_stream(timeout) as stream: # type: ignore
stream.send_h3(url, wire, post)
wire = stream.receive(_remaining(expiration))
wire = b""
while True:
- (wire, from_address) = _udp_recv(sock, 65535, expiration)
+ wire, from_address = _udp_recv(sock, 65535, expiration)
if not _matches_destination(
sock.family, from_address, destination, ignore_unexpected
):
"""
wire = q.to_wire()
- (af, destination, source) = _destination_and_source(
+ af, destination, source = _destination_and_source(
where, port, source, source_port, True
)
- (begin_time, expiration) = _compute_times(timeout)
+ begin_time, expiration = _compute_times(timeout)
if sock:
cm: contextlib.AbstractContextManager = contextlib.nullcontext(sock)
else:
cm = make_socket(af, socket.SOCK_DGRAM, source)
with cm as s:
send_udp(s, wire, destination, expiration)
- (r, received_time) = receive_udp(
+ r, received_time = receive_udp(
s,
destination,
expiration,
"""
wire = q.to_wire()
- (begin_time, expiration) = _compute_times(timeout)
+ begin_time, expiration = _compute_times(timeout)
if sock:
cm: contextlib.AbstractContextManager = contextlib.nullcontext(sock)
else:
- (af, destination, source) = _destination_and_source(
+ af, destination, source = _destination_and_source(
where, port, source, source_port, True
)
assert af is not None
# pylint: disable=possibly-used-before-assignment
_connect(s, destination, expiration) # pyright: ignore
send_tcp(s, wire, expiration)
- (r, received_time) = receive_tcp(
+ r, received_time = receive_tcp(
s, expiration, one_rr_per_rrset, q.keyring, q.mac, ignore_trailing
)
r.time = received_time - begin_time
)
wire = q.to_wire()
- (begin_time, expiration) = _compute_times(timeout)
- (af, destination, source) = _destination_and_source(
+ begin_time, expiration = _compute_times(timeout)
+ af, destination, source = _destination_and_source(
where, port, source, source_port, True
)
assert af is not None # where must be an address
_connect(s, destination, expiration)
_tls_handshake(s, expiration)
send_tcp(s, wire, expiration)
- (r, received_time) = receive_tcp(
+ r, received_time = receive_tcp(
s, expiration, one_rr_per_rrset, q.keyring, q.mac, ignore_trailing
)
r.time = received_time - begin_time
the_connection = the_manager.connect( # pyright: ignore
where, port, source, source_port
)
- (start, expiration) = _compute_times(timeout)
+ start, expiration = _compute_times(timeout)
with the_connection.make_stream(timeout) as stream: # type: ignore
stream.send(wire, True)
wire = stream.receive(_remaining(expiration))
tsig_ctx = None
r: dns.message.Message | None = None
while not done:
- (_, mexpiration) = _compute_times(timeout)
+ _, mexpiration = _compute_times(timeout)
if mexpiration is None or (
expiration is not None and mexpiration > expiration
):
mexpiration = expiration
if is_udp:
- (rwire, _) = _udp_recv(s, 65535, mexpiration)
+ rwire, _ = _udp_recv(s, 65535, mexpiration)
else:
ldata = _net_read(s, 2, mexpiration)
(l,) = struct.unpack("!H", ldata)
rrset.add(soa, 0)
if keyring is not None:
q.use_tsig(keyring, keyname, algorithm=keyalgorithm)
- (af, destination, source) = _destination_and_source(
+ af, destination, source = _destination_and_source(
where, port, source, source_port, True
)
assert af is not None
- (_, expiration) = _compute_times(lifetime)
+ _, expiration = _compute_times(lifetime)
tm = DummyTransactionManager(zone, relativize)
if use_udp and rdtype != dns.rdatatype.IXFR:
raise ValueError("cannot do a UDP AXFR")
Raises on errors.
"""
if query is None:
- (query, serial) = dns.xfr.make_query(txn_manager)
+ query, serial = dns.xfr.make_query(txn_manager)
else:
serial = dns.xfr.extract_serial_from_query(query)
- (af, destination, source) = _destination_and_source(
+ af, destination, source = _destination_and_source(
where, port, source, source_port, True
)
assert af is not None
- (_, expiration) = _compute_times(lifetime)
+ _, expiration = _compute_times(lifetime)
if query.question[0].rdtype == dns.rdatatype.IXFR and udp_mode != UDPMode.NEVER:
with make_socket(af, socket.SOCK_DGRAM, source) as s:
_connect(s, destination, expiration)
self._socket_created.set()
async with self._socket:
while not self._done:
- (datagram, address) = await self._socket.recvfrom(
+ datagram, address = await self._socket.recvfrom(
QUIC_MAX_DATAGRAM, None
)
if address[0] != self._peer[0] or address[1] != self._peer[1]:
assert address == self._peer
assert self._socket is not None
await self._socket.sendto(datagram, self._peer, None)
- (expiration, interval) = self._get_timer_values()
+ expiration, interval = self._get_timer_values()
try:
await asyncio.wait_for(self._wait_for_wake_timer(), interval)
except Exception:
def connect(
self, address, port=853, source=None, source_port=0, want_session_ticket=True
):
- (connection, start) = self._connect(
+ connection, start = self._connect(
address, port, source, source_port, want_session_ticket
)
if start:
self._socket.close()
raise
self._socket.connect(self._peer)
- (self._send_wakeup, self._receive_wakeup) = socket.socketpair()
+ self._send_wakeup, self._receive_wakeup = socket.socketpair()
self._receive_wakeup.setblocking(False)
self._socket.setblocking(False)
self._handshake_complete = threading.Event()
self._receive_wakeup, selectors.EVENT_READ, self._drain_wakeup
)
while not self._done:
- (expiration, interval) = self._get_timer_values(False)
+ expiration, interval = self._get_timer_values(False)
items = sel.select(interval)
for key, _ in items:
key.data()
want_token=True,
):
with self._lock:
- (connection, start) = self._connect(
+ connection, start = self._connect(
address, port, source, source_port, want_session_ticket, want_token
)
if start:
)
await self._socket.connect(self._peer)
while not self._done:
- (expiration, interval) = self._get_timer_values(False)
+ expiration, interval = self._get_timer_values(False)
if self._send_pending:
# Do not block forever if sends are pending. Even though we
# have a wake-up mechanism if we've already started the blocking
def connect(
self, address, port=853, source=None, source_port=0, want_session_ticket=True
):
- (connection, start) = self._connect(
+ connection, start = self._connect(
address, port, source, source_port, want_session_ticket
)
if start:
@classmethod
def from_wire_parser(cls, rdclass, rdtype, parser, origin=None):
- (precedence, relay_type) = parser.get_struct("!BB")
+ precedence, relay_type = parser.get_struct("!BB")
discovery_optional = bool(relay_type >> 7)
relay_type &= 0x7F
relay = Relay.from_wire_parser(relay_type, parser, origin)
@classmethod
def from_wire_parser(cls, rdclass, rdtype, parser, origin=None):
- (certificate_type, key_tag, algorithm) = parser.get_struct("!HHB")
+ certificate_type, key_tag, algorithm = parser.get_struct("!HHB")
certificate = parser.get_remaining()
return cls(rdclass, rdtype, certificate_type, key_tag, algorithm, certificate)
@classmethod
def from_wire_parser(cls, rdclass, rdtype, parser, origin=None):
- (serial, flags) = parser.get_struct("!IH")
+ serial, flags = parser.get_struct("!IH")
bitmap = Bitmap.from_wire_parser(parser)
return cls(rdclass, rdtype, serial, flags, bitmap)
@classmethod
def from_wire_parser(cls, rdclass, rdtype, parser, origin=None):
- (rrtype, scheme, port) = parser.get_struct("!HBH")
+ rrtype, scheme, port = parser.get_struct("!HBH")
target = parser.get_name(origin)
return cls(rdclass, rdtype, rrtype, scheme, port, target)
if what.isdigit():
return
try:
- (left, right) = what.split(b".")
+ left, right = what.split(b".")
except ValueError:
raise dns.exception.FormError
if left == b"" and right == b"":
@classmethod
def from_wire_parser(cls, rdclass, rdtype, parser, origin=None):
- (lh, algorithm, lk) = parser.get_struct("!BBH")
+ lh, algorithm, lk = parser.get_struct("!BBH")
hit = parser.get_bytes(lh)
key = parser.get_bytes(lk)
servers = []
import dns.rdtypes.dnskeybase # lgtm[py/import-and-import-from]
-
class Protocol(dns.enum.IntEnum):
NONE = 0
TLS = 1
latitude[1] = int(t)
t = tok.get_string()
if "." in t:
- (seconds, milliseconds) = t.split(".")
+ seconds, milliseconds = t.split(".")
if not seconds.isdigit():
raise dns.exception.SyntaxError("bad latitude seconds value")
latitude[2] = int(seconds)
longitude[1] = int(t)
t = tok.get_string()
if "." in t:
- (seconds, milliseconds) = t.split(".")
+ seconds, milliseconds = t.split(".")
if not seconds.isdigit():
raise dns.exception.SyntaxError("bad longitude seconds value")
longitude[2] = int(seconds)
@classmethod
def from_wire_parser(cls, rdclass, rdtype, parser, origin=None):
- (algorithm, flags, iterations) = parser.get_struct("!BBH")
+ algorithm, flags, iterations = parser.get_struct("!BBH")
salt = parser.get_counted_bytes()
next = parser.get_counted_bytes()
bitmap = Bitmap.from_wire_parser(parser)
@classmethod
def from_wire_parser(cls, rdclass, rdtype, parser, origin=None):
- (algorithm, flags, iterations) = parser.get_struct("!BBH")
+ algorithm, flags, iterations = parser.get_struct("!BBH")
salt = parser.get_counted_bytes()
return cls(rdclass, rdtype, algorithm, flags, iterations, salt)
def from_wire_parser(cls, rdclass, rdtype, parser, origin=None):
options = []
while parser.remaining() > 0:
- (otype, olen) = parser.get_struct("!HH")
+ otype, olen = parser.get_struct("!HH")
with parser.restrict_to(olen):
opt = dns.edns.option_from_wire_parser(otype, parser)
options.append(opt)
time_signed = parser.get_uint48()
fudge = parser.get_uint16()
mac = parser.get_counted_bytes(2)
- (original_id, error) = parser.get_struct("!HH")
+ original_id, error = parser.get_struct("!HH")
other = parser.get_counted_bytes(2)
return cls(
rdclass,
@classmethod
def from_wire_parser(cls, rdclass, rdtype, parser, origin=None):
- (priority, weight) = parser.get_struct("!HH")
+ priority, weight = parser.get_struct("!HH")
target = parser.get_remaining()
if len(target) == 0:
raise dns.exception.FormError("URI target may not be empty")
item = item[1:]
else:
negation = False
- (family, rest) = item.split(":", 1)
+ family, rest = item.split(":", 1)
family = int(family)
- (address, prefix) = rest.split("/", 1)
+ address, prefix = rest.split("/", 1)
prefix = int(prefix)
item = APLItem(family, negation, address, prefix)
items.append(item)
@classmethod
def from_wire_parser(cls, rdclass, rdtype, parser, origin=None):
- (order, preference) = parser.get_struct("!HH")
+ order, preference = parser.get_struct("!HH")
strings = []
for _ in range(3):
s = parser.get_counted_bytes()
@classmethod
def from_wire_parser(cls, rdclass, rdtype, parser, origin=None):
- (priority, weight, port) = parser.get_struct("!HHH")
+ priority, weight, port = parser.get_struct("!HHH")
target = parser.get_name(origin)
return cls(rdclass, rdtype, priority, weight, port, target)
def _validate_and_define(params, key, value):
- (key, force_generic) = _validate_key(_unescape(key))
+ key, force_generic = _validate_key(_unescape(key))
if key in params:
raise SyntaxError(f'duplicate key "{key:d}"')
cls = _class_for_key.get(key, GenericParam)
tsig = _make_tsig( # pyright: ignore
keyname, algorithm, 0, fudge, b"", id, tsig_error, other_data
)
- (tsig, _) = dns.tsig.sign(s, key, tsig[0], int(time.time()), request_mac)
+ tsig, _ = dns.tsig.sign(s, key, tsig[0], int(time.time()), request_mac)
self._write_tsig(tsig, keyname)
def add_multi_tsig(
tsig = _make_tsig( # pyright: ignore
keyname, algorithm, 0, fudge, b"", id, tsig_error, other_data
)
- (tsig, ctx) = dns.tsig.sign(
+ tsig, ctx = dns.tsig.sign(
s, key, tsig[0], int(time.time()), request_mac, ctx, True
)
self._write_tsig(tsig, keyname)
)
start = time.time()
while True:
- (request, answer) = resolution.next_request()
+ request, answer = resolution.next_request()
# Note we need to say "if answer is not None" and not just
# "if answer" because answer implements __len__, and python
# will call that. We want to return if we have an answer
assert request is not None # needed for type checking
done = False
while not done:
- (nameserver, tcp, backoff) = resolution.next_nameserver()
+ nameserver, tcp, backoff = resolution.next_nameserver()
if backoff:
time.sleep(backoff)
timeout = self._compute_timeout(start, lifetime, resolution.errors)
max_size=tcp,
)
except Exception as ex:
- (_, done) = resolution.query_result(None, ex)
+ _, done = resolution.query_result(None, ex)
continue
- (answer, done) = resolution.query_result(response, None)
+ answer, done = resolution.query_result(response, None)
# Note we need to say "if answer is not None" and not just
# "if answer" because answer implements __len__, and python
# will call that. We want to return if we have an answer
if response:
for rrs in response.authority:
if rrs.rdtype == dns.rdatatype.SOA and rrs.rdclass == rdclass:
- (nr, _, _) = rrs.name.fullcompare(name)
+ nr, _, _ = rrs.name.fullcompare(name)
if nr == dns.name.NAMERELN_SUPERDOMAIN:
# We're doing a proper superdomain check as
# if the name were equal we ought to have gotten
if name is None:
name = socket.gethostname()
try:
- (name, _, _) = _gethostbyaddr(name)
+ name, _, _ = _gethostbyaddr(name)
# Python's version checks aliases too, but our gethostbyname
# ignores them, so we do so here as well.
except Exception: # pragma: no cover
raise socket.gaierror(socket.EAI_NONAME, "Name or service not known")
sockaddr = (ip, 80)
family = socket.AF_INET
- (name, _) = _getnameinfo(sockaddr, socket.NI_NAMEREQD)
+ name, _ = _getnameinfo(sockaddr, socket.NI_NAMEREQD)
aliases = []
addresses = []
tuples = _getaddrinfo(
"""
return super().to_wire(
- self.name, file, compress, origin, self.deleting, **kw # type:ignore
+ self.name, file, compress, origin, self.deleting, **kw # type: ignore
)
# pylint: enable=arguments-differ
def pop(self):
"""Remove an arbitrary item from the set."""
- (k, _) = self.items.popitem()
+ k, _ = self.items.popitem()
return k
def _clone(self) -> "Set":
def from_wire_origin(self) -> dns.name.Name | None:
"""Origin to use in from_wire() calls."""
- (absolute_origin, relativize, _) = self.origin_information()
+ absolute_origin, relativize, _ = self.origin_information()
if relativize:
return absolute_origin
else:
if rdataset.rdclass != self.manager.get_class():
raise ValueError(f"{method} has objects of wrong RdataClass")
if rdataset.rdtype == dns.rdatatype.SOA:
- (_, _, origin) = self._origin_information()
+ _, _, origin = self._origin_information()
if name != origin:
raise ValueError(f"{method} has non-origin SOA")
self._raise_if_not_empty(method, args)
if isinstance(value, str):
keyring[kname] = dns.tsig.Key(kname, value).secret
else:
- (algorithm, secret) = value
+ algorithm, secret = value
keyring[kname] = dns.tsig.Key(kname, secret, algorithm)
return keyring
try:
# The PnpInstanceID points to a key inside Enum
- (pnp_id, ttype) = winreg.QueryValueEx(
- connection_key, "PnpInstanceID"
- )
+ pnp_id, ttype = winreg.QueryValueEx(connection_key, "PnpInstanceID")
if ttype != winreg.REG_SZ:
raise ValueError # pragma: no cover
try:
# Get ConfigFlags for this device
- (flags, ttype) = winreg.QueryValueEx(device_key, "ConfigFlags")
+ flags, ttype = winreg.QueryValueEx(device_key, "ConfigFlags")
if ttype != winreg.REG_DWORD:
raise ValueError # pragma: no cover
raise ValueError("rdtype is not IXFR or AXFR")
self.serial = serial
self.is_udp = is_udp
- (_, _, origin) = txn_manager.origin_information()
+ _, _, origin = txn_manager.origin_information()
if origin is None:
raise ValueError("transaction manager must supply an origin for XFRs")
self.origin = origin
Returns a `(query, serial)` tuple.
"""
- (zone_origin, _, origin) = txn_manager.origin_information()
+ zone_origin, _, origin = txn_manager.origin_information()
if zone_origin is None:
raise ValueError("no zone origin")
if serial is None:
self,
) -> tuple[dns.name.Name | None, bool, dns.name.Name | None]:
assert self.version is not None
- (absolute, relativize, effective) = self.manager.origin_information()
+ absolute, relativize, effective = self.manager.origin_information()
if absolute is None and self.version.origin is not None:
# No origin has been committed yet, but we've learned one as part of
# this txn. Use it.
default_ttl: int | None = None,
):
self.tok = tok
- (self.zone_origin, self.relativize, _) = txn.manager.origin_information()
+ self.zone_origin, self.relativize, _ = txn.manager.origin_information()
self.current_origin = self.zone_origin
self.last_ttl = 0
self.last_ttl_known = False
# correct, but it is correct almost all of the time.
# We convert them to syntax errors so that we can emit
# helpful filename:line info.
- (ty, va) = sys.exc_info()[:2]
+ ty, va = sys.exc_info()[:2]
raise dns.exception.SyntaxError(f"caught exception {str(ty)}: {str(va)}")
if not self.default_ttl_known and rdtype == dns.rdatatype.SOA:
# correct, but it is correct almost all of the time.
# We convert them to syntax errors so that we can emit
# helpful filename:line info.
- (ty, va) = sys.exc_info()[:2]
+ ty, va = sys.exc_info()[:2]
raise dns.exception.SyntaxError(
f"caught exception {str(ty)}: {str(va)}"
)
self.tok.unget(token)
self._rr_line()
except dns.exception.SyntaxError as detail:
- (filename, line_number) = self.tok.where()
+ filename, line_number = self.tok.where()
if detail is None:
detail = "syntax error"
ex = dns.exception.SyntaxError(f"{filename}:{line_number}: {detail}")
import dns.tsigkeyring
import dns.update
-
KEYRING = dns.tsigkeyring.from_text({"keyname.": "NjHwPsMKjdN++dOfE5iAiQ=="})
TEST_ZONES = {
msg = dns.message.from_wire(data, keyring=KEYRING)
try:
if msg.opcode() != dns.opcode.UPDATE:
- raise NotImplementedError("Opcode %s not implemented" % dns.opcode.to_text(msg.opcode()))
+ raise NotImplementedError(
+ "Opcode %s not implemented" % dns.opcode.to_text(msg.opcode())
+ )
update_msg = typing.cast(dns.update.UpdateMessage, msg)
zone = update_msg.zone[0].name
if not msg.had_tsig or msg.keyname not in TEST_ZONES[zone]:
- raise dns.exception.DeniedByPolicy(f"Key {msg.keyname} not allowed for zone {zone}")
+ raise dns.exception.DeniedByPolicy(
+ f"Key {msg.keyname} not allowed for zone {zone}"
+ )
for r in update_msg.update:
if r.deleting:
if r.deleting == dns.rdataclass.ANY and r.rdtype == dns.rdatatype.ANY:
result = await handle_nsupdate(data, addr)
self.transport.sendto(result, addr)
- transport, _protocol = await loop.create_datagram_endpoint(lambda: DatagramProtocol(), local_addr=(hostname, port))
+ transport, _protocol = await loop.create_datagram_endpoint(
+ lambda: DatagramProtocol(), local_addr=(hostname, port)
+ )
# Start TCP server
class StreamReaderProtocol(asyncio.StreamReaderProtocol):
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.bind((address, port))
while True:
- (wire, address) = s.recvfrom(512)
+ wire, address = s.recvfrom(512)
notify = dns.message.from_wire(wire)
try:
"""
Minimalistic RFC 1996-like NOTIFY sender.
"""
+
import argparse
import ipaddress
import socket
[project.optional-dependencies]
dev = [
- "black>=25.12.0",
+ "black>=26.1",
"coverage>=7.0",
"hypercorn>=0.18.0",
"pyright>=1.1.407",
async with trio.open_nursery() as nursery:
while not self.done:
now = time.time()
- (expiration, interval) = self.get_timer_values(now)
+ expiration, interval = self.get_timer_values(now)
# Note it must be trio.current_time() and not now due to how
# trio time works!
if self.send_pending:
with trio.CancelScope(
deadline=trio.current_time() + interval
) as self.worker_scope:
- (datagram, peer) = await self.receive_channel.receive()
+ datagram, peer = await self.receive_channel.receive()
self.quic_connection.receive_datagram(datagram, peer, now)
self.worker_scope = None
now = time.time()
data = None
peer = None
try:
- (data, peer) = await self.socket.recvfrom(65535)
+ data, peer = await self.socket.recvfrom(65535)
except Exception:
continue
buffer = aioquic.buffer.Buffer(data=data)
continue
else:
try:
- (cid, retry_cid) = self.retry.validate_token(
+ cid, retry_cid = self.retry.validate_token(
peer, header.token
)
# We need to recheck the cid here in case of duplicates,
udp_socket.close()
def __enter__(self):
- (self.left, self.right) = socket.socketpair()
+ self.left, self.right = socket.socketpair()
# We're making the sockets now so they can be sent to by the
# caller immediately (i.e. no race with the listener starting
# in the thread).
local = self.addresses[connection_type]
while True:
try:
- (wire, peer) = await sock.recvfrom(65535)
+ wire, peer = await sock.recvfrom(65535)
for wire in self.handle_wire(wire, peer, local, connection_type):
await sock.sendto(wire, peer)
except Exception as e:
q = dns.message.make_query(qname, dns.rdatatype.DNSKEY)
return await dns.asyncquery.udp_with_fallback(q, address, timeout=4)
- (_, tcp) = self.async_run(run)
+ _, tcp = self.async_run(run)
self.assertTrue(tcp)
@tests.util.retry_on_timeout
q = dns.message.make_query(qname, dns.rdatatype.A)
return await dns.asyncquery.udp_with_fallback(q, address, timeout=2)
- (_, tcp) = self.async_run(run)
+ _, tcp = self.async_run(run)
self.assertFalse(tcp)
@tests.util.retry_on_timeout
q = dns.message.make_query("dns.google", dns.rdatatype.A)
await dns.asyncquery.send_udp(sender, q, listener_address)
expiration = time.time() + 2
- (_, _, recv_address) = await dns.asyncquery.receive_udp(
+ _, _, recv_address = await dns.asyncquery.receive_udp(
listener, expiration=expiration
)
return (sender_address, recv_address)
- (sender_address, recv_address) = self.async_run(run)
+ sender_address, recv_address = self.async_run(run)
self.assertEqual(sender_address, recv_address)
def testUDPReceiveTimeout(self):
if good_r is None:
good_r = self.good_r
s = MockSock(wire1, from1, wire2, from2)
- (r, when, _) = await dns.asyncquery.receive_udp(
+ r, when, _ = await dns.asyncquery.receive_udp(
s,
("127.0.0.1", 53),
time.time() + 2,
bad_r_wire = bad_r.to_wire()
async def abad():
- (r, wire) = await self.mock_receive(
+ r, wire = await self.mock_receive(
bad_r_wire,
("127.0.0.1", 53),
self.good_r_wire,
import dns.rrset
import dns.zone
-
example_text = """$TTL 1h
$ORIGIN 0.0.192.IN-ADDR.ARPA.
$GENERATE 1-2 0 CNAME SERVER$.EXAMPLE.
r.resolve_chaining()
def test_resolve_chaining_no_infinite_loop(self):
- r = dns.message.from_text(
- """id 1
+ r = dns.message.from_text("""id 1
flags QR
;QUESTION
www.example. IN CNAME
;AUTHORITY
example. 300 IN SOA . . 1 2 3 4 5
-"""
- )
+""")
# passing is not going into an infinite loop in this call
result = r.resolve_chaining()
self.assertEqual(result.canonical_name, dns.name.from_text("www.example."))
def test_bad_text_questions(self):
with self.assertRaises(dns.exception.SyntaxError):
- dns.message.from_text(
- """id 1
+ dns.message.from_text("""id 1
;QUESTION
example.
-"""
- )
+""")
with self.assertRaises(dns.exception.SyntaxError):
- dns.message.from_text(
- """id 1
+ dns.message.from_text("""id 1
;QUESTION
example. IN
-"""
- )
+""")
with self.assertRaises(dns.rdatatype.UnknownRdatatype):
- dns.message.from_text(
- """id 1
+ dns.message.from_text("""id 1
;QUESTION
example. INA
-"""
- )
+""")
with self.assertRaises(dns.rdatatype.UnknownRdatatype):
- dns.message.from_text(
- """id 1
+ dns.message.from_text("""id 1
;QUESTION
example. IN BOGUS
-"""
- )
+""")
def test_bad_text_rrs(self):
with self.assertRaises(dns.exception.SyntaxError):
- dns.message.from_text(
- """id 1
+ dns.message.from_text("""id 1
flags QR
;QUESTION
example. IN A
;ANSWER
example.
-"""
- )
+""")
with self.assertRaises(dns.exception.SyntaxError):
- dns.message.from_text(
- """id 1
+ dns.message.from_text("""id 1
flags QR
;QUESTION
example. IN A
;ANSWER
example. IN
-"""
- )
+""")
with self.assertRaises(dns.exception.SyntaxError):
- dns.message.from_text(
- """id 1
+ dns.message.from_text("""id 1
flags QR
;QUESTION
example. IN A
;ANSWER
example. 300
-"""
- )
+""")
with self.assertRaises(dns.rdatatype.UnknownRdatatype):
- dns.message.from_text(
- """id 1
+ dns.message.from_text("""id 1
flags QR
;QUESTION
example. IN A
;ANSWER
example. 30a IN A
-"""
- )
+""")
with self.assertRaises(dns.rdatatype.UnknownRdatatype):
- dns.message.from_text(
- """id 1
+ dns.message.from_text("""id 1
flags QR
;QUESTION
example. IN A
;ANSWER
example. 300 INA A
-"""
- )
+""")
with self.assertRaises(dns.exception.UnexpectedEnd):
- dns.message.from_text(
- """id 1
+ dns.message.from_text("""id 1
flags QR
;QUESTION
example. IN A
;ANSWER
example. 300 IN A
-"""
- )
+""")
with self.assertRaises(dns.exception.SyntaxError):
- dns.message.from_text(
- """id 1
+ dns.message.from_text("""id 1
flags QR
opcode UPDATE
;ZONE
example. IN SOA
;UPDATE
example. 300 IN A
-"""
- )
+""")
with self.assertRaises(dns.exception.SyntaxError):
- dns.message.from_text(
- """id 1
+ dns.message.from_text("""id 1
flags QR
opcode UPDATE
;ZONE
example. IN SOA
;UPDATE
example. 300 NONE A
-"""
- )
+""")
with self.assertRaises(dns.exception.SyntaxError):
- dns.message.from_text(
- """id 1
+ dns.message.from_text("""id 1
flags QR
opcode UPDATE
;ZONE
example. IN SOA
;PREREQ
example. 300 NONE A 10.0.0.1
-"""
- )
+""")
with self.assertRaises(dns.exception.SyntaxError):
- dns.message.from_text(
- """id 1
+ dns.message.from_text("""id 1
flags QR
;ANSWER
300 IN A 10.0.0.1
-"""
- )
+""")
with self.assertRaises(dns.exception.SyntaxError):
- dns.message.from_text(
- """id 1
+ dns.message.from_text("""id 1
flags QR
;QUESTION
IN SOA
-"""
- )
+""")
def test_from_wire_makes_Flag(self):
m = dns.message.from_wire(goodwire)
self.assertEqual(m.flags, dns.flags.Flag.RD)
def test_continue_on_error(self):
- good_message = dns.message.from_text(
- """id 1234
+ good_message = dns.message.from_text("""id 1234
opcode QUERY
rcode NOERROR
flags QR AA RD
www.dnspython.org. 300 IN SOA . . 1 2 3 4 4294967295
www.dnspython.org. 300 IN A 1.2.3.4
www.dnspython.org. 300 IN AAAA ::1
-"""
- )
+""")
wire = good_message.to_wire()
# change ANCOUNT to 255
bad_wire = wire[:6] + b"\x00\xff" + wire[8:]
print(m.errors)
self.assertEqual(str(m.errors[0].exception), "IPv6 addresses are 16 bytes long")
self.assertEqual(str(m.errors[1].exception), "DNS message is malformed.")
- expected_message = dns.message.from_text(
- """id 1234
+ expected_message = dns.message.from_text("""id 1234
opcode QUERY
rcode NOERROR
flags QR AA RD
;ANSWER
www.dnspython.org. 300 IN SOA . . 1 2 3 4 4294967295
www.dnspython.org. 300 IN A 1.2.3.4
-"""
- )
+""")
self.assertEqual(m, expected_message)
def test_padding_basic(self):
def testSplit1(self):
n = dns.name.from_text("foo.bar.")
- (prefix, suffix) = n.split(2)
+ prefix, suffix = n.split(2)
ep = dns.name.from_text("foo", None)
es = dns.name.from_text("bar.", None)
self.assertEqual(prefix, ep)
def testSplit2(self):
n = dns.name.from_text("foo.bar.")
- (prefix, suffix) = n.split(1)
+ prefix, suffix = n.split(1)
ep = dns.name.from_text("foo.bar", None)
es = dns.name.from_text(".", None)
self.assertEqual(prefix, ep)
def testSplit3(self):
n = dns.name.from_text("foo.bar.")
- (prefix, suffix) = n.split(0)
+ prefix, suffix = n.split(0)
ep = dns.name.from_text("foo.bar.", None)
es = dns.name.from_text("", None)
self.assertEqual(prefix, ep)
def testSplit4(self):
n = dns.name.from_text("foo.bar.")
- (prefix, suffix) = n.split(3)
+ prefix, suffix = n.split(3)
ep = dns.name.from_text("", None)
es = dns.name.from_text("foo.bar.", None)
self.assertEqual(prefix, ep)
def testFromWire1(self):
w = b"\x03foo\x00\xc0\x00"
- (n1, cused1) = dns.name.from_wire(w, 0)
- (n2, cused2) = dns.name.from_wire(w, cused1)
+ n1, cused1 = dns.name.from_wire(w, 0)
+ n2, cused2 = dns.name.from_wire(w, cused1)
en1 = dns.name.from_text("foo.")
en2 = en1
ecused1 = 5
def testFromWire2(self):
w = b"\x03foo\x00\x01a\xc0\x00\x01b\xc0\x05"
current = 0
- (n1, cused1) = dns.name.from_wire(w, current)
+ n1, cused1 = dns.name.from_wire(w, current)
current += cused1
- (n2, cused2) = dns.name.from_wire(w, current)
+ n2, cused2 = dns.name.from_wire(w, current)
current += cused2
- (n3, cused3) = dns.name.from_wire(w, current)
+ n3, cused3 = dns.name.from_wire(w, current)
en1 = dns.name.from_text("foo.")
en2 = dns.name.from_text("a.foo.")
en3 = dns.name.from_text("b.a.foo.")
for address in query_addresses:
qname = dns.name.from_text(".")
q = dns.message.make_query(qname, dns.rdatatype.DNSKEY)
- (_, tcp) = dns.query.udp_with_fallback(q, address, timeout=4)
+ _, tcp = dns.query.udp_with_fallback(q, address, timeout=4)
self.assertTrue(tcp)
@tests.util.retry_on_timeout
tcp_s.setblocking(0)
qname = dns.name.from_text(".")
q = dns.message.make_query(qname, dns.rdatatype.DNSKEY)
- (_, tcp) = dns.query.udp_with_fallback(
+ _, tcp = dns.query.udp_with_fallback(
q, address, udp_sock=udp_s, tcp_sock=tcp_s, timeout=4
)
self.assertTrue(tcp)
for address in query_addresses:
qname = dns.name.from_text("dns.google.")
q = dns.message.make_query(qname, dns.rdatatype.A)
- (_, tcp) = dns.query.udp_with_fallback(q, address, timeout=2)
+ _, tcp = dns.query.udp_with_fallback(q, address, timeout=2)
self.assertFalse(tcp)
@tests.util.retry_on_timeout
q = dns.message.make_query("dns.google", dns.rdatatype.A)
dns.query.send_udp(sender, q, listener.getsockname())
expiration = time.time() + 2
- (q, _, addr) = dns.query.receive_udp(listener, expiration=expiration)
+ q, _, addr = dns.query.receive_udp(listener, expiration=expiration)
self.assertEqual(addr, sender.getsockname())
class DestinationAndSourceTests(unittest.TestCase):
def test_af_inferred_from_where(self):
- (af, d, s) = _d_and_s("1.2.3.4", 53, None, 0)
+ af, d, s = _d_and_s("1.2.3.4", 53, None, 0)
self.assertEqual(af, socket.AF_INET)
def test_af_inferred_from_where(self):
- (af, d, s) = _d_and_s("1::2", 53, None, 0)
+ af, d, s = _d_and_s("1::2", 53, None, 0)
self.assertEqual(af, socket.AF_INET6)
def test_af_inferred_from_source(self):
- (af, d, s) = _d_and_s("https://example/dns-query", 443, "1.2.3.4", 0, False)
+ af, d, s = _d_and_s("https://example/dns-query", 443, "1.2.3.4", 0, False)
self.assertEqual(af, socket.AF_INET)
def test_af_mismatch(self):
def bad():
- (af, d, s) = _d_and_s("1::2", 53, "1.2.3.4", 0)
+ af, d, s = _d_and_s("1::2", 53, "1.2.3.4", 0)
self.assertRaises(ValueError, bad)
def test_source_port_but_no_af_inferred(self):
def bad():
- (af, d, s) = _d_and_s("https://example/dns-query", 443, None, 12345, False)
+ af, d, s = _d_and_s("https://example/dns-query", 443, None, 12345, False)
self.assertRaises(ValueError, bad)
def test_where_must_be_an_address(self):
def bad():
- (af, d, s) = _d_and_s("not a valid address", 53, "1.2.3.4", 0)
+ af, d, s = _d_and_s("not a valid address", 53, "1.2.3.4", 0)
self.assertRaises(ValueError, bad)
def test_destination_is_none_of_where_url(self):
- (af, d, s) = _d_and_s("https://example/dns-query", 443, None, 0, False)
+ af, d, s = _d_and_s("https://example/dns-query", 443, None, 0, False)
self.assertEqual(d, None)
def test_v4_wildcard_source_set(self):
- (af, d, s) = _d_and_s("1.2.3.4", 53, None, 12345)
+ af, d, s = _d_and_s("1.2.3.4", 53, None, 12345)
self.assertEqual(s, ("0.0.0.0", 12345))
def test_v6_wildcard_source_set(self):
- (af, d, s) = _d_and_s("1::2", 53, None, 12345)
+ af, d, s = _d_and_s("1::2", 53, None, 12345)
self.assertEqual(s, ("::", 12345, 0, 0))
class LowLevelWaitTests(unittest.TestCase):
def test_wait_for(self):
try:
- (l, r) = socket.socketpair()
+ l, r = socket.socketpair()
# already expired
with self.assertRaises(dns.exception.Timeout):
dns.query._wait_for(l, True, True, True, 0)
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
with mock_udp_recv(wire1, from1, wire2, from2):
- (r, when) = dns.query.receive_udp(
+ r, when = dns.query.receive_udp(
s,
("127.0.0.1", 53),
time.time() + 2,
bad_r_wire = bad_r.to_wire()
def bad():
- (r, wire) = self.mock_receive(
+ r, wire = self.mock_receive(
bad_r_wire,
("127.0.0.1", 53),
self.good_r_wire,
)
def test_next_request_abs(self):
- (request, answer) = self.resn.next_request()
+ request, answer = self.resn.next_request()
self.assertTrue(answer is None)
self.assertEqual(request.question[0].name, self.qname)
self.assertEqual(request.question[0].rdtype, dns.rdatatype.A)
self.resn = dns.resolver._Resolution(
self.resolver, qname, "A", "IN", False, True, True
)
- (request, answer) = self.resn.next_request()
+ request, answer = self.resn.next_request()
self.assertTrue(answer is None)
self.assertEqual(request.question[0].name, self.qname)
self.assertEqual(request.question[0].rdtype, dns.rdatatype.A)
- (request, answer) = self.resn.next_request()
+ request, answer = self.resn.next_request()
self.assertTrue(answer is None)
self.assertEqual(request.question[0].name, abs_qname_1)
self.assertEqual(request.question[0].rdtype, dns.rdatatype.A)
def bad():
- (request, answer) = self.resn.next_request()
+ request, answer = self.resn.next_request()
self.assertRaises(dns.resolver.NXDOMAIN, bad)
self.resn = dns.resolver._Resolution(
self.resolver, qname, "A", "IN", False, True, False
)
- (request, answer) = self.resn.next_request()
+ request, answer = self.resn.next_request()
self.assertTrue(answer is None)
self.assertEqual(request.question[0].name, self.qname)
self.assertEqual(request.question[0].rdtype, dns.rdatatype.A)
def bad():
- (request, answer) = self.resn.next_request()
+ request, answer = self.resn.next_request()
self.assertRaises(dns.resolver.NXDOMAIN, bad)
def test_next_request_exhaust_causes_nxdomain(self):
def bad():
- (request, answer) = self.resn.next_request()
+ request, answer = self.resn.next_request()
- (request, answer) = self.resn.next_request()
+ request, answer = self.resn.next_request()
self.assertRaises(dns.resolver.NXDOMAIN, bad)
def make_address_response(self, q):
self.resolver.cache.put(
(self.qname, dns.rdatatype.A, dns.rdataclass.IN), cache_answer
)
- (request, answer) = self.resn.next_request()
+ request, answer = self.resn.next_request()
self.assertTrue(request is None)
self.assertTrue(answer is cache_answer)
)
def bad():
- (request, answer) = self.resn.next_request()
+ request, answer = self.resn.next_request()
self.assertRaises(dns.resolver.NoAnswer, bad)
# If raise_on_no_answer is False, we should get a cache hit.
self.resn = dns.resolver._Resolution(
self.resolver, self.qname, "A", "IN", False, False, False
)
- (request, answer) = self.resn.next_request()
+ request, answer = self.resn.next_request()
self.assertTrue(request is None)
self.assertTrue(answer is cache_answer)
(qname1, dns.rdatatype.ANY, dns.rdataclass.IN), cache_answer
)
try:
- (request, answer) = self.resn.next_request()
+ request, answer = self.resn.next_request()
self.assertTrue(False) # should not happen!
except dns.resolver.NXDOMAIN as nx:
self.assertTrue(nx.response(qname1) is r1)
(qname2, dns.rdatatype.ANY, dns.rdataclass.IN), cache_answer
)
try:
- (request, answer) = self.resn.next_request()
+ request, answer = self.resn.next_request()
self.assertTrue(False) # should not happen!
except dns.resolver.NXDOMAIN as nx:
self.assertTrue(nx.response(qname1) is r1)
self.resolver.keyring = dns.tsigkeyring.from_text(
{"keyname.": "NjHwPsMKjdN++dOfE5iAiQ=="}
)
- (keyname, secret) = next(iter(self.resolver.keyring.items()))
+ keyname, secret = next(iter(self.resolver.keyring.items()))
self.resolver.keyname = dns.name.from_text("keyname.")
- (request, answer) = self.resn.next_request()
+ request, answer = self.resn.next_request()
self.assertFalse(request is None)
self.assertEqual(request.keyring.name, keyname)
self.assertEqual(request.keyring.secret, secret)
def test_next_request_flags(self):
self.resolver.flags = dns.flags.RD | dns.flags.CD
- (request, answer) = self.resn.next_request()
+ request, answer = self.resn.next_request()
self.assertFalse(request is None)
self.assertEqual(request.flags, self.resolver.flags)
def test_next_nameserver_udp(self):
- (request, answer) = self.resn.next_request()
- (nameserver1, tcp, backoff) = self.resn.next_nameserver()
+ request, answer = self.resn.next_request()
+ nameserver1, tcp, backoff = self.resn.next_nameserver()
self.assertEqual(nameserver1.port, 53)
self.assertFalse(tcp)
self.assertEqual(backoff, 0.0)
- (nameserver2, tcp, backoff) = self.resn.next_nameserver()
+ nameserver2, tcp, backoff = self.resn.next_nameserver()
self.assertTrue(nameserver2 != nameserver1)
self.assertEqual(nameserver2.port, 53)
self.assertFalse(tcp)
self.assertEqual(backoff, 0.0)
- (nameserver3, tcp, backoff) = self.resn.next_nameserver()
+ nameserver3, tcp, backoff = self.resn.next_nameserver()
self.assertTrue(nameserver3 is nameserver1)
self.assertFalse(tcp)
self.assertEqual(backoff, 0.1)
- (nameserver4, tcp, backoff) = self.resn.next_nameserver()
+ nameserver4, tcp, backoff = self.resn.next_nameserver()
self.assertTrue(nameserver4 is nameserver2)
self.assertFalse(tcp)
self.assertEqual(backoff, 0.0)
- (nameserver5, tcp, backoff) = self.resn.next_nameserver()
+ nameserver5, tcp, backoff = self.resn.next_nameserver()
self.assertTrue(nameserver5 is nameserver1)
self.assertFalse(tcp)
self.assertEqual(backoff, 0.2)
def test_next_nameserver_retry_with_tcp(self):
- (request, answer) = self.resn.next_request()
- (nameserver1, tcp, backoff) = self.resn.next_nameserver()
+ request, answer = self.resn.next_request()
+ nameserver1, tcp, backoff = self.resn.next_nameserver()
self.assertEqual(nameserver1.port, 53)
self.assertFalse(tcp)
self.assertEqual(backoff, 0.0)
self.resn.retry_with_tcp = True
- (nameserver2, tcp, backoff) = self.resn.next_nameserver()
+ nameserver2, tcp, backoff = self.resn.next_nameserver()
self.assertTrue(nameserver2 is nameserver1)
self.assertTrue(tcp)
self.assertEqual(backoff, 0.0)
- (nameserver3, tcp, backoff) = self.resn.next_nameserver()
+ nameserver3, tcp, backoff = self.resn.next_nameserver()
self.assertTrue(nameserver3 != nameserver1)
self.assertEqual(nameserver3.port, 53)
self.assertFalse(tcp)
self.assertEqual(backoff, 0.0)
def test_next_nameserver_no_nameservers(self):
- (request, answer) = self.resn.next_request()
- (nameserver, _, _) = self.resn.next_nameserver()
+ request, answer = self.resn.next_request()
+ nameserver, _, _ = self.resn.next_nameserver()
self.resn.nameservers.remove(nameserver)
- (nameserver, _, _) = self.resn.next_nameserver()
+ nameserver, _, _ = self.resn.next_nameserver()
self.resn.nameservers.remove(nameserver)
def bad():
- (nameserver, _, _) = self.resn.next_nameserver()
+ nameserver, _, _ = self.resn.next_nameserver()
self.assertRaises(dns.resolver.NoNameservers, bad)
# A query to a nameserver that always supports a maximum size query
# always counts as a "tcp attempt" for the state machine
self.resolver.nameservers = ["https://127.0.0.1:443/bogus"]
- (_, _) = self.resn.next_request()
- (nameserver, tcp_attempt, _) = self.resn.next_nameserver()
+ _, _ = self.resn.next_request()
+ nameserver, tcp_attempt, _ = self.resn.next_nameserver()
print(nameserver)
assert tcp_attempt
new_nameservers = list(self.resolver.nameservers[:])
new_nameservers.extend(["10.0.0.3", "10.0.0.4"])
self.resolver.nameservers = new_nameservers
- (request, _) = self.resn.next_request()
+ request, _ = self.resn.next_request()
exceptions = [
dns.exception.FormError(),
EOFError(),
dns.message.Truncated(),
]
for i in range(4):
- (nameserver, _, _) = self.resn.next_nameserver()
+ nameserver, _, _ = self.resn.next_nameserver()
if i == 3:
# Truncated is only bad if we're doing TCP, make it look
# like that's the case
self.resn.tcp_attempt = True
self.assertTrue(nameserver in self.resn.nameservers)
- (answer, done) = self.resn.query_result(None, exceptions[i])
+ answer, done = self.resn.query_result(None, exceptions[i])
self.assertTrue(answer is None)
self.assertFalse(done)
self.assertFalse(nameserver in self.resn.nameservers)
# except for the exceptions tested in
# test_query_result_nameserver_removing_exceptions(), we should
# not remove any nameservers and just continue resolving.
- (_, _) = self.resn.next_request()
- (_, _, _) = self.resn.next_nameserver()
+ _, _ = self.resn.next_request()
+ _, _, _ = self.resn.next_nameserver()
nameservers = self.resn.nameservers[:]
- (answer, done) = self.resn.query_result(None, dns.exception.Timeout())
+ answer, done = self.resn.query_result(None, dns.exception.Timeout())
self.assertTrue(answer is None)
self.assertFalse(done)
self.assertEqual(nameservers, self.resn.nameservers)
def test_query_result_retry_with_tcp(self):
- (request, _) = self.resn.next_request()
- (nameserver, tcp, _) = self.resn.next_nameserver()
+ request, _ = self.resn.next_request()
+ nameserver, tcp, _ = self.resn.next_nameserver()
self.assertFalse(tcp)
- (answer, done) = self.resn.query_result(None, dns.message.Truncated())
+ answer, done = self.resn.query_result(None, dns.message.Truncated())
self.assertTrue(answer is None)
self.assertFalse(done)
self.assertTrue(self.resn.retry_with_tcp)
def test_query_result_no_error_with_data(self):
q = dns.message.make_query(self.qname, dns.rdatatype.A)
r = self.make_address_response(q)
- (_, _) = self.resn.next_request()
- (_, _, _) = self.resn.next_nameserver()
- (answer, done) = self.resn.query_result(r, None)
+ _, _ = self.resn.next_request()
+ _, _, _ = self.resn.next_nameserver()
+ answer, done = self.resn.query_result(r, None)
self.assertFalse(answer is None)
self.assertTrue(done)
self.assertEqual(answer.qname, self.qname)
self.resolver.cache = dns.resolver.Cache()
q = dns.message.make_query(self.qname, dns.rdatatype.A)
r = self.make_address_response(q)
- (_, _) = self.resn.next_request()
- (_, _, _) = self.resn.next_nameserver()
- (answer, done) = self.resn.query_result(r, None)
+ _, _ = self.resn.next_request()
+ _, _, _ = self.resn.next_nameserver()
+ answer, done = self.resn.query_result(r, None)
self.assertFalse(answer is None)
cache_answer = self.resolver.cache.get(
(self.qname, dns.rdatatype.A, dns.rdataclass.IN)
def test_query_result_no_error_no_data(self):
q = dns.message.make_query(self.qname, dns.rdatatype.A)
r = self.make_negative_response(q)
- (_, _) = self.resn.next_request()
- (_, _, _) = self.resn.next_nameserver()
+ _, _ = self.resn.next_request()
+ _, _, _ = self.resn.next_nameserver()
def bad():
- (answer, done) = self.resn.query_result(r, None)
+ answer, done = self.resn.query_result(r, None)
self.assertRaises(dns.resolver.NoAnswer, bad)
def test_query_result_nxdomain(self):
q = dns.message.make_query(self.qname, dns.rdatatype.A)
r = self.make_negative_response(q, True)
- (_, _) = self.resn.next_request()
- (_, _, _) = self.resn.next_nameserver()
- (answer, done) = self.resn.query_result(r, None)
+ _, _ = self.resn.next_request()
+ _, _, _ = self.resn.next_nameserver()
+ answer, done = self.resn.query_result(r, None)
self.assertTrue(answer is None)
self.assertTrue(done)
q = dns.message.make_query(self.qname, dns.rdatatype.A)
r = self.make_address_response(q)
r.set_rcode(dns.rcode.NXDOMAIN)
- (_, _) = self.resn.next_request()
- (nameserver, _, _) = self.resn.next_nameserver()
- (answer, done) = self.resn.query_result(r, None)
+ _, _ = self.resn.next_request()
+ nameserver, _, _ = self.resn.next_nameserver()
+ answer, done = self.resn.query_result(r, None)
self.assertIsNone(answer)
self.assertFalse(done)
self.assertTrue(nameserver not in self.resn.nameservers)
def test_query_result_chain_not_too_long(self):
q = dns.message.make_query(self.qname, dns.rdatatype.A)
r = self.make_long_chain_response(q, 15)
- (_, _) = self.resn.next_request()
- (_, _, _) = self.resn.next_nameserver()
- (answer, done) = self.resn.query_result(r, None)
+ _, _ = self.resn.next_request()
+ _, _, _ = self.resn.next_nameserver()
+ answer, done = self.resn.query_result(r, None)
self.assertIsNotNone(answer)
self.assertTrue(done)
def test_query_result_chain_too_long(self):
q = dns.message.make_query(self.qname, dns.rdatatype.A)
r = self.make_long_chain_response(q, 16)
- (_, _) = self.resn.next_request()
- (nameserver, _, _) = self.resn.next_nameserver()
- (answer, done) = self.resn.query_result(r, None)
+ _, _ = self.resn.next_request()
+ nameserver, _, _ = self.resn.next_nameserver()
+ answer, done = self.resn.query_result(r, None)
self.assertIsNone(answer)
self.assertFalse(done)
self.assertTrue(nameserver not in self.resn.nameservers)
self.resolver.cache = dns.resolver.Cache()
q = dns.message.make_query(self.qname, dns.rdatatype.A)
r = self.make_negative_response(q, True)
- (_, _) = self.resn.next_request()
- (_, _, _) = self.resn.next_nameserver()
- (answer, done) = self.resn.query_result(r, None)
+ _, _ = self.resn.next_request()
+ _, _, _ = self.resn.next_nameserver()
+ answer, done = self.resn.query_result(r, None)
self.assertTrue(answer is None)
self.assertTrue(done)
cache_answer = self.resolver.cache.get(
q = dns.message.make_query(self.qname, dns.rdatatype.A)
r = self.make_address_response(q)
r.set_rcode(dns.rcode.YXDOMAIN)
- (_, _) = self.resn.next_request()
- (_, _, _) = self.resn.next_nameserver()
+ _, _ = self.resn.next_request()
+ _, _, _ = self.resn.next_nameserver()
def bad():
- (answer, done) = self.resn.query_result(r, None)
+ answer, done = self.resn.query_result(r, None)
self.assertRaises(dns.resolver.YXDOMAIN, bad)
q = dns.message.make_query(self.qname, dns.rdatatype.A)
r = self.make_address_response(q)
r.set_rcode(dns.rcode.SERVFAIL)
- (_, _) = self.resn.next_request()
- (nameserver, _, _) = self.resn.next_nameserver()
- (answer, done) = self.resn.query_result(r, None)
+ _, _ = self.resn.next_request()
+ nameserver, _, _ = self.resn.next_nameserver()
+ answer, done = self.resn.query_result(r, None)
self.assertTrue(answer is None)
self.assertFalse(done)
self.assertTrue(nameserver not in self.resn.nameservers)
q = dns.message.make_query(self.qname, dns.rdatatype.A)
r = self.make_address_response(q)
r.set_rcode(dns.rcode.SERVFAIL)
- (_, _) = self.resn.next_request()
- (_, _, _) = self.resn.next_nameserver()
+ _, _ = self.resn.next_request()
+ _, _, _ = self.resn.next_nameserver()
nameservers = self.resn.nameservers[:]
- (answer, done) = self.resn.query_result(r, None)
+ answer, done = self.resn.query_result(r, None)
self.assertTrue(answer is None)
self.assertFalse(done)
self.assertEqual(nameservers, self.resn.nameservers)
q = dns.message.make_query(self.qname, dns.rdatatype.A)
r = self.make_address_response(q)
r.set_rcode(dns.rcode.REFUSED)
- (_, _) = self.resn.next_request()
- (nameserver, _, _) = self.resn.next_nameserver()
- (answer, done) = self.resn.query_result(r, None)
+ _, _ = self.resn.next_request()
+ nameserver, _, _ = self.resn.next_nameserver()
+ answer, done = self.resn.query_result(r, None)
self.assertTrue(answer is None)
self.assertFalse(done)
self.assertTrue(nameserver not in self.resn.nameservers)
def test_set_config_method(self):
from dns.win32util import set_config_method, ConfigMethod
- self.assertNotEqual(dns.win32util._config_method, dns.win32util.ConfigMethod.Win32)
+
+ self.assertNotEqual(
+ dns.win32util._config_method, dns.win32util.ConfigMethod.Win32
+ )
dns.win32util.set_config_method(dns.win32util.ConfigMethod.Win32)
- self.assertEqual(dns.win32util._config_method, dns.win32util.ConfigMethod.Win32)
+ self.assertEqual(
+ dns.win32util._config_method, dns.win32util.ConfigMethod.Win32
+ )
+
class ResolverNameserverValidTypeTestCase(unittest.TestCase):
def test_set_nameservers_to_list(self):
def test_make_query_basic():
z = dns.versioned.Zone("example.")
- (q, s) = dns.xfr.make_query(z)
+ q, s = dns.xfr.make_query(z)
assert q.question[0].rdtype == dns.rdatatype.AXFR
assert s is None
- (q, s) = dns.xfr.make_query(z, serial=None)
+ q, s = dns.xfr.make_query(z, serial=None)
assert q.question[0].rdtype == dns.rdatatype.AXFR
assert s is None
- (q, s) = dns.xfr.make_query(z, serial=10)
+ q, s = dns.xfr.make_query(z, serial=10)
assert q.question[0].rdtype == dns.rdatatype.IXFR
assert q.authority[0].rdtype == dns.rdatatype.SOA
assert q.authority[0][0].serial == 10
assert s == 10
with z.writer() as txn:
txn.add("@", 300, dns.rdata.from_text("in", "soa", ". . 1 2 3 4 5"))
- (q, s) = dns.xfr.make_query(z)
+ q, s = dns.xfr.make_query(z)
assert q.question[0].rdtype == dns.rdatatype.IXFR
assert q.authority[0].rdtype == dns.rdatatype.SOA
assert q.authority[0][0].serial == 1
assert s == 1
- (q, s) = dns.xfr.make_query(z, keyring=keyring, keyname=keyname)
+ q, s = dns.xfr.make_query(z, keyring=keyring, keyname=keyname)
assert q.question[0].rdtype == dns.rdatatype.IXFR
assert q.authority[0].rdtype == dns.rdatatype.SOA
assert q.authority[0][0].serial == 1
def test_extract_serial_from_query():
z = dns.versioned.Zone("example.")
- (q, s) = dns.xfr.make_query(z)
+ q, s = dns.xfr.make_query(z)
xs = dns.xfr.extract_serial_from_query(q)
assert s is None
assert s == xs
- (q, s) = dns.xfr.make_query(z, serial=10)
+ q, s = dns.xfr.make_query(z, serial=10)
xs = dns.xfr.extract_serial_from_query(q)
assert s == 10
assert s == xs
"""
include_text = """$INCLUDE "%s"
-""" % here(
- "example"
-)
+""" % here("example")
bad_directive_text = """$FOO bar
$ORIGIN example.
class ZoneDigestTestCase(unittest.TestCase):
# Examples from RFC 8976, fixed per errata.
- simple_example = textwrap.dedent(
- """
+ simple_example = textwrap.dedent("""
example. 86400 IN SOA ns1 admin 2018031900 (
1800 900 604800 86400 )
86400 IN NS ns1
777f98b8e730044c )
ns1 3600 IN A 203.0.113.63
ns2 3600 IN AAAA 2001:db8::63
- """
- )
+ """)
- complex_example = textwrap.dedent(
- """
+ complex_example = textwrap.dedent("""
example. 86400 IN SOA ns1 admin 2018031900 (
1800 900 604800 86400 )
86400 IN NS ns1
6f77656420627574
2069676e6f726564
2e20616c6c6f7765 )
- """
- )
+ """)
- multiple_digests_example = textwrap.dedent(
- """
+ multiple_digests_example = textwrap.dedent("""
example. 86400 IN SOA ns1 admin 2018031900 (
1800 900 604800 86400 )
example. 86400 IN NS ns1.example.
ns1.example. 3600 IN A 203.0.113.63
ns2.example. 86400 IN TXT "This example has multiple digests"
NS2.EXAMPLE. 3600 IN AAAA 2001:db8::63
- """
- )
+ """)
def _get_zonemd(self, zone):
return zone.get_rdataset(zone.origin, "ZONEMD")
with self.assertRaises(dns.exception.SyntaxError):
dns.rdata.from_text("IN", "ZONEMD", "100 1 0 " + self.sha384_hash)
- sorting_zone = textwrap.dedent(
- """
+ sorting_zone = textwrap.dedent("""
@ 86400 IN SOA ns1 admin 2018031900 (
1800 900 604800 86400 )
86400 IN NS ns1
86400 IN NS ns2
86400 IN RP n1.example. a.
86400 IN RP n1. b.
- """
- )
+ """)
def test_relative_zone_sorting(self):
z1 = dns.zone.from_text(self.sorting_zone, "example.", relativize=True)