]> git.ipfire.org Git - thirdparty/psycopg.git/commitdiff
fix: consider a connection closed in poll() only if it's not ready too
authorDaniele Varrazzo <daniele.varrazzo@gmail.com>
Sat, 11 Oct 2025 01:26:40 +0000 (03:26 +0200)
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>
Mon, 13 Oct 2025 11:26:01 +0000 (13:26 +0200)
When closing the proxy and trying to communicate, poll returns state 25
on the connection, i.e. POLLIN | POLLERR | POLLHUP. The connection is
not closed though, so we might end up in weird state ahead (e.g. connection
stuck in ACTIVE).

Because it's suggested that the connection is readable, do read from it.
Likely a proper error will be raised downstream, but we will remain in
consistent state.

psycopg/psycopg/waiting.py
psycopg_c/psycopg_c/_psycopg/waiting.pyx
tests/test_waiting.py
tests/test_waiting_async.py

index 0f3ef92e58c1e6f9cea9cd0267b4befbcf582282..6a6efab0929ebeff7ff95ac81c8a948e1da67425 100644 (file)
@@ -381,16 +381,18 @@ def wait_poll(gen: PQGen[RV], fileno: int, interval: float = 0.0) -> RV:
                 continue
 
             ev = fileevs[0][1]
-            if ev & POLL_BAD:
-                _check_fd_closed(fileno)
-                # Unlikely: the exception should have been raised above
-                raise e.OperationalError("connection socket closed")
 
             ready = 0
             if ev & select.POLLIN:
                 ready = READY_R
             if ev & select.POLLOUT:
                 ready |= READY_W
+
+            if not ready and ev & POLL_BAD:
+                _check_fd_closed(fileno)
+                # Unlikely: the exception should have been raised above
+                raise e.OperationalError("connection socket closed")
+
             s = gen.send(ready)
             evmask = _poll_evmasks[s]
             poll.modify(fileno, evmask)
index bd60399714091abd81f77360c661c8ac5593c960..1c5b69717deb1bb8e72d246862fa52a809b95a38 100644 (file)
@@ -95,13 +95,11 @@ retry_eintr:
 
     rv = 0;  /* success, maybe with timeout */
     if (select_rv >= 0) {
-        if (input_fd.revents & ~(POLLIN | POLLOUT)) {
+        if (input_fd.revents & POLLIN) { rv |= SELECT_EV_READ; }
+        if (input_fd.revents & POLLOUT) { rv |= SELECT_EV_WRITE; }
+        if (!rv && (input_fd.revents & ~(POLLIN | POLLOUT))) {
             rv = CWAIT_SOCKET_ERROR;
         }
-        else {
-            if (input_fd.revents & POLLIN) { rv |= SELECT_EV_READ; }
-            if (input_fd.revents & POLLOUT) { rv |= SELECT_EV_WRITE; }
-        }
     }
 
 #else
index c8c03b88202f3076bb39e8f9414cdd276db42822..fee80a773046b3fd1e2b59b487e07ebdfb12f203 100644 (file)
@@ -375,6 +375,29 @@ def test_socket_closed(dsn, waitfn, pgconn):
     assert dt < 1.0
 
 
+@pytest.mark.parametrize("waitfn", waitfns)
+def test_wait_remote_closed(proxy, conn_cls, waitfn):
+    waitfn = getattr(waiting, waitfn)
+    proxy.start()
+    with conn_cls.connect(proxy.client_dsn, autocommit=True) as conn:
+        conn.pgconn.send_query(b"select 1")
+        proxy.stop()
+        with pytest.raises(psycopg.OperationalError):
+            gen = generators.execute(conn.pgconn)
+            waitfn(gen, conn.pgconn.socket, 0.1)
+
+
+def test_remote_closed(proxy, conn_cls, caplog):
+    caplog.clear()
+    proxy.start()
+    with conn_cls.connect(proxy.client_dsn) as conn:
+        proxy.stop()
+        with pytest.raises(psycopg.OperationalError):
+            conn.execute("select 1")
+
+    assert not caplog.messages
+
+
 @pytest.mark.parametrize("waitfn", waitfns)
 def test_wait_timeout_none_unsupported(waitfn):
     waitfn = getattr(waiting, waitfn)
index dc3cf8cc1af29e7c37acf5aa8f3907bb0cbe5eb5..18d2e58f964894b8c052f49a6d10ca60edd69c2d 100644 (file)
@@ -383,6 +383,29 @@ async def test_socket_closed(dsn, waitfn, pgconn):
     assert dt < 1.0
 
 
+@pytest.mark.parametrize("waitfn", waitfns)
+async def test_wait_remote_closed(proxy, aconn_cls, waitfn):
+    waitfn = getattr(waiting, waitfn)
+    proxy.start()
+    async with await aconn_cls.connect(proxy.client_dsn, autocommit=True) as conn:
+        conn.pgconn.send_query(b"select 1")
+        proxy.stop()
+        with pytest.raises(psycopg.OperationalError):
+            gen = generators.execute(conn.pgconn)
+            await waitfn(gen, conn.pgconn.socket, 0.1)
+
+
+async def test_remote_closed(proxy, aconn_cls, caplog):
+    caplog.clear()
+    proxy.start()
+    async with await aconn_cls.connect(proxy.client_dsn) as conn:
+        proxy.stop()
+        with pytest.raises(psycopg.OperationalError):
+            await conn.execute("select 1")
+
+    assert not caplog.messages
+
+
 @pytest.mark.parametrize("waitfn", waitfns)
 async def test_wait_timeout_none_unsupported(waitfn):
     waitfn = getattr(waiting, waitfn)