async def serve(*args):
pass
- sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
- sock.bind(addr)
+ with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
+ sock.bind(addr)
- srv = await asyncio.start_unix_server(serve, sock=sock)
+ srv = await asyncio.start_unix_server(serve, sock=sock)
- srv.close()
- self.assertFalse(os.path.exists(addr))
+ srv.close()
+ self.assertFalse(os.path.exists(addr))
@socket_helper.skip_unless_bind_unix_socket
async def test_unix_server_cleanup_gone(self):
async def serve(*args):
pass
- sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
- sock.bind(addr)
+ with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
+ sock.bind(addr)
- srv = await asyncio.start_unix_server(serve, sock=sock)
+ srv = await asyncio.start_unix_server(serve, sock=sock)
- os.unlink(addr)
+ os.unlink(addr)
- srv.close()
+ srv.close()
@socket_helper.skip_unless_bind_unix_socket
async def test_unix_server_cleanup_replaced(self):
srv = await asyncio.start_unix_server(serve, addr)
os.unlink(addr)
- sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
- sock.bind(addr)
+ with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
+ sock.bind(addr)
- srv.close()
- self.assertTrue(os.path.exists(addr))
+ srv.close()
+ self.assertTrue(os.path.exists(addr))
@socket_helper.skip_unless_bind_unix_socket
async def test_unix_server_cleanup_prevented(self):