From: Daniele Varrazzo Date: Sat, 21 Mar 2020 09:34:11 +0000 (+1300) Subject: Use an event instead of a queue in wait_async X-Git-Tag: 3.0.dev0~689 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=8115ef9da568f09b978116b98777794fa781d8de;p=thirdparty%2Fpsycopg.git Use an event instead of a queue in wait_async --- diff --git a/psycopg3/waiting.py b/psycopg3/waiting.py index 5023a36b8..67f34bcb7 100644 --- a/psycopg3/waiting.py +++ b/psycopg3/waiting.py @@ -7,8 +7,7 @@ Code concerned with waiting in different contexts (blocking, async, etc). from enum import Enum from select import select -from asyncio import get_event_loop -from asyncio.queues import Queue +from asyncio import get_event_loop, Event from . import exceptions as exc @@ -64,25 +63,33 @@ async def wait_async(gen): """ # Use a queue to block and restart after the fd state changes. # Not sure this is the best implementation but it's a start. - q = Queue() + e = Event() loop = get_event_loop() + ready = None + + def wakeup(state): + nonlocal ready + ready = state + e.set() + try: while 1: fd, s = next(gen) + e.clear() if s is Wait.R: - loop.add_reader(fd, q.put_nowait, Ready.R) - ready = await q.get() + loop.add_reader(fd, wakeup, Ready.R) + await e.wait() loop.remove_reader(fd) gen.send(ready) elif s is Wait.W: - loop.add_writer(fd, q.put_nowait, Ready.W) - ready = await q.get() + loop.add_writer(fd, wakeup, Ready.W) + await e.wait() loop.remove_writer(fd) gen.send(ready) elif s is Wait.RW: - loop.add_reader(fd, q.put_nowait, Ready.R) - loop.add_writer(fd, q.put_nowait, Ready.W) - ready = await q.get() + loop.add_reader(fd, wakeup, Ready.R) + loop.add_writer(fd, wakeup, Ready.W) + await e.wait() loop.remove_reader(fd) loop.remove_writer(fd) gen.send(ready)