return asyncio.get_event_loop()
-class _DatagramProtocol:
+class _DatagramProtocol(asyncio.DatagramProtocol):
def __init__(self):
self.transport = None
self.recvfrom = None
return await awaitable
-class DatagramSocket(dns._asyncbackend.DatagramSocket):
+class _DatagramSocket(dns._asyncbackend.DatagramSocket):
def __init__(self, family, transport, protocol):
super().__init__(family, socket.SOCK_DGRAM)
self.transport = transport
raise NotImplementedError
-class StreamSocket(dns._asyncbackend.StreamSocket):
+class _StreamSocket(dns._asyncbackend.StreamSocket):
def __init__(self, af, reader, writer):
super().__init__(af, socket.SOCK_STREAM)
self.reader = reader
source = (dns.inet.any_for_af(af), 0)
transport, protocol = await loop.create_datagram_endpoint(
_DatagramProtocol, # type: ignore
- source,
+ local_addr=source,
family=af,
proto=proto,
remote_addr=destination,
)
- return DatagramSocket(af, transport, protocol)
+ return _DatagramSocket(af, transport, protocol)
elif socktype == socket.SOCK_STREAM:
if destination is None:
# This shouldn't happen, but we check to make code analysis software
),
timeout,
)
- return StreamSocket(af, r, w)
+ return _StreamSocket(af, r, w)
raise NotImplementedError(
"unsupported socket " + f"type {socktype}"
) # pragma: no cover
one_rr_per_rrset=one_rr_per_rrset,
ignore_trailing=ignore_trailing,
raise_on_truncation=raise_on_truncation,
+ continue_on_error=ignore_errors,
)
except dns.message.Truncated as e:
# See the comment in query.py for details.
keyring: dict[dns.name.Name, dns.tsig.Key] | None = None,
request_mac: bytes | None = b"",
ignore_trailing: bool = False,
+ ignore_errors: bool = False,
) -> tuple[dns.message.Message, float]:
"""Read a DNS message from a TCP socket.
request_mac=request_mac,
one_rr_per_rrset=one_rr_per_rrset,
ignore_trailing=ignore_trailing,
+ continue_on_error=ignore_errors,
)
return (r, received_time)