def tcp_server(self, server_prog, *,
family=socket.AF_INET,
addr=None,
- timeout=5,
+ timeout=support.SHORT_TIMEOUT,
backlog=1,
max_clients=10):
def tcp_client(self, client_prog,
family=socket.AF_INET,
- timeout=10):
+ timeout=support.SHORT_TIMEOUT):
sock = socket.socket(family, socket.SOCK_STREAM)
async def start_server():
extras = {}
- extras = dict(ssl_handshake_timeout=40.0)
+ extras = dict(ssl_handshake_timeout=support.SHORT_TIMEOUT)
srv = await asyncio.start_server(
handle_client,
async def client(addr):
extras = {}
- extras = dict(ssl_handshake_timeout=40.0)
+ extras = dict(ssl_handshake_timeout=support.SHORT_TIMEOUT)
reader, writer = await asyncio.open_connection(
*addr,
*addr,
ssl=client_sslctx,
server_hostname='',
- ssl_handshake_timeout=1.0)
+ ssl_handshake_timeout=support.SHORT_TIMEOUT)
writer.close()
await self.wait_closed(writer)
extras = {}
if server_ssl:
- extras = dict(ssl_handshake_timeout=10.0)
+ extras = dict(ssl_handshake_timeout=support.SHORT_TIMEOUT)
f = loop.create_task(
loop.connect_accepted_socket(
with self.tcp_server(serve, timeout=self.TIMEOUT) as srv:
self.loop.run_until_complete(
- asyncio.wait_for(client(srv.addr), timeout=10))
+ asyncio.wait_for(client(srv.addr),
+ timeout=support.SHORT_TIMEOUT))
def test_create_connection_memory_leak(self):
HELLO_MSG = b'1' * self.PAYLOAD_SIZE
with self.tcp_server(serve, timeout=self.TIMEOUT) as srv:
self.loop.run_until_complete(
- asyncio.wait_for(client(srv.addr), timeout=10))
+ asyncio.wait_for(client(srv.addr),
+ timeout=support.SHORT_TIMEOUT))
# No garbage is left for SSL client from loop.create_connection, even
# if user stores the SSLTransport in corresponding protocol instance
with self.tcp_server(serve, timeout=self.TIMEOUT) as srv:
self.loop.run_until_complete(
- asyncio.wait_for(client(srv.addr), timeout=10))
+ asyncio.wait_for(client(srv.addr),
+ timeout=support.SHORT_TIMEOUT))
def test_start_tls_server_1(self):
HELLO_MSG = b'1' * self.PAYLOAD_SIZE
async def client(addr):
extras = {}
- extras = dict(ssl_handshake_timeout=10.0)
+ extras = dict(ssl_handshake_timeout=support.SHORT_TIMEOUT)
reader, writer = await asyncio.open_connection(
*addr,