]> git.ipfire.org Git - thirdparty/psycopg.git/commitdiff
perf(c): use poll() instead of select() for waiting, where available
authorDaniele Varrazzo <daniele.varrazzo@gmail.com>
Sat, 29 Oct 2022 23:37:01 +0000 (01:37 +0200)
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>
Sun, 11 Dec 2022 20:04:31 +0000 (20:04 +0000)
psycopg/psycopg/waiting.py
psycopg_c/psycopg_c/_psycopg.pyi
psycopg_c/psycopg_c/_psycopg/waiting.pyx
tests/test_waiting.py

index 1d4db1fa5ddc34ada03699749487067caf90408a..36f80a1c0a55ddae867c7e0ccd4fe6afa690ad31 100644 (file)
@@ -205,7 +205,7 @@ async def wait_conn_async(gen: PQGenConn[RV], timeout: Optional[float] = None) -
 # Specialised implementation of wait functions.
 
 
-def _wait_select(gen: PQGen[RV], fileno: int, timeout: Optional[float] = None) -> RV:
+def wait_select(gen: PQGen[RV], fileno: int, timeout: Optional[float] = None) -> RV:
     """
     Wait for a generator using select where supported.
     """
@@ -288,14 +288,19 @@ def wait_epoll(gen: PQGen[RV], fileno: int, timeout: Optional[float] = None) ->
         return rv
 
 
-wait_select = _psycopg.wait_select if _psycopg else _wait_select
+if _psycopg:
+    wait_c = _psycopg.wait_c
+
 
 # Choose the best wait strategy for the platform.
 #
 # the selectors objects have a generic interface but come with some overhead,
 # so we also offer more finely tuned implementations.
 
-if selectors.DefaultSelector is getattr(selectors, "SelectSelector", None):
+if _psycopg:
+    wait = wait_c
+
+elif selectors.DefaultSelector is getattr(selectors, "SelectSelector", None):
     # On Windows, SelectSelector should be the default.
     wait = wait_select
 
index fd54d82cc8d08a70af4302c267661e547830ea18..bd7c63d91e4b7412b4a250bda55fec0d273eff25 100644 (file)
@@ -61,7 +61,7 @@ def fetch(pgconn: PGconn) -> abc.PQGen[Optional[PGresult]]: ...
 def pipeline_communicate(
     pgconn: PGconn, commands: Deque[abc.PipelineCommand]
 ) -> abc.PQGen[List[List[PGresult]]]: ...
