From: Daniele Varrazzo Date: Tue, 25 Oct 2022 20:31:11 +0000 (+0200) Subject: perf(c): add C version of select waiting function X-Git-Tag: 3.1.5~7^2~9 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=94655781b77d8bade3c5e19aecfafaa896261602;p=thirdparty%2Fpsycopg.git perf(c): add C version of select waiting function 'Wait' and 'Ready' enums moved into the '_enums' module to avoid a circular import. --- diff --git a/psycopg/psycopg/_enums.py b/psycopg/psycopg/_enums.py index aeb0ee420..a7cb78df4 100644 --- a/psycopg/psycopg/_enums.py +++ b/psycopg/psycopg/_enums.py @@ -8,10 +8,23 @@ libpq-defined enums. # Copyright (C) 2020 The Psycopg Team from enum import Enum, IntEnum +from selectors import EVENT_READ, EVENT_WRITE from . import pq +class Wait(IntEnum): + R = EVENT_READ + W = EVENT_WRITE + RW = EVENT_READ | EVENT_WRITE + + +class Ready(IntEnum): + R = EVENT_READ + W = EVENT_WRITE + RW = EVENT_READ | EVENT_WRITE + + class PyFormat(str, Enum): """ Enum representing the format wanted for a query argument. diff --git a/psycopg/psycopg/waiting.py b/psycopg/psycopg/waiting.py index 3b1329649..1d4db1fa5 100644 --- a/psycopg/psycopg/waiting.py +++ b/psycopg/psycopg/waiting.py @@ -11,26 +11,14 @@ These functions are designed to consume the generators returned by the import select import selectors -from enum import IntEnum from typing import Dict, Optional from asyncio import get_event_loop, wait_for, Event, TimeoutError -from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE +from selectors import DefaultSelector from . import errors as e from .abc import PQGen, PQGenConn, RV - - -class Wait(IntEnum): - R = EVENT_READ - W = EVENT_WRITE - RW = EVENT_READ | EVENT_WRITE - - -class Ready(IntEnum): - R = EVENT_READ - W = EVENT_WRITE - RW = EVENT_READ | EVENT_WRITE - +from ._enums import Wait as Wait, Ready as Ready # re-exported +from ._cmodule import _psycopg WAIT_R = Wait.R WAIT_W = Wait.W @@ -217,7 +205,7 @@ async def wait_conn_async(gen: PQGenConn[RV], timeout: Optional[float] = None) - # Specialised implementation of wait functions. -def wait_select(gen: PQGen[RV], fileno: int, timeout: Optional[float] = None) -> RV: +def _wait_select(gen: PQGen[RV], fileno: int, timeout: Optional[float] = None) -> RV: """ Wait for a generator using select where supported. """ @@ -300,6 +288,8 @@ def wait_epoll(gen: PQGen[RV], fileno: int, timeout: Optional[float] = None) -> return rv +wait_select = _psycopg.wait_select if _psycopg else _wait_select + # Choose the best wait strategy for the platform. # # the selectors objects have a generic interface but come with some overhead, diff --git a/psycopg_c/psycopg_c/_psycopg.pyi b/psycopg_c/psycopg_c/_psycopg.pyi index 9bff1ee1a..fd54d82cc 100644 --- a/psycopg_c/psycopg_c/_psycopg.pyi +++ b/psycopg_c/psycopg_c/_psycopg.pyi @@ -61,6 +61,9 @@ def fetch(pgconn: PGconn) -> abc.PQGen[Optional[PGresult]]: ... def pipeline_communicate( pgconn: PGconn, commands: Deque[abc.PipelineCommand] ) -> abc.PQGen[List[List[PGresult]]]: ... +def wait_select( + gen: abc.PQGen[abc.RV], fileno: int, timeout: Optional[float] = None +) -> abc.RV: ... # Copy support def format_row_text( diff --git a/psycopg_c/psycopg_c/_psycopg.pyx b/psycopg_c/psycopg_c/_psycopg.pyx index e4faa8088..9d2b8baed 100644 --- a/psycopg_c/psycopg_c/_psycopg.pyx +++ b/psycopg_c/psycopg_c/_psycopg.pyx @@ -39,6 +39,7 @@ include "_psycopg/adapt.pyx" include "_psycopg/copy.pyx" include "_psycopg/generators.pyx" include "_psycopg/transform.pyx" +include "_psycopg/waiting.pyx" include "types/array.pyx" include "types/datetime.pyx" diff --git a/psycopg_c/psycopg_c/_psycopg/generators.pyx b/psycopg_c/psycopg_c/_psycopg/generators.pyx index e389ef7a0..29f6fb40e 100644 --- a/psycopg_c/psycopg_c/_psycopg/generators.pyx +++ b/psycopg_c/psycopg_c/_psycopg/generators.pyx @@ -11,7 +11,7 @@ from typing import List from psycopg import errors as e from psycopg.pq import abc, error_message from psycopg.abc import PipelineCommand, PQGen -from psycopg.waiting import Wait, Ready +from psycopg._enums import Wait, Ready from psycopg._compat import Deque from psycopg._encodings import conninfo_encoding diff --git a/psycopg_c/psycopg_c/_psycopg/waiting.pyx b/psycopg_c/psycopg_c/_psycopg/waiting.pyx new file mode 100644 index 000000000..5eccbd0c1 --- /dev/null +++ b/psycopg_c/psycopg_c/_psycopg/waiting.pyx @@ -0,0 +1,118 @@ +""" +C implementation of waiting functions +""" + +# Copyright (C) 2022 The Psycopg Team + +cdef extern from *: + """ +#include + +#define SELECT_EV_READ 1 +#define SELECT_EV_WRITE 2 +#define SEC_TO_US (1000 * 1000) + +static int +select_impl(int fileno, int wait, float timeout) +{ + fd_set ifds; + fd_set ofds; + fd_set efds; + struct timeval tv, *tvptr; + int select_rv, rv = 0; + + FD_ZERO(&ifds); + FD_ZERO(&ofds); + FD_ZERO(&efds); + + if (wait & SELECT_EV_READ) { + FD_SET(fileno, &ifds); + } + if (wait & SELECT_EV_WRITE) { + FD_SET(fileno, &ofds); + } + FD_SET(fileno, &efds); + + /* Compute appropriate timeout interval */ + if (timeout < 0.0) { + tvptr = NULL; + } + else + { + tv.tv_sec = (int)timeout; + tv.tv_usec = (int)(((long)timeout * SEC_TO_US) % SEC_TO_US); + tvptr = &tv; + } + + Py_BEGIN_ALLOW_THREADS + errno = 0; + select_rv = select(fileno + 1, &ifds, &ofds, &efds, tvptr); + Py_END_ALLOW_THREADS + + if (select_rv <= 0) { + rv = select_rv; + } + else { + if (FD_ISSET(fileno, &ifds)) { + rv |= SELECT_EV_READ; + } + if (FD_ISSET(fileno, &ofds)) { + rv |= SELECT_EV_WRITE; + } + } + + return rv; +} + +static int +select_raise(int n) +{ +#ifdef MS_WINDOWS + if (n == SOCKET_ERROR) { + PyErr_SetExcFromWindowsErr(PyExc_OSError, WSAGetLastError()); + return -1; + } +#else + if (n < 0) { + PyErr_SetFromErrno(PyExc_OSError); + return -1; + } +#endif + + PyErr_SetString(PyExc_OSError, "unexpected error from select()"); + return -1; +} + """ + const int SELECT_EV_READ + const int SELECT_EV_WRITE + cdef int select_impl(int fileno, int wait, float timeout) + cdef int select_raise(int e) except -1 + + +def wait_select(gen: PQGen[RV], int fileno, timeout = None) -> RV: + """ + Wait for a generator using select. + """ + cdef float ctimeout + cdef int wait, ready + + if timeout is None or timeout < 0: + ctimeout = -1.0 + else: + ctimeout = float(timeout) + + try: + wait = next(gen) + + while True: + ready = select_impl(fileno, wait, ctimeout) + if ready == 0: + continue + elif ready < 0: + select_raise(ready) + + wait = gen.send(ready) + + except StopIteration as ex: + rv: RV = ex.args[0] if ex.args else None + return rv