import sys
import socket
from unittest.mock import ANY
-from test.support import os_helper, SHORT_TIMEOUT, busy_retry, requires_gil_enabled
+from test.support import os_helper, SHORT_TIMEOUT, busy_retry
from test.support.script_helper import make_script
from test.support.socket_helper import find_unused_port
self.assertEqual(stack_trace, expected_stack_trace)
@skip_if_not_supported
- @requires_gil_enabled("gh-133359: occasionally flaky on AMD64")
@unittest.skipIf(sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED,
"Test only runs on Linux with process_vm_readv support")
def test_async_global_awaited_by(self):
assert message == data.decode()
writer.close()
await writer.wait_closed()
+ # Signal we are ready to sleep
+ sock.sendall(b"ready")
await asyncio.sleep(SHORT_TIMEOUT)
async def echo_client_spam(server):
random.shuffle(msg)
tg.create_task(echo_client("".join(msg)))
await asyncio.sleep(0)
- # at least a 1000 tasks created
- sock.sendall(b"ready")
+ # at least a 1000 tasks created. Each task will signal
+ # when is ready to avoid the race caused by the fact that
+ # tasks are waited on tg.__exit__ and we cannot signal when
+ # that happens otherwise
# at this point all client tasks completed without assertion errors
# let's wrap up the test
server.close()
p = subprocess.Popen([sys.executable, script_name])
client_socket, _ = server_socket.accept()
server_socket.close()
- response = client_socket.recv(1024)
- self.assertEqual(response, b"ready")
+ for _ in range(1000):
+ expected_response = b"ready"
+ response = client_socket.recv(len(expected_response))
+ self.assertEqual(response, expected_response)
for _ in busy_retry(SHORT_TIMEOUT):
try:
all_awaited_by = get_all_awaited_by(p.pid)