From: Daniele Varrazzo Date: Sat, 29 Oct 2022 23:37:01 +0000 (+0200) Subject: perf(c): use poll() instead of select() for waiting, where available X-Git-Tag: 3.1.5~7^2~5 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=76ef0bd2abacffb586ced8d25336c45357c9ca30;p=thirdparty%2Fpsycopg.git perf(c): use poll() instead of select() for waiting, where available --- diff --git a/psycopg/psycopg/waiting.py b/psycopg/psycopg/waiting.py index 1d4db1fa5..36f80a1c0 100644 --- a/psycopg/psycopg/waiting.py +++ b/psycopg/psycopg/waiting.py @@ -205,7 +205,7 @@ async def wait_conn_async(gen: PQGenConn[RV], timeout: Optional[float] = None) - # Specialised implementation of wait functions. -def _wait_select(gen: PQGen[RV], fileno: int, timeout: Optional[float] = None) -> RV: +def wait_select(gen: PQGen[RV], fileno: int, timeout: Optional[float] = None) -> RV: """ Wait for a generator using select where supported. """ @@ -288,14 +288,19 @@ def wait_epoll(gen: PQGen[RV], fileno: int, timeout: Optional[float] = None) -> return rv -wait_select = _psycopg.wait_select if _psycopg else _wait_select +if _psycopg: + wait_c = _psycopg.wait_c + # Choose the best wait strategy for the platform. # # the selectors objects have a generic interface but come with some overhead, # so we also offer more finely tuned implementations. -if selectors.DefaultSelector is getattr(selectors, "SelectSelector", None): +if _psycopg: + wait = wait_c + +elif selectors.DefaultSelector is getattr(selectors, "SelectSelector", None): # On Windows, SelectSelector should be the default. wait = wait_select diff --git a/psycopg_c/psycopg_c/_psycopg.pyi b/psycopg_c/psycopg_c/_psycopg.pyi index fd54d82cc..bd7c63d91 100644 --- a/psycopg_c/psycopg_c/_psycopg.pyi +++ b/psycopg_c/psycopg_c/_psycopg.pyi @@ -61,7 +61,7 @@ def fetch(pgconn: PGconn) -> abc.PQGen[Optional[PGresult]]: ... def pipeline_communicate( pgconn: PGconn, commands: Deque[abc.PipelineCommand] ) -> abc.PQGen[List[List[PGresult]]]: ... -def wait_select( +def wait_c( gen: abc.PQGen[abc.RV], fileno: int, timeout: Optional[float] = None ) -> abc.RV: ... diff --git a/psycopg_c/psycopg_c/_psycopg/waiting.pyx b/psycopg_c/psycopg_c/_psycopg/waiting.pyx index 3dc068b90..bdecfda72 100644 --- a/psycopg_c/psycopg_c/_psycopg/waiting.pyx +++ b/psycopg_c/psycopg_c/_psycopg/waiting.pyx @@ -6,14 +6,28 @@ C implementation of waiting functions cdef extern from *: """ +#if defined(HAVE_POLL) && !defined(HAVE_BROKEN_POLL) + +#if defined(HAVE_POLL_H) +#include +#elif defined(HAVE_SYS_POLL_H) +#include +#endif + +#else /* no poll available */ + #ifdef MS_WINDOWS #include #else #include #endif +#endif /* HAVE_POLL */ + #define SELECT_EV_READ 1 #define SELECT_EV_WRITE 2 + +#define SEC_TO_MS 1000 #define SEC_TO_US (1000 * 1000) /* Use select to wait for readiness on fileno. @@ -21,17 +35,58 @@ cdef extern from *: * - Return SELECT_EV_* if the file is ready * - Return 0 on timeout * - Return -1 (and set an exception) on error. + * + * The wisdom of this function comes from: + * + * - PostgreSQL libpq (see src/interfaces/libpq/fe-misc.c) + * - Python select module (see Modules/selectmodule.c) */ static int -select_impl(int fileno, int wait, float timeout) +wait_c_impl(int fileno, int wait, float timeout) { + int select_rv; + int rv = 0; + +#if defined(HAVE_POLL) && !defined(HAVE_BROKEN_POLL) + + struct pollfd input_fd; + int timeout_ms; + + input_fd.fd = fileno; + input_fd.events = POLLERR; + input_fd.revents = 0; + + if (wait & SELECT_EV_READ) { input_fd.events |= POLLIN; } + if (wait & SELECT_EV_WRITE) { input_fd.events |= POLLOUT; } + + if (timeout < 0.0) { + timeout_ms = -1; + } else { + timeout_ms = (int)(timeout * SEC_TO_MS); + } + + Py_BEGIN_ALLOW_THREADS + errno = 0; + select_rv = poll(&input_fd, 1, timeout_ms); + Py_END_ALLOW_THREADS + + if (PyErr_CheckSignals()) { goto finally; } + + if (select_rv < 0) { + goto error; + } + + if (input_fd.events & POLLIN) { rv |= SELECT_EV_READ; } + if (input_fd.events & POLLOUT) { rv |= SELECT_EV_WRITE; } + +#else + fd_set ifds; fd_set ofds; fd_set efds; struct timeval tv, *tvptr; - int select_rv; -#ifdef MS_WINDOWS +#ifndef MS_WINDOWS if (fileno >= 1024) { PyErr_SetString( PyExc_ValueError, /* same exception of Python's 'select.select()' */ @@ -44,20 +99,15 @@ select_impl(int fileno, int wait, float timeout) FD_ZERO(&ofds); FD_ZERO(&efds); - if (wait & SELECT_EV_READ) { - FD_SET(fileno, &ifds); - } - if (wait & SELECT_EV_WRITE) { - FD_SET(fileno, &ofds); - } + if (wait & SELECT_EV_READ) { FD_SET(fileno, &ifds); } + if (wait & SELECT_EV_WRITE) { FD_SET(fileno, &ofds); } FD_SET(fileno, &efds); /* Compute appropriate timeout interval */ if (timeout < 0.0) { tvptr = NULL; } - else - { + else { tv.tv_sec = (int)timeout; tv.tv_usec = (int)(((long)timeout * SEC_TO_US) % SEC_TO_US); tvptr = &tv; @@ -68,49 +118,48 @@ select_impl(int fileno, int wait, float timeout) select_rv = select(fileno + 1, &ifds, &ofds, &efds, tvptr); Py_END_ALLOW_THREADS - if (PyErr_CheckSignals()) { - return -1; - } + if (PyErr_CheckSignals()) { goto finally; } if (select_rv < 0) { + goto error; + } + + if (FD_ISSET(fileno, &ifds)) { rv |= SELECT_EV_READ; } + if (FD_ISSET(fileno, &ofds)) { rv |= SELECT_EV_WRITE; } + +#endif /* HAVE_POLL */ + + return rv; + +error: #ifdef MS_WINDOWS - if (select_rv == SOCKET_ERROR) { - PyErr_SetExcFromWindowsErr(PyExc_OSError, WSAGetLastError()); - } + if (select_rv == SOCKET_ERROR) { + PyErr_SetExcFromWindowsErr(PyExc_OSError, WSAGetLastError()); + } #else - if (select_rv < 0) { - PyErr_SetFromErrno(PyExc_OSError); - } -#endif - else { - PyErr_SetString(PyExc_OSError, "unexpected error from select()"); - } - return -1; + if (select_rv < 0) { + PyErr_SetFromErrno(PyExc_OSError); } +#endif else { - int rv = 0; - - if (select_rv >= 0) { - if (FD_ISSET(fileno, &ifds)) { - rv = SELECT_EV_READ; - } - if (FD_ISSET(fileno, &ofds)) { - rv |= SELECT_EV_WRITE; - } - } - return rv; + PyErr_SetString(PyExc_OSError, "unexpected error from select()"); } + +finally: + + return -1; + } """ const int SELECT_EV_READ const int SELECT_EV_WRITE - cdef int select_impl(int fileno, int wait, float timeout) except -1 + cdef int wait_c_impl(int fileno, int wait, float timeout) except -1 -def wait_select(gen: PQGen[RV], int fileno, timeout = None) -> RV: +def wait_c(gen: PQGen[RV], int fileno, timeout = None) -> RV: """ - Wait for a generator using select. + Wait for a generator using poll or select. """ cdef float ctimeout cdef int wait, ready @@ -124,7 +173,7 @@ def wait_select(gen: PQGen[RV], int fileno, timeout = None) -> RV: wait = next(gen) while True: - ready = select_impl(fileno, wait, ctimeout) + ready = wait_c_impl(fileno, wait, ctimeout) if ready == 0: continue diff --git a/tests/test_waiting.py b/tests/test_waiting.py index 72c258d78..e187af13a 100644 --- a/tests/test_waiting.py +++ b/tests/test_waiting.py @@ -14,18 +14,15 @@ skip_if_not_linux = pytest.mark.skipif( ) waitfns = [ - pytest.param(waiting.wait, id="wait"), - pytest.param(waiting.wait_selector, id="wait_selector"), + "wait", + "wait_selector", pytest.param( - waiting.wait_select, - id="wait_select", - marks=pytest.mark.skipif("not hasattr(select, 'select')"), + "wait_select", marks=pytest.mark.skipif("not hasattr(select, 'select')") ), pytest.param( - waiting.wait_epoll, - id="wait_epoll", - marks=pytest.mark.skipif("not hasattr(select, 'epoll')"), + "wait_epoll", marks=pytest.mark.skipif("not hasattr(select, 'epoll')") ), + pytest.param("wait_c", marks=pytest.mark.skipif("not psycopg._cmodule._psycopg")), ] timeouts = [pytest.param({}, id="blank")] @@ -49,6 +46,8 @@ def test_wait_conn_bad(dsn): @pytest.mark.parametrize("wait, ready", zip(waiting.Wait, waiting.Ready)) @skip_if_not_linux def test_wait_ready(waitfn, wait, ready): + waitfn = getattr(waiting, waitfn) + def gen(): r = yield wait return r @@ -61,6 +60,8 @@ def test_wait_ready(waitfn, wait, ready): @pytest.mark.parametrize("waitfn", waitfns) @pytest.mark.parametrize("timeout", timeouts) def test_wait(pgconn, waitfn, timeout): + waitfn = getattr(waiting, waitfn) + pgconn.send_query(b"select 1") gen = generators.execute(pgconn) (res,) = waitfn(gen, pgconn.socket, **timeout) @@ -69,6 +70,8 @@ def test_wait(pgconn, waitfn, timeout): @pytest.mark.parametrize("waitfn", waitfns) def test_wait_bad(pgconn, waitfn): + waitfn = getattr(waiting, waitfn) + pgconn.send_query(b"select 1") gen = generators.execute(pgconn) pgconn.finish() @@ -79,6 +82,8 @@ def test_wait_bad(pgconn, waitfn): @pytest.mark.slow @pytest.mark.parametrize("waitfn", waitfns) def test_wait_large_fd(dsn, waitfn): + waitfn = getattr(waiting, waitfn) + files = [] try: try: