]> git.ipfire.org Git - thirdparty/psycopg.git/commitdiff
perf(c): add C version of select waiting function
authorDaniele Varrazzo <daniele.varrazzo@gmail.com>
Tue, 25 Oct 2022 20:31:11 +0000 (22:31 +0200)
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>
Sun, 11 Dec 2022 20:04:31 +0000 (20:04 +0000)
'Wait' and 'Ready' enums moved into the '_enums' module to avoid a
circular import.

psycopg/psycopg/_enums.py
psycopg/psycopg/waiting.py
psycopg_c/psycopg_c/_psycopg.pyi
psycopg_c/psycopg_c/_psycopg.pyx
psycopg_c/psycopg_c/_psycopg/generators.pyx
psycopg_c/psycopg_c/_psycopg/waiting.pyx [new file with mode: 0644]

index aeb0ee420b63642fd8ce9b28ef391a357822ba43..a7cb78df4c2123c008f81fc13b849dbf47a332ad 100644 (file)
@@ -8,10 +8,23 @@ libpq-defined enums.
 # Copyright (C) 2020 The Psycopg Team
 
 from enum import Enum, IntEnum
+from selectors import EVENT_READ, EVENT_WRITE
 
 from . import pq
 
 
+class Wait(IntEnum):
+    R = EVENT_READ
+    W = EVENT_WRITE
+    RW = EVENT_READ | EVENT_WRITE
+
+
+class Ready(IntEnum):
+    R = EVENT_READ
+    W = EVENT_WRITE
+    RW = EVENT_READ | EVENT_WRITE
+
+
 class PyFormat(str, Enum):
     """
     Enum representing the format wanted for a query argument.
index 3b13296495a85e213ceee241e2a0c9511526d8ff..1d4db1fa5ddc34ada03699749487067caf90408a 100644 (file)
@@ -11,26 +11,14 @@ These functions are designed to consume the generators returned by the
 
 import select
 import selectors
-from enum import IntEnum
 from typing import Dict, Optional
 from asyncio import get_event_loop, wait_for, Event, TimeoutError
-from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE
+from selectors import DefaultSelector
 
 from . import errors as e
 from .abc import PQGen, PQGenConn, RV
-
-
-class Wait(IntEnum):
-    R = EVENT_READ
-    W = EVENT_WRITE
-    RW = EVENT_READ | EVENT_WRITE
-
-
-class Ready(IntEnum):
-    R = EVENT_READ
-    W = EVENT_WRITE
-    RW = EVENT_READ | EVENT_WRITE
-
+from ._enums import Wait as Wait, Ready as Ready  # re-exported
+from ._cmodule import _psycopg
 
 WAIT_R = Wait.R
 WAIT_W = Wait.W
@@ -217,7 +205,7 @@ async def wait_conn_async(gen: PQGenConn[RV], timeout: Optional[float] = None) -
 # Specialised implementation of wait functions.
 
 
-def wait_select(gen: PQGen[RV], fileno: int, timeout: Optional[float] = None) -> RV:
+def _wait_select(gen: PQGen[RV], fileno: int, timeout: Optional[float] = None) -> RV:
     """
     Wait for a generator using select where supported.
     """
@@ -300,6 +288,8 @@ def wait_epoll(gen: PQGen[RV], fileno: int, timeout: Optional[float] = None) ->
         return rv
 
 
+wait_select = _psycopg.wait_select if _psycopg else _wait_select
+
 # Choose the best wait strategy for the platform.
 #
 # the selectors objects have a generic interface but come with some overhead,
index 9bff1ee1ab2be314e056bf2c2e912547d090715a..fd54d82cc8d08a70af4302c267661e547830ea18 100644 (file)
@@ -61,6 +61,9 @@ def fetch(pgconn: PGconn) -> abc.PQGen[Optional[PGresult]]: ...
 def pipeline_communicate(
     pgconn: PGconn, commands: Deque[abc.PipelineCommand]
 ) -> abc.PQGen[List[List[PGresult]]]: ...
+def wait_select(
+    gen: abc.PQGen[abc.RV], fileno: int, timeout: Optional[float] = None
+) -> abc.RV: ...
 
 # Copy support
 def format_row_text(
index e4faa808846fe12be28ab623ce6c4659847671e4..9d2b8baedfa483c90906f31f474714bad241c179 100644 (file)
@@ -39,6 +39,7 @@ include "_psycopg/adapt.pyx"
 include "_psycopg/copy.pyx"
 include "_psycopg/generators.pyx"
 include "_psycopg/transform.pyx"
+include "_psycopg/waiting.pyx"
 
 include "types/array.pyx"
 include "types/datetime.pyx"
index e389ef7a03a2b1c16dfb11f104d76544c171e191..29f6fb40e15d919a530269fb5b4dcc5e48baaaa4 100644 (file)
@@ -11,7 +11,7 @@ from typing import List
 from psycopg import errors as e
 from psycopg.pq import abc, error_message
 from psycopg.abc import PipelineCommand, PQGen
-from psycopg.waiting import Wait, Ready
+from psycopg._enums import Wait, Ready
 from psycopg._compat import Deque
 from psycopg._encodings import conninfo_encoding
 
diff --git a/psycopg_c/psycopg_c/_psycopg/waiting.pyx b/psycopg_c/psycopg_c/_psycopg/waiting.pyx
new file mode 100644 (file)
index 0000000..5eccbd0
--- /dev/null
@@ -0,0 +1,118 @@
+"""
+C implementation of waiting functions
+"""
+
+# Copyright (C) 2022 The Psycopg Team
+
+cdef extern from *:
+    """
+#include <sys/select.h>
+
+#define SELECT_EV_READ 1
+#define SELECT_EV_WRITE 2
+#define SEC_TO_US (1000 * 1000)
+
+static int
+select_impl(int fileno, int wait, float timeout)
+{
+    fd_set ifds;
+    fd_set ofds;
+    fd_set efds;
+    struct timeval tv, *tvptr;
+    int select_rv, rv = 0;
+
+    FD_ZERO(&ifds);
+    FD_ZERO(&ofds);
+    FD_ZERO(&efds);
+
+    if (wait & SELECT_EV_READ) {
+        FD_SET(fileno, &ifds);
+    }
+    if (wait & SELECT_EV_WRITE) {
+        FD_SET(fileno, &ofds);
+    }
+    FD_SET(fileno, &efds);
+
+    /* Compute appropriate timeout interval */
+    if (timeout < 0.0) {
+        tvptr = NULL;
+    }
+    else
+    {
+        tv.tv_sec = (int)timeout;
+        tv.tv_usec = (int)(((long)timeout * SEC_TO_US) % SEC_TO_US);
+        tvptr = &tv;
+    }
+
+    Py_BEGIN_ALLOW_THREADS
+    errno = 0;
+    select_rv = select(fileno + 1, &ifds, &ofds, &efds, tvptr);
+    Py_END_ALLOW_THREADS
+
+    if (select_rv <= 0) {
+        rv = select_rv;
+    }
+    else {
+        if (FD_ISSET(fileno, &ifds)) {
+            rv |= SELECT_EV_READ;
+        }
+        if (FD_ISSET(fileno, &ofds)) {
+            rv |= SELECT_EV_WRITE;
+        }
+    }
+
+    return rv;
+}
+
+static int
+select_raise(int n)
+{
+#ifdef MS_WINDOWS
+    if (n == SOCKET_ERROR) {
+        PyErr_SetExcFromWindowsErr(PyExc_OSError, WSAGetLastError());
+        return -1;
+    }
+#else
+    if (n < 0) {
+        PyErr_SetFromErrno(PyExc_OSError);
+        return -1;
+    }
+#endif
+
+    PyErr_SetString(PyExc_OSError, "unexpected error from select()");
+    return -1;
+}
+    """
+    const int SELECT_EV_READ
+    const int SELECT_EV_WRITE
+    cdef int select_impl(int fileno, int wait, float timeout)
+    cdef int select_raise(int e) except -1
+
+
+def wait_select(gen: PQGen[RV], int fileno, timeout = None) -> RV:
+    """
+    Wait for a generator using select.
+    """
+    cdef float ctimeout
+    cdef int wait, ready
+
+    if timeout is None or timeout < 0:
+        ctimeout = -1.0
+    else:
+        ctimeout = float(timeout)
+
+    try:
+        wait = next(gen)
+
+        while True:
+            ready = select_impl(fileno, wait, ctimeout)
+            if ready == 0:
+                continue
+            elif ready < 0:
+                select_raise(ready)
+
+            wait = gen.send(ready)
+
+    except StopIteration as ex:
+        rv: RV = ex.args[0] if ex.args else None
+        return rv