]> git.ipfire.org Git - thirdparty/psycopg.git/commitdiff
Use the selectors module instead of select
authorDaniele Varrazzo <daniele.varrazzo@gmail.com>
Tue, 31 Mar 2020 05:45:08 +0000 (18:45 +1300)
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>
Tue, 31 Mar 2020 05:45:08 +0000 (18:45 +1300)
Choose a better way to wait for readyness than select if available.

Also fixed an error in both sync and async waiting: don't discard the
result of send().

psycopg3/connection.py
psycopg3/waiting.py

index 7d89c1b7ba6de9018bb7e9e7fba62e7a029053d9..8c68b7a7396ee650d2ffa2e6eb325499cbdd9988 100644 (file)
@@ -24,7 +24,7 @@ from . import pq
 from . import exceptions as exc
 from . import cursor
 from .conninfo import make_conninfo
-from .waiting import wait_select, wait_async, Wait, Ready
+from .waiting import wait, wait_async, Wait, Ready
 
 logger = logging.getLogger(__name__)
 
@@ -135,7 +135,7 @@ class BaseConnection:
                 break
 
             ready = yield pgconn.socket, Wait.RW
-            if ready is Ready.R:
+            if ready & Ready.R:
                 pgconn.consume_input()
             continue
 
@@ -211,8 +211,12 @@ class Connection(BaseConnection):
                 )
 
     @classmethod
-    def wait(cls, gen: Generator[Tuple[int, Wait], Ready, RV]) -> RV:
-        return wait_select(gen)
+    def wait(
+        cls,
+        gen: Generator[Tuple[int, Wait], Ready, RV],
+        timeout: Optional[float] = 0.1,
+    ) -> RV:
+        return wait(gen, timeout=timeout)
 
 
 class AsyncConnection(BaseConnection):
index 59e0e1f618cfa87874127d415bff594bad5b92ce..c1743d45b83817d5e06043cd5939950d12d1ca57 100644 (file)
@@ -5,51 +5,55 @@ Code concerned with waiting in different contexts (blocking, async, etc).
 # Copyright (C) 2020 The Psycopg Team
 
 
-from enum import Enum
-from select import select
-from typing import Generator, Tuple, TypeVar
+from enum import IntEnum
+from typing import Generator, Optional, Tuple, TypeVar
 from asyncio import get_event_loop, Event
+from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE
 
 from . import exceptions as exc
 
 
-Wait = Enum("Wait", "R W RW")
-Ready = Enum("Ready", "R W")
+class Wait(IntEnum):
+    R = EVENT_READ
+    W = EVENT_WRITE
+    RW = EVENT_READ | EVENT_WRITE
 
-RV = TypeVar("RV")
 
+class Ready(IntEnum):
+    R = EVENT_READ
+    W = EVENT_WRITE
 
-def wait_select(gen: Generator[Tuple[int, Wait], Ready, RV]) -> RV:
-    """
-    Wait on the behalf of a generator using select().
 
-    *gen* is expected to generate tuples (fd, status). consume it and block
-    according to the status until fd is ready. Send back the ready state
-    to the generator.
+RV = TypeVar("RV")
 
-    Return what the generator eventually returned.
+
+def wait(
+    gen: Generator[Tuple[int, Wait], Ready, RV],
+    timeout: Optional[float] = None,
+) -> RV:
+    """
+    Wait for a generator using the best option available on the platform.
+
+    :param gen: a generator performing database operations and yielding
+        (fd, `Ready`) pairs when it would block.
+    :param timeout: timeout (in seconds) to check for other interrupt, e.g.
+        to allow Ctrl-C.
+    :type timeout: float
+    :return: whatever *gen* returns on completion.
     """
+    sel = DefaultSelector()
     try:
+        fd, s = next(gen)
         while 1:
-            fd, s = next(gen)
-            if s is Wait.R:
-                rf, wf, xf = select([fd], [], [])
-                assert rf
-                gen.send(Ready.R)
-            elif s is Wait.W:
-                rf, wf, xf = select([], [fd], [])
-                assert wf
-                gen.send(Ready.W)
-            elif s is Wait.RW:
-                rf, wf, xf = select([fd], [fd], [])
-                assert rf or wf
-                assert not (rf and wf)
-                if rf:
-                    gen.send(Ready.R)
-                else:
-                    gen.send(Ready.W)
-            else:
-                raise exc.InternalError("bad poll status: %s")
+            sel.register(fd, s)
+            ready = None
+            while not ready:
+                ready = sel.select(timeout=timeout)
+            sel.unregister(fd)
+
+            assert len(ready) == 1
+            fd, s = gen.send(ready[0][1])
+
     except StopIteration as e:
         rv: RV = e.args[0]
         return rv
@@ -77,28 +81,27 @@ async def wait_async(gen: Generator[Tuple[int, Wait], Ready, RV]) -> RV:
         ev.set()
 
     try:
+        fd, s = next(gen)
         while 1:
-            fd, s = next(gen)
             ev.clear()
             if s is Wait.R:
                 loop.add_reader(fd, wakeup, Ready.R)
                 await ev.wait()
                 loop.remove_reader(fd)
-                gen.send(ready)
             elif s is Wait.W:
                 loop.add_writer(fd, wakeup, Ready.W)
                 await ev.wait()
                 loop.remove_writer(fd)
-                gen.send(ready)
             elif s is Wait.RW:
                 loop.add_reader(fd, wakeup, Ready.R)
                 loop.add_writer(fd, wakeup, Ready.W)
                 await ev.wait()
                 loop.remove_reader(fd)
                 loop.remove_writer(fd)
-                gen.send(ready)
             else:
                 raise exc.InternalError("bad poll status: %s")
+            fd, s = gen.send(ready)
+
     except StopIteration as e:
         rv: RV = e.args[0]
         return rv