class Ready(IntEnum):
+ NONE = 0
R = EVENT_READ
W = EVENT_WRITE
RW = EVENT_READ | EVENT_WRITE
RV = TypeVar("RV")
-PQGenConn: TypeAlias = Generator[Tuple[int, "Wait"], "Ready", RV]
+PQGenConn: TypeAlias = Generator[Tuple[int, "Wait"], Union["Ready", int], RV]
"""Generator for processes where the connection file number can change.
This can happen in connection and reset, but not in normal querying.
"""
-PQGen: TypeAlias = Generator["Wait", "Ready", RV]
+PQGen: TypeAlias = Generator["Wait", Union["Ready", int], RV]
"""Generator for processes where the connection file number won't change.
"""
functions in the `waiting` module are the ones who wait more or less
cooperatively for the socket to be ready and make these generators continue.
-All these generators yield pairs (fileno, `Wait`) whenever an operation would
-block. The generator can be restarted sending the appropriate `Ready` state
-when the file descriptor is ready.
-
+These generators yield `Wait` objects whenever an operation would block. These
+generators assume the connection fileno will not change. In case of the
+connection function, where the fileno may change, the generators yield pairs
+(fileno, `Wait`).
+
+The generator can be restarted sending the appropriate `Ready` state when the
+file descriptor is ready. If a None value is sent, it means that the wait
+function timed out without any file descriptor becoming ready; in this case the
+generator should probably yield the same value again in order to wait more.
"""
# Copyright (C) 2020 The Psycopg Team
if f == 0:
break
- ready = yield WAIT_RW
+ while True:
+ ready = yield WAIT_RW
+ if ready:
+ break
+
if ready & READY_R:
# This call may read notifies: they will be saved in the
# PGconn buffer and passed to Python later, in `fetch()`.
Return a result from the database (whether success or error).
"""
if pgconn.is_busy():
- yield WAIT_R
+ while True:
+ ready = yield WAIT_R
+ if ready:
+ break
+
while True:
pgconn.consume_input()
if not pgconn.is_busy():
break
- yield WAIT_R
+ while True:
+ ready = yield WAIT_R
+ if ready:
+ break
_consume_notifies(pgconn)
results = []
while True:
- ready = yield WAIT_RW
+ while True:
+ ready = yield WAIT_RW
+ if ready:
+ break
if ready & READY_R:
pgconn.consume_input()
break
# would block
- yield WAIT_R
+ while True:
+ ready = yield WAIT_R
+ if ready:
+ break
pgconn.consume_input()
if nbytes > 0:
# into smaller ones. We prefer to do it there instead of here in order to
# do it upstream the queue decoupling the writer task from the producer one.
while pgconn.put_copy_data(buffer) == 0:
- yield WAIT_W
+ while True:
+ ready = yield WAIT_W
+ if ready:
+ break
def copy_end(pgconn: PGconn, error: Optional[bytes]) -> PQGen[PGresult]:
# Retry enqueuing end copy message until successful
while pgconn.put_copy_end(error) == 0:
- yield WAIT_W
+ while True:
+ ready = yield WAIT_W
+ if ready:
+ break
# Repeat until it the message is flushed to the server
while True:
- yield WAIT_W
+ while True:
+ ready = yield WAIT_W
+ if ready:
+ break
f = pgconn.flush()
if f == 0:
break
WAIT_R = Wait.R
WAIT_W = Wait.W
WAIT_RW = Wait.RW
+READY_NONE = Ready.NONE
READY_R = Ready.R
READY_W = Ready.W
READY_RW = Ready.RW
try:
s = next(gen)
with DefaultSelector() as sel:
+ sel.register(fileno, s)
while True:
- sel.register(fileno, s)
- rlist = None
- while not rlist:
- rlist = sel.select(timeout=timeout)
+ rlist = sel.select(timeout=timeout)
+ if not rlist:
+ gen.send(READY_NONE)
+ continue
+
sel.unregister(fileno)
- # note: this line should require a cast, but mypy doesn't complain
- ready: Ready = rlist[0][1]
- assert s & ready
+ ready = rlist[0][1]
s = gen.send(ready)
+ sel.register(fileno, s)
except StopIteration as ex:
rv: RV = ex.args[0] if ex.args else None
sel.unregister(fileno)
if not rlist:
raise e.ConnectionTimeout("connection timeout expired")
- ready: Ready = rlist[0][1] # type: ignore[assignment]
+ ready = rlist[0][1]
fileno, s = gen.send(ready)
except StopIteration as ex:
# Not sure this is the best implementation but it's a start.
ev = Event()
loop = get_event_loop()
- ready: Ready
+ ready: int
s: Wait
def wakeup(state: Ready) -> None:
nonlocal ready
- ready |= state # type: ignore[assignment]
+ ready |= state
ev.set()
try:
if not reader and not writer:
raise e.InternalError(f"bad poll status: {s}")
ev.clear()
- ready = 0 # type: ignore[assignment]
+ ready = 0
if reader:
loop.add_reader(fileno, wakeup, READY_R)
if writer:
loop.add_writer(fileno, wakeup, READY_W)
try:
- if timeout is None:
- await ev.wait()
- else:
+ if timeout:
try:
await wait_for(ev.wait(), timeout)
except TimeoutError:
pass
+ else:
+ await ev.wait()
finally:
if reader:
loop.remove_reader(fileno)
loop.remove_writer(fileno)
s = gen.send(ready)
+ except OSError as ex:
+ # Assume the connection was closed
+ raise e.OperationalError(str(ex))
except StopIteration as ex:
rv: RV = ex.args[0] if ex.args else None
return rv
if wl:
ready |= READY_W
if not ready:
+ gen.send(READY_NONE)
continue
- # assert s & ready
- s = gen.send(ready) # type: ignore
+
+ s = gen.send(ready)
except StopIteration as ex:
rv: RV = ex.args[0] if ex.args else None
s = next(gen)
if timeout is None or timeout < 0:
- timeout = 0
- else:
- timeout = int(timeout * 1000.0)
+ timeout = 0.0
with select.epoll() as epoll:
evmask = _epoll_evmasks[s]
epoll.register(fileno, evmask)
while True:
- fileevs = None
- while not fileevs:
- fileevs = epoll.poll(timeout)
+ fileevs = epoll.poll(timeout)
+ if not fileevs:
+ gen.send(READY_NONE)
+ continue
ev = fileevs[0][1]
ready = 0
if ev & ~select.EPOLLOUT:
ready = READY_R
if ev & ~select.EPOLLIN:
ready |= READY_W
- # assert s & ready
s = gen.send(ready)
evmask = _epoll_evmasks[s]
epoll.modify(fileno, evmask)
evmask = _poll_evmasks[s]
poll.register(fileno, evmask)
while True:
- fileevs = None
- while not fileevs:
- fileevs = poll.poll(timeout)
+ fileevs = poll.poll(timeout)
+ if not fileevs:
+ gen.send(READY_NONE)
+ continue
+
ev = fileevs[0][1]
ready = 0
if ev & ~select.POLLOUT:
ready = READY_R
if ev & ~select.POLLIN:
ready |= READY_W
- # assert s & ready
s = gen.send(ready)
evmask = _poll_evmasks[s]
poll.modify(fileno, evmask)
cdef object WAIT_W = Wait.W
cdef object WAIT_R = Wait.R
cdef object WAIT_RW = Wait.RW
+cdef object PY_READY_NONE = Ready.NONE
cdef object PY_READY_R = Ready.R
cdef object PY_READY_W = Ready.W
cdef object PY_READY_RW = Ready.RW
+cdef int READY_NONE = Ready.NONE
cdef int READY_R = Ready.R
cdef int READY_W = Ready.W
cdef int READY_RW = Ready.RW
to retrieve the results available.
"""
cdef libpq.PGconn *pgconn_ptr = pgconn._pgconn_ptr
- cdef int status
+ cdef int ready
cdef int cires
while True:
if pgconn.flush() == 0:
break
- status = yield WAIT_RW
- if status & READY_R:
+ while True:
+ ready = yield WAIT_RW
+ if ready:
+ break
+
+ if ready & READY_R:
with nogil:
# This call may read notifies which will be saved in the
# PGconn buffer and passed to Python later.
cdef libpq.PGconn *pgconn_ptr = pgconn._pgconn_ptr
cdef int cires, ibres
cdef libpq.PGresult *pgres
+ cdef object ready
with nogil:
ibres = libpq.PQisBusy(pgconn_ptr)
if ibres:
- yield WAIT_R
+ while True:
+ ready = yield WAIT_R
+ if ready:
+ break
+
while True:
with nogil:
cires = libpq.PQconsumeInput(pgconn_ptr)
f"consuming input failed: {error_message(pgconn)}")
if not ibres:
break
- yield WAIT_R
+ while True:
+ ready = yield WAIT_R
+ if ready:
+ break
_consume_notifies(pgconn)
cdef pq.PGresult r
while True:
- ready = yield WAIT_RW
+ while True:
+ ready = yield WAIT_RW
+ if ready:
+ break
if ready & READY_R:
with nogil:
wait_c_impl(int fileno, int wait, float timeout)
{
int select_rv;
- int rv = 0;
+ int rv = -1;
#if defined(HAVE_POLL) && !defined(HAVE_BROKEN_POLL)
goto retry_eintr;
}
- if (select_rv < 0) { goto error; }
if (PyErr_CheckSignals()) { goto finally; }
+ if (select_rv < 0) { goto finally; } /* poll error */
- if (input_fd.events & POLLIN) { rv |= SELECT_EV_READ; }
- if (input_fd.events & POLLOUT) { rv |= SELECT_EV_WRITE; }
+ rv = 0; /* success, maybe with timeout */
+ if (select_rv >= 0) {
+ if (input_fd.events & POLLIN) { rv |= SELECT_EV_READ; }
+ if (input_fd.events & POLLOUT) { rv |= SELECT_EV_WRITE; }
+ }
#else
goto retry_eintr;
}
- if (select_rv < 0) { goto error; }
if (PyErr_CheckSignals()) { goto finally; }
+ if (select_rv < 0) { goto error; } /* select error */
- if (FD_ISSET(fileno, &ifds)) { rv |= SELECT_EV_READ; }
- if (FD_ISSET(fileno, &ofds)) { rv |= SELECT_EV_WRITE; }
+ rv = 0;
+ if (select_rv > 0) {
+ if (FD_ISSET(fileno, &ifds)) { rv |= SELECT_EV_READ; }
+ if (FD_ISSET(fileno, &ofds)) { rv |= SELECT_EV_WRITE; }
+ }
#endif /* HAVE_POLL */
error:
+ rv = -1;
+
#ifdef MS_WINDOWS
if (select_rv == SOCKET_ERROR) {
PyErr_SetExcFromWindowsErr(PyExc_OSError, WSAGetLastError());
finally:
- return -1;
+ return rv;
}
"""
while True:
ready = wait_c_impl(fileno, wait, ctimeout)
- if ready == 0:
- continue
+ if ready == READY_NONE:
+ pyready = <PyObject *>PY_READY_NONE
elif ready == READY_R:
pyready = <PyObject *>PY_READY_R
elif ready == READY_RW:
+import sys
+import time
import select # noqa: used in pytest.mark.skipif
import socket
-import sys
import pytest
pytest.param("wait_c", marks=pytest.mark.skipif("not psycopg._cmodule._psycopg")),
]
+events = ["R", "W", "RW"]
timeouts = [pytest.param({}, id="blank")]
timeouts += [pytest.param({"timeout": x}, id=str(x)) for x in [None, 0, 0.2, 10]]
@pytest.mark.parametrize("waitfn", waitfns)
-@pytest.mark.parametrize("wait, ready", zip(waiting.Wait, waiting.Ready))
+@pytest.mark.parametrize("event", events)
@skip_if_not_linux
-def test_wait_ready(waitfn, wait, ready):
+def test_wait_ready(waitfn, event):
+ wait = getattr(waiting.Wait, event)
+ ready = getattr(waiting.Ready, event)
waitfn = getattr(waiting, waitfn)
def gen():
waitfn(gen, pgconn.socket)
+@pytest.mark.slow
+@pytest.mark.timing
+@pytest.mark.parametrize("waitfn", waitfns)
+def test_wait_timeout(pgconn, waitfn):
+ waitfn = getattr(waiting, waitfn)
+
+ pgconn.send_query(b"select pg_sleep(0.5)")
+ gen = generators.execute(pgconn)
+
+ ts = [time.time()]
+
+ def gen_wrapper():
+ try:
+ for x in gen:
+ res = yield x
+ ts.append(time.time())
+ gen.send(res)
+ except StopIteration as ex:
+ return ex.value
+
+ (res,) = waitfn(gen_wrapper(), pgconn.socket, timeout=0.1)
+ assert res.status == ExecStatus.TUPLES_OK
+ ds = [t1 - t0 for t0, t1 in zip(ts[:-1], ts[1:])]
+ assert len(ds) >= 5
+ for d in ds[:5]:
+ assert d == pytest.approx(0.1, 0.05)
+
+
@pytest.mark.slow
@pytest.mark.skipif(
"sys.platform == 'win32'", reason="win32 works ok, but FDs are mysterious"
@pytest.mark.anyio
-@pytest.mark.parametrize("wait, ready", zip(waiting.Wait, waiting.Ready))
+@pytest.mark.parametrize("event", events)
@skip_if_not_linux
-async def test_wait_ready_async(wait, ready):
+async def test_wait_ready_async(event):
+ wait = getattr(waiting.Wait, event)
+ ready = getattr(waiting.Ready, event)
+
def gen():
r = yield wait
return r