From: Mike Bayer Date: Thu, 30 Nov 2023 14:11:25 +0000 (-0500) Subject: try to gracefully close even in terminate X-Git-Tag: rel_2_0_24~16^2 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=3aaa69118c0b90ae3892cb507ad97677573466c5;p=thirdparty%2Fsqlalchemy%2Fsqlalchemy.git try to gracefully close even in terminate Adjusted the asyncpg dialect such that when the ``terminate()`` method is used to discard an invalidated connection, the dialect will first attempt to gracefully close the conneciton using ``.close()`` with a timeout, if the operation is proceeding within an async event loop context only. This allows the asyncpg driver to attend to finalizing a ``TimeoutError`` including being able to close a long-running query server side, which otherwise can keep running after the program has exited. Fixes: #10717 Change-Id: Iaba0aeb67873a7a2b3981d43f4eb663005057309 (cherry picked from commit e70a0b0a0e52945e5b588b5cffec619a3f3e78a1) --- diff --git a/doc/build/changelog/unreleased_20/10717.rst b/doc/build/changelog/unreleased_20/10717.rst new file mode 100644 index 0000000000..2cd9303455 --- /dev/null +++ b/doc/build/changelog/unreleased_20/10717.rst @@ -0,0 +1,11 @@ +.. change:: + :tags: bug, postgresql + :tickets: 10717 + + Adjusted the asyncpg dialect such that when the ``terminate()`` method is + used to discard an invalidated connection, the dialect will first attempt + to gracefully close the conneciton using ``.close()`` with a timeout, if + the operation is proceeding within an async event loop context only. This + allows the asyncpg driver to attend to finalizing a ``TimeoutError`` + including being able to close a long-running query server side, which + otherwise can keep running after the program has exited. diff --git a/lib/sqlalchemy/dialects/postgresql/asyncpg.py b/lib/sqlalchemy/dialects/postgresql/asyncpg.py index ca35bf9607..a116a470a7 100644 --- a/lib/sqlalchemy/dialects/postgresql/asyncpg.py +++ b/lib/sqlalchemy/dialects/postgresql/asyncpg.py @@ -894,7 +894,23 @@ class AsyncAdapt_asyncpg_connection(AdaptedConnection): self.await_(self._connection.close()) def terminate(self): - self._connection.terminate() + if util.concurrency.in_greenlet(): + # in a greenlet; this is the connection was invalidated + # case. + try: + # try to gracefully close; see #10717 + # timeout added in asyncpg 0.14.0 December 2017 + self.await_(self._connection.close(timeout=2)) + except asyncio.TimeoutError: + # in the case where we are recycling an old connection + # that may have already been disconnected, close() will + # fail with the above timeout. in this case, terminate + # the connection without any further waiting. + # see issue #8419 + self._connection.terminate() + else: + # not in a greenlet; this is the gc cleanup case + self._connection.terminate() self._started = False @staticmethod diff --git a/lib/sqlalchemy/util/_concurrency_py3k.py b/lib/sqlalchemy/util/_concurrency_py3k.py index 71d10a6857..83201dd95c 100644 --- a/lib/sqlalchemy/util/_concurrency_py3k.py +++ b/lib/sqlalchemy/util/_concurrency_py3k.py @@ -99,6 +99,11 @@ def _safe_cancel_awaitable(awaitable: Awaitable[Any]) -> None: awaitable.close() +def in_greenlet() -> bool: + current = getcurrent() + return isinstance(current, _AsyncIoGreenlet) + + def await_only(awaitable: Awaitable[_T]) -> _T: """Awaits an async function in a sync method. diff --git a/lib/sqlalchemy/util/concurrency.py b/lib/sqlalchemy/util/concurrency.py index 53a70070b7..1141cbc165 100644 --- a/lib/sqlalchemy/util/concurrency.py +++ b/lib/sqlalchemy/util/concurrency.py @@ -22,6 +22,7 @@ else: have_greenlet = True from ._concurrency_py3k import await_only as await_only from ._concurrency_py3k import await_fallback as await_fallback + from ._concurrency_py3k import in_greenlet as in_greenlet from ._concurrency_py3k import greenlet_spawn as greenlet_spawn from ._concurrency_py3k import is_exit_exception as is_exit_exception from ._concurrency_py3k import AsyncAdaptedLock as AsyncAdaptedLock @@ -56,6 +57,9 @@ if not typing.TYPE_CHECKING and not have_greenlet: def await_fallback(thing): # type: ignore # noqa: F811 return thing + def in_greenlet(): # type: ignore # noqa: F811 + _not_implemented() + def greenlet_spawn(fn, *args, **kw): # type: ignore # noqa: F811 _not_implemented()