self.transport = transport
self.protocol = protocol
- async def sendto(self, what, destination, timeout):
+ async def sendto(self, what, destination, timeout): # pragma: no cover
# no timeout for asyncio sendto
self.transport.sendto(what, destination)
server_hostname=server_hostname),
timeout)
return StreamSocket(af, r, w)
- raise NotImplementedError(f'unsupported socket type {socktype}')
+ raise NotImplementedError('unsupported socket ' +
+ f'type {socktype}') # pragma: no cover
async def sleep(self, interval):
await asyncio.sleep(interval)
async def sendto(self, what, destination, timeout):
async with _maybe_timeout(timeout):
return await self.socket.sendto(what, destination)
- raise dns.exception.Timeout(timeout=timeout)
+ raise dns.exception.Timeout(timeout=timeout) # pragma: no cover
async def recvfrom(self, size, timeout):
async with _maybe_timeout(timeout):
try:
if source:
s.bind(_lltuple(source, af))
- except Exception:
+ except Exception: # pragma: no cover
await s.close()
raise
return DatagramSocket(s)
source_addr=source_addr,
server_hostname=server_hostname)
return StreamSocket(s)
- raise NotImplementedError(f'unsupported socket type {socktype}')
+ raise NotImplementedError('unsupported socket ' +
+ f'type {socktype}') # pragma: no cover
async def sleep(self, interval):
await curio.sleep(interval)
async def sendto(self, what, destination, timeout):
with _maybe_timeout(timeout):
return await self.socket.sendto(what, destination)
- raise dns.exception.Timeout(timeout=timeout)
+ raise dns.exception.Timeout(timeout=timeout) # pragma: no cover
async def recvfrom(self, size, timeout):
with _maybe_timeout(timeout):
if socktype == socket.SOCK_STREAM:
with _maybe_timeout(timeout):
await s.connect(_lltuple(destination, af))
- except Exception:
+ except Exception: # pragma: no cover
s.close()
raise
if socktype == socket.SOCK_DGRAM:
try:
stream = trio.SSLStream(stream, ssl_context,
server_hostname=server_hostname)
- except Exception:
+ except Exception: # pragma: no cover
await stream.aclose()
raise
return StreamSocket(af, stream, tls)
- raise NotImplementedError(f'unsupported socket type {socktype}')
+ raise NotImplementedError('unsupported socket ' +
+ f'type {socktype}') # pragma: no cover
async def sleep(self, interval):
await trio.sleep(interval)