From ae43e63400dc7366dde1e26d689cff69238c7d2c Mon Sep 17 00:00:00 2001 From: Daniele Varrazzo Date: Tue, 26 Sep 2023 19:24:44 +0200 Subject: [PATCH] fix: use poll() instead of epoll() for waiting epoll() hangs when the fd it listens to is closed. poll() doesn't have this problem (as a consequence, hanging only happened in the Python code, as wait_c is poll-based). --- docs/news.rst | 2 +- psycopg/psycopg/waiting.py | 81 ++++++++++++++++++++++++++++++++------ tests/test_concurrency.py | 4 +- tests/test_waiting.py | 1 + 4 files changed, 73 insertions(+), 15 deletions(-) diff --git a/docs/news.rst b/docs/news.rst index 2c5f81174..3e2cd43d5 100644 --- a/docs/news.rst +++ b/docs/news.rst @@ -10,7 +10,7 @@ Psycopg 3.1.12 (unreleased) ^^^^^^^^^^^^^^^^^^^^^^^^^^^ -- Fix hanging if an async connection is closed while querying (:ticket:`#608`). +- Fix possible hanging if a connection is closed while querying (:ticket:`#608`). - Fix memory leak when `~register_*()` functions are called repeatedly (:ticket:`#647`). diff --git a/psycopg/psycopg/waiting.py b/psycopg/psycopg/waiting.py index e31896c89..3416633b9 100644 --- a/psycopg/psycopg/waiting.py +++ b/psycopg/psycopg/waiting.py @@ -13,7 +13,7 @@ import os import sys import select import selectors -from typing import Dict, Optional +from typing import Optional from asyncio import get_event_loop, wait_for, Event, TimeoutError from selectors import DefaultSelector @@ -218,6 +218,8 @@ async def wait_conn_async(gen: PQGenConn[RV], timeout: Optional[float] = None) - def wait_select(gen: PQGen[RV], fileno: int, timeout: Optional[float] = None) -> RV: """ Wait for a generator using select where supported. + + BUG: on Linux, can't select on FD >= 1024. On Windows it's fine. """ try: s = next(gen) @@ -246,16 +248,15 @@ def wait_select(gen: PQGen[RV], fileno: int, timeout: Optional[float] = None) -> return rv -poll_evmasks: Dict[Wait, int] - if hasattr(selectors, "EpollSelector"): - poll_evmasks = { - WAIT_R: select.EPOLLONESHOT | select.EPOLLIN, - WAIT_W: select.EPOLLONESHOT | select.EPOLLOUT, - WAIT_RW: select.EPOLLONESHOT | select.EPOLLIN | select.EPOLLOUT, + _epoll_evmasks = { + WAIT_R: select.EPOLLONESHOT | select.EPOLLIN | select.EPOLLERR, + WAIT_W: select.EPOLLONESHOT | select.EPOLLOUT | select.EPOLLERR, + WAIT_RW: select.EPOLLONESHOT + | (select.EPOLLIN | select.EPOLLOUT | select.EPOLLERR), } else: - poll_evmasks = {} + _epoll_evmasks = {} def wait_epoll(gen: PQGen[RV], fileno: int, timeout: Optional[float] = None) -> RV: @@ -266,6 +267,13 @@ def wait_epoll(gen: PQGen[RV], fileno: int, timeout: Optional[float] = None) -> strategy is `epoll` then this function will be used instead of `wait`. See also: https://linux.die.net/man/2/epoll_ctl + + BUG: if the connection FD is closed, `epoll.poll()` hangs. Same for + EpollSelector. For this reason, wait_poll() is currently preferable. + To reproduce the bug: + + export PSYCOPG_WAIT_FUNC=wait_epoll + pytest tests/test_concurrency.py::test_concurrent_close """ try: s = next(gen) @@ -276,7 +284,7 @@ def wait_epoll(gen: PQGen[RV], fileno: int, timeout: Optional[float] = None) -> timeout = int(timeout * 1000.0) with select.epoll() as epoll: - evmask = poll_evmasks[s] + evmask = _epoll_evmasks[s] epoll.register(fileno, evmask) while True: fileevs = None @@ -290,7 +298,7 @@ def wait_epoll(gen: PQGen[RV], fileno: int, timeout: Optional[float] = None) -> ready |= READY_W # assert s & ready s = gen.send(ready) - evmask = poll_evmasks[s] + evmask = _epoll_evmasks[s] epoll.modify(fileno, evmask) except StopIteration as ex: @@ -298,6 +306,53 @@ def wait_epoll(gen: PQGen[RV], fileno: int, timeout: Optional[float] = None) -> return rv +if hasattr(selectors, "PollSelector"): + _poll_evmasks = { + WAIT_R: select.POLLIN, + WAIT_W: select.POLLOUT, + WAIT_RW: select.POLLIN | select.POLLOUT, + } +else: + _poll_evmasks = {} + + +def wait_poll(gen: PQGen[RV], fileno: int, timeout: Optional[float] = None) -> RV: + """ + Wait for a generator using poll where supported. + + Parameters are like for `wait()`. + """ + try: + s = next(gen) + + if timeout is None or timeout < 0: + timeout = 0 + else: + timeout = int(timeout * 1000.0) + + poll = select.poll() + evmask = _poll_evmasks[s] + poll.register(fileno, evmask) + while True: + fileevs = None + while not fileevs: + fileevs = poll.poll(timeout) + ev = fileevs[0][1] + ready = 0 + if ev & ~select.POLLOUT: + ready = READY_R + if ev & ~select.POLLIN: + ready |= READY_W + # assert s & ready + s = gen.send(ready) + evmask = _poll_evmasks[s] + poll.modify(fileno, evmask) + + except StopIteration as ex: + rv: RV = ex.args[0] if ex.args else None + return rv + + if _psycopg: wait_c = _psycopg.wait_c @@ -329,8 +384,10 @@ elif selectors.DefaultSelector is getattr(selectors, "SelectSelector", None): # On Windows, SelectSelector should be the default. wait = wait_select -elif selectors.DefaultSelector is getattr(selectors, "EpollSelector", None): - wait = wait_epoll +elif hasattr(selectors, "PollSelector"): + # On linux, EpollSelector is the default. However, it hangs if the fd is + # closed while polling. + wait = wait_poll else: wait = wait_selector diff --git a/tests/test_concurrency.py b/tests/test_concurrency.py index 9dbe9ace9..3dcc2fbeb 100644 --- a/tests/test_concurrency.py +++ b/tests/test_concurrency.py @@ -401,8 +401,8 @@ if __name__ == '__main__': reason="Fails with: An operation was attempted on something that is not a socket", ) def test_concurrent_close(dsn, conn): - # Verify something similar to the problem in #608, which doesn't affect - # sync connections anyway. + # Test issue #608: concurrent closing shouldn't hang the server + # (although, at the moment, it doesn't cancel a running query). pid = conn.info.backend_pid conn.autocommit = True diff --git a/tests/test_waiting.py b/tests/test_waiting.py index bcfd348c2..6a9ad88f3 100644 --- a/tests/test_waiting.py +++ b/tests/test_waiting.py @@ -22,6 +22,7 @@ waitfns = [ pytest.param( "wait_epoll", marks=pytest.mark.skipif("not hasattr(select, 'epoll')") ), + pytest.param("wait_poll", marks=pytest.mark.skipif("not hasattr(select, 'poll')")), pytest.param("wait_c", marks=pytest.mark.skipif("not psycopg._cmodule._psycopg")), ] -- 2.47.2