# Copyright (C) 2020 The Psycopg Team
from enum import Enum, IntEnum
+from selectors import EVENT_READ, EVENT_WRITE
from . import pq
+class Wait(IntEnum):
+ R = EVENT_READ
+ W = EVENT_WRITE
+ RW = EVENT_READ | EVENT_WRITE
+
+
+class Ready(IntEnum):
+ R = EVENT_READ
+ W = EVENT_WRITE
+ RW = EVENT_READ | EVENT_WRITE
+
+
class PyFormat(str, Enum):
"""
Enum representing the format wanted for a query argument.
import select
import selectors
-from enum import IntEnum
from typing import Dict, Optional
from asyncio import get_event_loop, wait_for, Event, TimeoutError
-from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE
+from selectors import DefaultSelector
from . import errors as e
from .abc import PQGen, PQGenConn, RV
-
-
-class Wait(IntEnum):
- R = EVENT_READ
- W = EVENT_WRITE
- RW = EVENT_READ | EVENT_WRITE
-
-
-class Ready(IntEnum):
- R = EVENT_READ
- W = EVENT_WRITE
- RW = EVENT_READ | EVENT_WRITE
-
+from ._enums import Wait as Wait, Ready as Ready # re-exported
+from ._cmodule import _psycopg
WAIT_R = Wait.R
WAIT_W = Wait.W
# Specialised implementation of wait functions.
-def wait_select(gen: PQGen[RV], fileno: int, timeout: Optional[float] = None) -> RV:
+def _wait_select(gen: PQGen[RV], fileno: int, timeout: Optional[float] = None) -> RV:
"""
Wait for a generator using select where supported.
"""
return rv
+wait_select = _psycopg.wait_select if _psycopg else _wait_select
+
# Choose the best wait strategy for the platform.
#
# the selectors objects have a generic interface but come with some overhead,
def pipeline_communicate(
pgconn: PGconn, commands: Deque[abc.PipelineCommand]
) -> abc.PQGen[List[List[PGresult]]]: ...
+def wait_select(
+ gen: abc.PQGen[abc.RV], fileno: int, timeout: Optional[float] = None
+) -> abc.RV: ...
# Copy support
def format_row_text(
include "_psycopg/copy.pyx"
include "_psycopg/generators.pyx"
include "_psycopg/transform.pyx"
+include "_psycopg/waiting.pyx"
include "types/array.pyx"
include "types/datetime.pyx"
from psycopg import errors as e
from psycopg.pq import abc, error_message
from psycopg.abc import PipelineCommand, PQGen
-from psycopg.waiting import Wait, Ready
+from psycopg._enums import Wait, Ready
from psycopg._compat import Deque
from psycopg._encodings import conninfo_encoding
--- /dev/null
+"""
+C implementation of waiting functions
+"""
+
+# Copyright (C) 2022 The Psycopg Team
+
+cdef extern from *:
+ """
+#include <sys/select.h>
+
+#define SELECT_EV_READ 1
+#define SELECT_EV_WRITE 2
+#define SEC_TO_US (1000 * 1000)
+
+static int
+select_impl(int fileno, int wait, float timeout)
+{
+ fd_set ifds;
+ fd_set ofds;
+ fd_set efds;
+ struct timeval tv, *tvptr;
+ int select_rv, rv = 0;
+
+ FD_ZERO(&ifds);
+ FD_ZERO(&ofds);
+ FD_ZERO(&efds);
+
+ if (wait & SELECT_EV_READ) {
+ FD_SET(fileno, &ifds);
+ }
+ if (wait & SELECT_EV_WRITE) {
+ FD_SET(fileno, &ofds);
+ }
+ FD_SET(fileno, &efds);
+
+ /* Compute appropriate timeout interval */
+ if (timeout < 0.0) {
+ tvptr = NULL;
+ }
+ else
+ {
+ tv.tv_sec = (int)timeout;
+ tv.tv_usec = (int)(((long)timeout * SEC_TO_US) % SEC_TO_US);
+ tvptr = &tv;
+ }
+
+ Py_BEGIN_ALLOW_THREADS
+ errno = 0;
+ select_rv = select(fileno + 1, &ifds, &ofds, &efds, tvptr);
+ Py_END_ALLOW_THREADS
+
+ if (select_rv <= 0) {
+ rv = select_rv;
+ }
+ else {
+ if (FD_ISSET(fileno, &ifds)) {
+ rv |= SELECT_EV_READ;
+ }
+ if (FD_ISSET(fileno, &ofds)) {
+ rv |= SELECT_EV_WRITE;
+ }
+ }
+
+ return rv;
+}
+
+static int
+select_raise(int n)
+{
+#ifdef MS_WINDOWS
+ if (n == SOCKET_ERROR) {
+ PyErr_SetExcFromWindowsErr(PyExc_OSError, WSAGetLastError());
+ return -1;
+ }
+#else
+ if (n < 0) {
+ PyErr_SetFromErrno(PyExc_OSError);
+ return -1;
+ }
+#endif
+
+ PyErr_SetString(PyExc_OSError, "unexpected error from select()");
+ return -1;
+}
+ """
+ const int SELECT_EV_READ
+ const int SELECT_EV_WRITE
+ cdef int select_impl(int fileno, int wait, float timeout)
+ cdef int select_raise(int e) except -1
+
+
+def wait_select(gen: PQGen[RV], int fileno, timeout = None) -> RV:
+ """
+ Wait for a generator using select.
+ """
+ cdef float ctimeout
+ cdef int wait, ready
+
+ if timeout is None or timeout < 0:
+ ctimeout = -1.0
+ else:
+ ctimeout = float(timeout)
+
+ try:
+ wait = next(gen)
+
+ while True:
+ ready = select_impl(fileno, wait, ctimeout)
+ if ready == 0:
+ continue
+ elif ready < 0:
+ select_raise(ready)
+
+ wait = gen.send(ready)
+
+ except StopIteration as ex:
+ rv: RV = ex.args[0] if ex.args else None
+ return rv