From: Daniele Varrazzo Date: Sat, 11 Oct 2025 01:26:40 +0000 (+0200) Subject: fix: consider a connection closed in poll() only if it's not ready too X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=refs%2Fpull%2F1183%2Fhead;p=thirdparty%2Fpsycopg.git fix: consider a connection closed in poll() only if it's not ready too When closing the proxy and trying to communicate, poll returns state 25 on the connection, i.e. POLLIN | POLLERR | POLLHUP. The connection is not closed though, so we might end up in weird state ahead (e.g. connection stuck in ACTIVE). Because it's suggested that the connection is readable, do read from it. Likely a proper error will be raised downstream, but we will remain in consistent state. --- diff --git a/psycopg/psycopg/waiting.py b/psycopg/psycopg/waiting.py index 0f3ef92e5..6a6efab09 100644 --- a/psycopg/psycopg/waiting.py +++ b/psycopg/psycopg/waiting.py @@ -381,16 +381,18 @@ def wait_poll(gen: PQGen[RV], fileno: int, interval: float = 0.0) -> RV: continue ev = fileevs[0][1] - if ev & POLL_BAD: - _check_fd_closed(fileno) - # Unlikely: the exception should have been raised above - raise e.OperationalError("connection socket closed") ready = 0 if ev & select.POLLIN: ready = READY_R if ev & select.POLLOUT: ready |= READY_W + + if not ready and ev & POLL_BAD: + _check_fd_closed(fileno) + # Unlikely: the exception should have been raised above + raise e.OperationalError("connection socket closed") + s = gen.send(ready) evmask = _poll_evmasks[s] poll.modify(fileno, evmask) diff --git a/psycopg_c/psycopg_c/_psycopg/waiting.pyx b/psycopg_c/psycopg_c/_psycopg/waiting.pyx index bd6039971..1c5b69717 100644 --- a/psycopg_c/psycopg_c/_psycopg/waiting.pyx +++ b/psycopg_c/psycopg_c/_psycopg/waiting.pyx @@ -95,13 +95,11 @@ retry_eintr: rv = 0; /* success, maybe with timeout */ if (select_rv >= 0) { - if (input_fd.revents & ~(POLLIN | POLLOUT)) { + if (input_fd.revents & POLLIN) { rv |= SELECT_EV_READ; } + if (input_fd.revents & POLLOUT) { rv |= SELECT_EV_WRITE; } + if (!rv && (input_fd.revents & ~(POLLIN | POLLOUT))) { rv = CWAIT_SOCKET_ERROR; } - else { - if (input_fd.revents & POLLIN) { rv |= SELECT_EV_READ; } - if (input_fd.revents & POLLOUT) { rv |= SELECT_EV_WRITE; } - } } #else diff --git a/tests/test_waiting.py b/tests/test_waiting.py index c8c03b882..fee80a773 100644 --- a/tests/test_waiting.py +++ b/tests/test_waiting.py @@ -375,6 +375,29 @@ def test_socket_closed(dsn, waitfn, pgconn): assert dt < 1.0 +@pytest.mark.parametrize("waitfn", waitfns) +def test_wait_remote_closed(proxy, conn_cls, waitfn): + waitfn = getattr(waiting, waitfn) + proxy.start() + with conn_cls.connect(proxy.client_dsn, autocommit=True) as conn: + conn.pgconn.send_query(b"select 1") + proxy.stop() + with pytest.raises(psycopg.OperationalError): + gen = generators.execute(conn.pgconn) + waitfn(gen, conn.pgconn.socket, 0.1) + + +def test_remote_closed(proxy, conn_cls, caplog): + caplog.clear() + proxy.start() + with conn_cls.connect(proxy.client_dsn) as conn: + proxy.stop() + with pytest.raises(psycopg.OperationalError): + conn.execute("select 1") + + assert not caplog.messages + + @pytest.mark.parametrize("waitfn", waitfns) def test_wait_timeout_none_unsupported(waitfn): waitfn = getattr(waiting, waitfn) diff --git a/tests/test_waiting_async.py b/tests/test_waiting_async.py index dc3cf8cc1..18d2e58f9 100644 --- a/tests/test_waiting_async.py +++ b/tests/test_waiting_async.py @@ -383,6 +383,29 @@ async def test_socket_closed(dsn, waitfn, pgconn): assert dt < 1.0 +@pytest.mark.parametrize("waitfn", waitfns) +async def test_wait_remote_closed(proxy, aconn_cls, waitfn): + waitfn = getattr(waiting, waitfn) + proxy.start() + async with await aconn_cls.connect(proxy.client_dsn, autocommit=True) as conn: + conn.pgconn.send_query(b"select 1") + proxy.stop() + with pytest.raises(psycopg.OperationalError): + gen = generators.execute(conn.pgconn) + await waitfn(gen, conn.pgconn.socket, 0.1) + + +async def test_remote_closed(proxy, aconn_cls, caplog): + caplog.clear() + proxy.start() + async with await aconn_cls.connect(proxy.client_dsn) as conn: + proxy.stop() + with pytest.raises(psycopg.OperationalError): + await conn.execute("select 1") + + assert not caplog.messages + + @pytest.mark.parametrize("waitfn", waitfns) async def test_wait_timeout_none_unsupported(waitfn): waitfn = getattr(waiting, waitfn)