# Specialised implementation of wait functions.
-def _wait_select(gen: PQGen[RV], fileno: int, timeout: Optional[float] = None) -> RV:
+def wait_select(gen: PQGen[RV], fileno: int, timeout: Optional[float] = None) -> RV:
"""
Wait for a generator using select where supported.
"""
return rv
-wait_select = _psycopg.wait_select if _psycopg else _wait_select
+if _psycopg:
+ wait_c = _psycopg.wait_c
+
# Choose the best wait strategy for the platform.
#
# the selectors objects have a generic interface but come with some overhead,
# so we also offer more finely tuned implementations.
-if selectors.DefaultSelector is getattr(selectors, "SelectSelector", None):
+if _psycopg:
+ wait = wait_c
+
+elif selectors.DefaultSelector is getattr(selectors, "SelectSelector", None):
# On Windows, SelectSelector should be the default.
wait = wait_select
cdef extern from *:
"""
+#if defined(HAVE_POLL) && !defined(HAVE_BROKEN_POLL)
+
+#if defined(HAVE_POLL_H)
+#include <poll.h>
+#elif defined(HAVE_SYS_POLL_H)
+#include <sys/poll.h>
+#endif
+
+#else /* no poll available */
+
#ifdef MS_WINDOWS
#include <winsock2.h>
#else
#include <sys/select.h>
#endif
+#endif /* HAVE_POLL */
+
#define SELECT_EV_READ 1
#define SELECT_EV_WRITE 2
+
+#define SEC_TO_MS 1000
#define SEC_TO_US (1000 * 1000)
/* Use select to wait for readiness on fileno.
* - Return SELECT_EV_* if the file is ready
* - Return 0 on timeout
* - Return -1 (and set an exception) on error.
+ *
+ * The wisdom of this function comes from:
+ *
+ * - PostgreSQL libpq (see src/interfaces/libpq/fe-misc.c)
+ * - Python select module (see Modules/selectmodule.c)
*/
static int
-select_impl(int fileno, int wait, float timeout)
+wait_c_impl(int fileno, int wait, float timeout)
{
+ int select_rv;
+ int rv = 0;
+
+#if defined(HAVE_POLL) && !defined(HAVE_BROKEN_POLL)
+
+ struct pollfd input_fd;
+ int timeout_ms;
+
+ input_fd.fd = fileno;
+ input_fd.events = POLLERR;
+ input_fd.revents = 0;
+
+ if (wait & SELECT_EV_READ) { input_fd.events |= POLLIN; }
+ if (wait & SELECT_EV_WRITE) { input_fd.events |= POLLOUT; }
+
+ if (timeout < 0.0) {
+ timeout_ms = -1;
+ } else {
+ timeout_ms = (int)(timeout * SEC_TO_MS);
+ }
+
+ Py_BEGIN_ALLOW_THREADS
+ errno = 0;
+ select_rv = poll(&input_fd, 1, timeout_ms);
+ Py_END_ALLOW_THREADS
+
+ if (PyErr_CheckSignals()) { goto finally; }
+
+ if (select_rv < 0) {
+ goto error;
+ }
+
+ if (input_fd.events & POLLIN) { rv |= SELECT_EV_READ; }
+ if (input_fd.events & POLLOUT) { rv |= SELECT_EV_WRITE; }
+
+#else
+
fd_set ifds;
fd_set ofds;
fd_set efds;
struct timeval tv, *tvptr;
- int select_rv;
-#ifdef MS_WINDOWS
+#ifndef MS_WINDOWS
if (fileno >= 1024) {
PyErr_SetString(
PyExc_ValueError, /* same exception of Python's 'select.select()' */
FD_ZERO(&ofds);
FD_ZERO(&efds);
- if (wait & SELECT_EV_READ) {
- FD_SET(fileno, &ifds);
- }
- if (wait & SELECT_EV_WRITE) {
- FD_SET(fileno, &ofds);
- }
+ if (wait & SELECT_EV_READ) { FD_SET(fileno, &ifds); }
+ if (wait & SELECT_EV_WRITE) { FD_SET(fileno, &ofds); }
FD_SET(fileno, &efds);
/* Compute appropriate timeout interval */
if (timeout < 0.0) {
tvptr = NULL;
}
- else
- {
+ else {
tv.tv_sec = (int)timeout;
tv.tv_usec = (int)(((long)timeout * SEC_TO_US) % SEC_TO_US);
tvptr = &tv;
select_rv = select(fileno + 1, &ifds, &ofds, &efds, tvptr);
Py_END_ALLOW_THREADS
- if (PyErr_CheckSignals()) {
- return -1;
- }
+ if (PyErr_CheckSignals()) { goto finally; }
if (select_rv < 0) {
+ goto error;
+ }
+
+ if (FD_ISSET(fileno, &ifds)) { rv |= SELECT_EV_READ; }
+ if (FD_ISSET(fileno, &ofds)) { rv |= SELECT_EV_WRITE; }
+
+#endif /* HAVE_POLL */
+
+ return rv;
+
+error:
#ifdef MS_WINDOWS
- if (select_rv == SOCKET_ERROR) {
- PyErr_SetExcFromWindowsErr(PyExc_OSError, WSAGetLastError());
- }
+ if (select_rv == SOCKET_ERROR) {
+ PyErr_SetExcFromWindowsErr(PyExc_OSError, WSAGetLastError());
+ }
#else
- if (select_rv < 0) {
- PyErr_SetFromErrno(PyExc_OSError);
- }
-#endif
- else {
- PyErr_SetString(PyExc_OSError, "unexpected error from select()");
- }
- return -1;
+ if (select_rv < 0) {
+ PyErr_SetFromErrno(PyExc_OSError);
}
+#endif
else {
- int rv = 0;
-
- if (select_rv >= 0) {
- if (FD_ISSET(fileno, &ifds)) {
- rv = SELECT_EV_READ;
- }
- if (FD_ISSET(fileno, &ofds)) {
- rv |= SELECT_EV_WRITE;
- }
- }
- return rv;
+ PyErr_SetString(PyExc_OSError, "unexpected error from select()");
}
+
+finally:
+
+ return -1;
+
}
"""
const int SELECT_EV_READ
const int SELECT_EV_WRITE
- cdef int select_impl(int fileno, int wait, float timeout) except -1
+ cdef int wait_c_impl(int fileno, int wait, float timeout) except -1
-def wait_select(gen: PQGen[RV], int fileno, timeout = None) -> RV:
+def wait_c(gen: PQGen[RV], int fileno, timeout = None) -> RV:
"""
- Wait for a generator using select.
+ Wait for a generator using poll or select.
"""
cdef float ctimeout
cdef int wait, ready
wait = next(gen)
while True:
- ready = select_impl(fileno, wait, ctimeout)
+ ready = wait_c_impl(fileno, wait, ctimeout)
if ready == 0:
continue
)
waitfns = [
- pytest.param(waiting.wait, id="wait"),
- pytest.param(waiting.wait_selector, id="wait_selector"),
+ "wait",
+ "wait_selector",
pytest.param(
- waiting.wait_select,
- id="wait_select",
- marks=pytest.mark.skipif("not hasattr(select, 'select')"),
+ "wait_select", marks=pytest.mark.skipif("not hasattr(select, 'select')")
),
pytest.param(
- waiting.wait_epoll,
- id="wait_epoll",
- marks=pytest.mark.skipif("not hasattr(select, 'epoll')"),
+ "wait_epoll", marks=pytest.mark.skipif("not hasattr(select, 'epoll')")
),
+ pytest.param("wait_c", marks=pytest.mark.skipif("not psycopg._cmodule._psycopg")),
]
timeouts = [pytest.param({}, id="blank")]
@pytest.mark.parametrize("wait, ready", zip(waiting.Wait, waiting.Ready))
@skip_if_not_linux
def test_wait_ready(waitfn, wait, ready):
+ waitfn = getattr(waiting, waitfn)
+
def gen():
r = yield wait
return r
@pytest.mark.parametrize("waitfn", waitfns)
@pytest.mark.parametrize("timeout", timeouts)
def test_wait(pgconn, waitfn, timeout):
+ waitfn = getattr(waiting, waitfn)
+
pgconn.send_query(b"select 1")
gen = generators.execute(pgconn)
(res,) = waitfn(gen, pgconn.socket, **timeout)
@pytest.mark.parametrize("waitfn", waitfns)
def test_wait_bad(pgconn, waitfn):
+ waitfn = getattr(waiting, waitfn)
+
pgconn.send_query(b"select 1")
gen = generators.execute(pgconn)
pgconn.finish()
@pytest.mark.slow
@pytest.mark.parametrize("waitfn", waitfns)
def test_wait_large_fd(dsn, waitfn):
+ waitfn = getattr(waiting, waitfn)
+
files = []
try:
try: