]> git.ipfire.org Git - thirdparty/psycopg.git/commitdiff
fix: use poll() instead of epoll() for waiting
authorDaniele Varrazzo <daniele.varrazzo@gmail.com>
Tue, 26 Sep 2023 17:24:44 +0000 (19:24 +0200)
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>
Tue, 26 Sep 2023 20:47:35 +0000 (22:47 +0200)
epoll() hangs when the fd it listens to is closed. poll() doesn't have
this problem (as a consequence, hanging only happened in the Python
code, as wait_c is poll-based).

docs/news.rst
psycopg/psycopg/waiting.py
tests/test_concurrency.py
tests/test_waiting.py

index 2c5f81174123d0b7af76fb61e4c118ced0a93c62..3e2cd43d59ac94aa8ebffdd37e852c3469e5e32b 100644 (file)
@@ -10,7 +10,7 @@
 Psycopg 3.1.12 (unreleased)
 ^^^^^^^^^^^^^^^^^^^^^^^^^^^
 
-- Fix hanging if an async connection is closed while querying (:ticket:`#608`).
+- Fix possible hanging if a connection is closed while querying (:ticket:`#608`).
 - Fix memory leak when `~register_*()` functions are called repeatedly
   (:ticket:`#647`).
 
index e31896c898ebe09639dc1cf27f8497fcbce9a64f..3416633b9190adf0f92079dee1a8fe76ac5e038a 100644 (file)
@@ -13,7 +13,7 @@ import os
 import sys
 import select
 import selectors
-from typing import Dict, Optional
+from typing import Optional
 from asyncio import get_event_loop, wait_for, Event, TimeoutError
 from selectors import DefaultSelector
 
@@ -218,6 +218,8 @@ async def wait_conn_async(gen: PQGenConn[RV], timeout: Optional[float] = None) -
 def wait_select(gen: PQGen[RV], fileno: int, timeout: Optional[float] = None) -> RV:
     """
     Wait for a generator using select where supported.
+
+    BUG: on Linux, can't select on FD >= 1024. On Windows it's fine.
     """
     try:
         s = next(gen)
@@ -246,16 +248,15 @@ def wait_select(gen: PQGen[RV], fileno: int, timeout: Optional[float] = None) ->
         return rv
 
 
-poll_evmasks: Dict[Wait, int]
-
 if hasattr(selectors, "EpollSelector"):
-    poll_evmasks = {
-        WAIT_R: select.EPOLLONESHOT | select.EPOLLIN,
-        WAIT_W: select.EPOLLONESHOT | select.EPOLLOUT,
-        WAIT_RW: select.EPOLLONESHOT | select.EPOLLIN | select.EPOLLOUT,
+    _epoll_evmasks = {
+        WAIT_R: select.EPOLLONESHOT | select.EPOLLIN | select.EPOLLERR,
+        WAIT_W: select.EPOLLONESHOT | select.EPOLLOUT | select.EPOLLERR,
+        WAIT_RW: select.EPOLLONESHOT
+        | (select.EPOLLIN | select.EPOLLOUT | select.EPOLLERR),
     }
 else:
-    poll_evmasks = {}
+    _epoll_evmasks = {}
 
 
 def wait_epoll(gen: PQGen[RV], fileno: int, timeout: Optional[float] = None) -> RV:
@@ -266,6 +267,13 @@ def wait_epoll(gen: PQGen[RV], fileno: int, timeout: Optional[float] = None) ->
     strategy is `epoll` then this function will be used instead of `wait`.
 
     See also: https://linux.die.net/man/2/epoll_ctl
+
+    BUG: if the connection FD is closed, `epoll.poll()` hangs. Same for
+    EpollSelector. For this reason, wait_poll() is currently preferable.
+    To reproduce the bug:
+
+        export PSYCOPG_WAIT_FUNC=wait_epoll
+        pytest tests/test_concurrency.py::test_concurrent_close
     """
     try:
         s = next(gen)
@@ -276,7 +284,7 @@ def wait_epoll(gen: PQGen[RV], fileno: int, timeout: Optional[float] = None) ->
             timeout = int(timeout * 1000.0)
 
         with select.epoll() as epoll:
-            evmask = poll_evmasks[s]
+            evmask = _epoll_evmasks[s]
             epoll.register(fileno, evmask)
             while True:
                 fileevs = None
@@ -290,7 +298,7 @@ def wait_epoll(gen: PQGen[RV], fileno: int, timeout: Optional[float] = None) ->
                     ready |= READY_W
                 # assert s & ready
                 s = gen.send(ready)
-                evmask = poll_evmasks[s]
+                evmask = _epoll_evmasks[s]
                 epoll.modify(fileno, evmask)
 
     except StopIteration as ex:
@@ -298,6 +306,53 @@ def wait_epoll(gen: PQGen[RV], fileno: int, timeout: Optional[float] = None) ->
         return rv
 
 
+if hasattr(selectors, "PollSelector"):
+    _poll_evmasks = {
+        WAIT_R: select.POLLIN,
+        WAIT_W: select.POLLOUT,
+        WAIT_RW: select.POLLIN | select.POLLOUT,
+    }
+else:
+    _poll_evmasks = {}
+
+
+def wait_poll(gen: PQGen[RV], fileno: int, timeout: Optional[float] = None) -> RV:
+    """
+    Wait for a generator using poll where supported.
+
+    Parameters are like for `wait()`.
+    """
+    try:
+        s = next(gen)
+
+        if timeout is None or timeout < 0:
+            timeout = 0
+        else:
+            timeout = int(timeout * 1000.0)
+
+        poll = select.poll()
+        evmask = _poll_evmasks[s]
+        poll.register(fileno, evmask)
+        while True:
+            fileevs = None
+            while not fileevs:
+                fileevs = poll.poll(timeout)
+            ev = fileevs[0][1]
+            ready = 0
+            if ev & ~select.POLLOUT:
+                ready = READY_R
+            if ev & ~select.POLLIN:
+                ready |= READY_W
+            # assert s & ready
+            s = gen.send(ready)
+            evmask = _poll_evmasks[s]
+            poll.modify(fileno, evmask)
+
+    except StopIteration as ex:
+        rv: RV = ex.args[0] if ex.args else None
+        return rv
+
+
 if _psycopg:
     wait_c = _psycopg.wait_c
 
@@ -329,8 +384,10 @@ elif selectors.DefaultSelector is getattr(selectors, "SelectSelector", None):
     # On Windows, SelectSelector should be the default.
     wait = wait_select
 
-elif selectors.DefaultSelector is getattr(selectors, "EpollSelector", None):
-    wait = wait_epoll
+elif hasattr(selectors, "PollSelector"):
+    # On linux, EpollSelector is the default. However, it hangs if the fd is
+    # closed while polling.
+    wait = wait_poll
 
 else:
     wait = wait_selector
index 9dbe9ace9e642237f76a793d971aad45ddf51e2b..3dcc2fbeb34bc746241d3ffaccacf30397085b8a 100644 (file)
@@ -401,8 +401,8 @@ if __name__ == '__main__':
     reason="Fails with: An operation was attempted on something that is not a socket",
 )
 def test_concurrent_close(dsn, conn):
-    # Verify something similar to the problem in #608, which doesn't affect
-    # sync connections anyway.
+    # Test issue #608: concurrent closing shouldn't hang the server
+    # (although, at the moment, it doesn't cancel a running query).
     pid = conn.info.backend_pid
     conn.autocommit = True
 
index bcfd348c294fc0bde4fcc6fbfa3464705aa7d5b9..6a9ad88f376155388db44433fc66736302e6d96b 100644 (file)
@@ -22,6 +22,7 @@ waitfns = [
     pytest.param(
         "wait_epoll", marks=pytest.mark.skipif("not hasattr(select, 'epoll')")
     ),
+    pytest.param("wait_poll", marks=pytest.mark.skipif("not hasattr(select, 'poll')")),
     pytest.param("wait_c", marks=pytest.mark.skipif("not psycopg._cmodule._psycopg")),
 ]