-def wait_select(
+def wait_c(
     gen: abc.PQGen[abc.RV], fileno: int, timeout: Optional[float] = None
 ) -> abc.RV: ...
 
index 3dc068b9085de3573f7368bc8377f9cc94e3db6b..bdecfda72855ac6fa02ecc0596299e25fe3ce1bb 100644 (file)
@@ -6,14 +6,28 @@ C implementation of waiting functions
 
 cdef extern from *:
     """
+#if defined(HAVE_POLL) && !defined(HAVE_BROKEN_POLL)
+
+#if defined(HAVE_POLL_H)
+#include <poll.h>
+#elif defined(HAVE_SYS_POLL_H)
+#include <sys/poll.h>
+#endif
+
+#else  /* no poll available */
+
 #ifdef MS_WINDOWS
 #include <winsock2.h>
 #else
 #include <sys/select.h>
 #endif
 
+#endif  /* HAVE_POLL */
+
 #define SELECT_EV_READ 1
 #define SELECT_EV_WRITE 2
+
+#define SEC_TO_MS 1000
 #define SEC_TO_US (1000 * 1000)
 
 /* Use select to wait for readiness on fileno.
@@ -21,17 +35,58 @@ cdef extern from *:
  * - Return SELECT_EV_* if the file is ready
  * - Return 0 on timeout
  * - Return -1 (and set an exception) on error.
+ *
+ * The wisdom of this function comes from:
+ *
+ * - PostgreSQL libpq (see src/interfaces/libpq/fe-misc.c)
+ * - Python select module (see Modules/selectmodule.c)
  */
 static int
-select_impl(int fileno, int wait, float timeout)
+wait_c_impl(int fileno, int wait, float timeout)
 {
+    int select_rv;
+    int rv = 0;
+
+#if defined(HAVE_POLL) && !defined(HAVE_BROKEN_POLL)
+
+    struct pollfd input_fd;
+    int timeout_ms;
+
+    input_fd.fd = fileno;
+    input_fd.events = POLLERR;
+    input_fd.revents = 0;
+
+    if (wait & SELECT_EV_READ) { input_fd.events |= POLLIN; }
+    if (wait & SELECT_EV_WRITE) { input_fd.events |= POLLOUT; }
+
+    if (timeout < 0.0) {
+        timeout_ms = -1;
+    } else {
+        timeout_ms = (int)(timeout * SEC_TO_MS);
+    }
+
+    Py_BEGIN_ALLOW_THREADS
+    errno = 0;
+    select_rv = poll(&input_fd, 1, timeout_ms);
+    Py_END_ALLOW_THREADS
+
+    if (PyErr_CheckSignals()) { goto finally; }
+
+    if (select_rv < 0) {
+        goto error;
+    }
+
+    if (input_fd.events & POLLIN) { rv |= SELECT_EV_READ; }
+    if (input_fd.events & POLLOUT) { rv |= SELECT_EV_WRITE; }
+
+#else
+
     fd_set ifds;
     fd_set ofds;
     fd_set efds;
     struct timeval tv, *tvptr;
-    int select_rv;
 
-#ifdef MS_WINDOWS
+#ifndef MS_WINDOWS
     if (fileno >= 1024) {
         PyErr_SetString(
             PyExc_ValueError,  /* same exception of Python's 'select.select()' */
@@ -44,20 +99,15 @@ select_impl(int fileno, int wait, float timeout)
     FD_ZERO(&ofds);
     FD_ZERO(&efds);
 
-    if (wait & SELECT_EV_READ) {
-        FD_SET(fileno, &ifds);
-    }
-    if (wait & SELECT_EV_WRITE) {
-        FD_SET(fileno, &ofds);
-    }
+    if (wait & SELECT_EV_READ) { FD_SET(fileno, &ifds); }
+    if (wait & SELECT_EV_WRITE) { FD_SET(fileno, &ofds); }
     FD_SET(fileno, &efds);
 
     /* Compute appropriate timeout interval */
     if (timeout < 0.0) {
         tvptr = NULL;
     }
-    else
-    {
+    else {
         tv.tv_sec = (int)timeout;
         tv.tv_usec = (int)(((long)timeout * SEC_TO_US) % SEC_TO_US);
         tvptr = &tv;
@@ -68,49 +118,48 @@ select_impl(int fileno, int wait, float timeout)
     select_rv = select(fileno + 1, &ifds, &ofds, &efds, tvptr);
     Py_END_ALLOW_THREADS
 
-    if (PyErr_CheckSignals()) {
-        return -1;
-    }
+    if (PyErr_CheckSignals()) { goto finally; }
 
     if (select_rv < 0) {
+        goto error;
+    }
+
+    if (FD_ISSET(fileno, &ifds)) { rv |= SELECT_EV_READ; }
+    if (FD_ISSET(fileno, &ofds)) { rv |= SELECT_EV_WRITE; }
+
+#endif  /* HAVE_POLL */
+
+    return rv;
+
+error:
 
 #ifdef MS_WINDOWS
-        if (select_rv == SOCKET_ERROR) {
-            PyErr_SetExcFromWindowsErr(PyExc_OSError, WSAGetLastError());
-        }
+    if (select_rv == SOCKET_ERROR) {
+        PyErr_SetExcFromWindowsErr(PyExc_OSError, WSAGetLastError());
+    }
 #else
-        if (select_rv < 0) {
-            PyErr_SetFromErrno(PyExc_OSError);
-        }
-#endif
-        else {
-            PyErr_SetString(PyExc_OSError, "unexpected error from select()");
-        }
-        return -1;
+    if (select_rv < 0) {
+        PyErr_SetFromErrno(PyExc_OSError);
     }
+#endif
     else {
-        int rv = 0;
-
-        if (select_rv >= 0) {
-            if (FD_ISSET(fileno, &ifds)) {
-                rv = SELECT_EV_READ;
-            }
-            if (FD_ISSET(fileno, &ofds)) {
-                rv |= SELECT_EV_WRITE;
-            }
-        }
-        return rv;
+        PyErr_SetString(PyExc_OSError, "unexpected error from select()");
     }
+
+finally:
+
+    return -1;
+
 }
     """
     const int SELECT_EV_READ
     const int SELECT_EV_WRITE
-    cdef int select_impl(int fileno, int wait, float timeout) except -1
+    cdef int wait_c_impl(int fileno, int wait, float timeout) except -1
 
 
-def wait_select(gen: PQGen[RV], int fileno, timeout = None) -> RV:
+def wait_c(gen: PQGen[RV], int fileno, timeout = None) -> RV:
     """
-    Wait for a generator using select.
+    Wait for a generator using poll or select.
     """
     cdef float ctimeout
     cdef int wait, ready
@@ -124,7 +173,7 @@ def wait_select(gen: PQGen[RV], int fileno, timeout = None) -> RV:
         wait = next(gen)
 
         while True:
-            ready = select_impl(fileno, wait, ctimeout)
+            ready = wait_c_impl(fileno, wait, ctimeout)
             if ready == 0:
                 continue
 
index 72c258d7825f4ce78ae197c0f8288b8b7ebcbd80..e187af13acf446626dd06ffc7201d4dd6e4f3f15 100644 (file)
@@ -14,18 +14,15 @@ skip_if_not_linux = pytest.mark.skipif(
 )
 
 waitfns = [
-    pytest.param(waiting.wait, id="wait"),
-    pytest.param(waiting.wait_selector, id="wait_selector"),
+    "wait",
+    "wait_selector",
     pytest.param(
-        waiting.wait_select,
-        id="wait_select",
-        marks=pytest.mark.skipif("not hasattr(select, 'select')"),
+        "wait_select", marks=pytest.mark.skipif("not hasattr(select, 'select')")
     ),
     pytest.param(
-        waiting.wait_epoll,
-        id="wait_epoll",
-        marks=pytest.mark.skipif("not hasattr(select, 'epoll')"),
+        "wait_epoll", marks=pytest.mark.skipif("not hasattr(select, 'epoll')")
     ),
+    pytest.param("wait_c", marks=pytest.mark.skipif("not psycopg._cmodule._psycopg")),
 ]
 
 timeouts = [pytest.param({}, id="blank")]
@@ -49,6 +46,8 @@ def test_wait_conn_bad(dsn):
 @pytest.mark.parametrize("wait, ready", zip(waiting.Wait, waiting.Ready))
 @skip_if_not_linux
 def test_wait_ready(waitfn, wait, ready):
+    waitfn = getattr(waiting, waitfn)
+
     def gen():
         r = yield wait
         return r
@@ -61,6 +60,8 @@ def test_wait_ready(waitfn, wait, ready):
 @pytest.mark.parametrize("waitfn", waitfns)
 @pytest.mark.parametrize("timeout", timeouts)
 def test_wait(pgconn, waitfn, timeout):
+    waitfn = getattr(waiting, waitfn)
+
     pgconn.send_query(b"select 1")
     gen = generators.execute(pgconn)
     (res,) = waitfn(gen, pgconn.socket, **timeout)
@@ -69,6 +70,8 @@ def test_wait(pgconn, waitfn, timeout):
 
 @pytest.mark.parametrize("waitfn", waitfns)
 def test_wait_bad(pgconn, waitfn):
+    waitfn = getattr(waiting, waitfn)
+
     pgconn.send_query(b"select 1")
     gen = generators.execute(pgconn)
     pgconn.finish()
@@ -79,6 +82,8 @@ def test_wait_bad(pgconn, waitfn):
 @pytest.mark.slow
 @pytest.mark.parametrize("waitfn", waitfns)
 def test_wait_large_fd(dsn, waitfn):
+    waitfn = getattr(waiting, waitfn)
+
     files = []
     try:
         try: