]> git.ipfire.org Git - thirdparty/psycopg.git/commitdiff
Use an event instead of a queue in wait_async
authorDaniele Varrazzo <daniele.varrazzo@gmail.com>
Sat, 21 Mar 2020 09:34:11 +0000 (22:34 +1300)
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>
Sat, 21 Mar 2020 09:34:11 +0000 (22:34 +1300)
psycopg3/waiting.py

index 5023a36b86e0212d10ff3b749f7426133e537a05..67f34bcb7329bcdbc0b42913f4a7ea811c289e62 100644 (file)
@@ -7,8 +7,7 @@ Code concerned with waiting in different contexts (blocking, async, etc).
 
 from enum import Enum
 from select import select
-from asyncio import get_event_loop
-from asyncio.queues import Queue
+from asyncio import get_event_loop, Event
 
 from . import exceptions as exc
 
@@ -64,25 +63,33 @@ async def wait_async(gen):
     """
     # Use a queue to block and restart after the fd state changes.
     # Not sure this is the best implementation but it's a start.
-    q = Queue()
+    e = Event()
     loop = get_event_loop()
+    ready = None
+
+    def wakeup(state):
+        nonlocal ready
+        ready = state
+        e.set()
+
     try:
         while 1:
             fd, s = next(gen)
+            e.clear()
             if s is Wait.R:
-                loop.add_reader(fd, q.put_nowait, Ready.R)
-                ready = await q.get()
+                loop.add_reader(fd, wakeup, Ready.R)
+                await e.wait()
                 loop.remove_reader(fd)
                 gen.send(ready)
             elif s is Wait.W:
-                loop.add_writer(fd, q.put_nowait, Ready.W)
-                ready = await q.get()
+                loop.add_writer(fd, wakeup, Ready.W)
+                await e.wait()
                 loop.remove_writer(fd)
                 gen.send(ready)
             elif s is Wait.RW:
-                loop.add_reader(fd, q.put_nowait, Ready.R)
-                loop.add_writer(fd, q.put_nowait, Ready.W)
-                ready = await q.get()
+                loop.add_reader(fd, wakeup, Ready.R)
+                loop.add_writer(fd, wakeup, Ready.W)
+                await e.wait()
                 loop.remove_reader(fd)
                 loop.remove_writer(fd)
                 gen.send(ready)