From: Daniele Varrazzo Date: Tue, 31 Mar 2020 05:45:08 +0000 (+1300) Subject: Use the selectors module instead of select X-Git-Tag: 3.0.dev0~640 X-Git-Url: http://git.ipfire.org/gitweb.cgi?a=commitdiff_plain;h=bb16de93772c6d5d07097d1a2abbd8334323dadf;p=thirdparty%2Fpsycopg.git Use the selectors module instead of select Choose a better way to wait for readyness than select if available. Also fixed an error in both sync and async waiting: don't discard the result of send(). --- diff --git a/psycopg3/connection.py b/psycopg3/connection.py index 7d89c1b7b..8c68b7a73 100644 --- a/psycopg3/connection.py +++ b/psycopg3/connection.py @@ -24,7 +24,7 @@ from . import pq from . import exceptions as exc from . import cursor from .conninfo import make_conninfo -from .waiting import wait_select, wait_async, Wait, Ready +from .waiting import wait, wait_async, Wait, Ready logger = logging.getLogger(__name__) @@ -135,7 +135,7 @@ class BaseConnection: break ready = yield pgconn.socket, Wait.RW - if ready is Ready.R: + if ready & Ready.R: pgconn.consume_input() continue @@ -211,8 +211,12 @@ class Connection(BaseConnection): ) @classmethod - def wait(cls, gen: Generator[Tuple[int, Wait], Ready, RV]) -> RV: - return wait_select(gen) + def wait( + cls, + gen: Generator[Tuple[int, Wait], Ready, RV], + timeout: Optional[float] = 0.1, + ) -> RV: + return wait(gen, timeout=timeout) class AsyncConnection(BaseConnection): diff --git a/psycopg3/waiting.py b/psycopg3/waiting.py index 59e0e1f61..c1743d45b 100644 --- a/psycopg3/waiting.py +++ b/psycopg3/waiting.py @@ -5,51 +5,55 @@ Code concerned with waiting in different contexts (blocking, async, etc). # Copyright (C) 2020 The Psycopg Team -from enum import Enum -from select import select -from typing import Generator, Tuple, TypeVar +from enum import IntEnum +from typing import Generator, Optional, Tuple, TypeVar from asyncio import get_event_loop, Event +from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE from . import exceptions as exc -Wait = Enum("Wait", "R W RW") -Ready = Enum("Ready", "R W") +class Wait(IntEnum): + R = EVENT_READ + W = EVENT_WRITE + RW = EVENT_READ | EVENT_WRITE -RV = TypeVar("RV") +class Ready(IntEnum): + R = EVENT_READ + W = EVENT_WRITE -def wait_select(gen: Generator[Tuple[int, Wait], Ready, RV]) -> RV: - """ - Wait on the behalf of a generator using select(). - *gen* is expected to generate tuples (fd, status). consume it and block - according to the status until fd is ready. Send back the ready state - to the generator. +RV = TypeVar("RV") - Return what the generator eventually returned. + +def wait( + gen: Generator[Tuple[int, Wait], Ready, RV], + timeout: Optional[float] = None, +) -> RV: + """ + Wait for a generator using the best option available on the platform. + + :param gen: a generator performing database operations and yielding + (fd, `Ready`) pairs when it would block. + :param timeout: timeout (in seconds) to check for other interrupt, e.g. + to allow Ctrl-C. + :type timeout: float + :return: whatever *gen* returns on completion. """ + sel = DefaultSelector() try: + fd, s = next(gen) while 1: - fd, s = next(gen) - if s is Wait.R: - rf, wf, xf = select([fd], [], []) - assert rf - gen.send(Ready.R) - elif s is Wait.W: - rf, wf, xf = select([], [fd], []) - assert wf - gen.send(Ready.W) - elif s is Wait.RW: - rf, wf, xf = select([fd], [fd], []) - assert rf or wf - assert not (rf and wf) - if rf: - gen.send(Ready.R) - else: - gen.send(Ready.W) - else: - raise exc.InternalError("bad poll status: %s") + sel.register(fd, s) + ready = None + while not ready: + ready = sel.select(timeout=timeout) + sel.unregister(fd) + + assert len(ready) == 1 + fd, s = gen.send(ready[0][1]) + except StopIteration as e: rv: RV = e.args[0] return rv @@ -77,28 +81,27 @@ async def wait_async(gen: Generator[Tuple[int, Wait], Ready, RV]) -> RV: ev.set() try: + fd, s = next(gen) while 1: - fd, s = next(gen) ev.clear() if s is Wait.R: loop.add_reader(fd, wakeup, Ready.R) await ev.wait() loop.remove_reader(fd) - gen.send(ready) elif s is Wait.W: loop.add_writer(fd, wakeup, Ready.W) await ev.wait() loop.remove_writer(fd) - gen.send(ready) elif s is Wait.RW: loop.add_reader(fd, wakeup, Ready.R) loop.add_writer(fd, wakeup, Ready.W) await ev.wait() loop.remove_reader(fd) loop.remove_writer(fd) - gen.send(ready) else: raise exc.InternalError("bad poll status: %s") + fd, s = gen.send(ready) + except StopIteration as e: rv: RV = e.args[0] return rv