From e70a0b0a0e52945e5b588b5cffec619a3f3e78a1 Mon Sep 17 00:00:00 2001 From: Mike Bayer Date: Thu, 30 Nov 2023 09:11:25 -0500 Subject: [PATCH] try to gracefully close even in terminate Adjusted the asyncpg dialect such that when the ``terminate()`` method is used to discard an invalidated connection, the dialect will first attempt to gracefully close the conneciton using ``.close()`` with a timeout, if the operation is proceeding within an async event loop context only. This allows the asyncpg driver to attend to finalizing a ``TimeoutError`` including being able to close a long-running query server side, which otherwise can keep running after the program has exited. Fixes: #10717 Change-Id: Iaba0aeb67873a7a2b3981d43f4eb663005057309 --- doc/build/changelog/unreleased_20/10717.rst | 11 +++++++++++ lib/sqlalchemy/dialects/postgresql/asyncpg.py | 19 ++++++++++++++++++- lib/sqlalchemy/util/concurrency.py | 5 +++++ 3 files changed, 34 insertions(+), 1 deletion(-) create mode 100644 doc/build/changelog/unreleased_20/10717.rst diff --git a/doc/build/changelog/unreleased_20/10717.rst b/doc/build/changelog/unreleased_20/10717.rst new file mode 100644 index 0000000000..2cd9303455 --- /dev/null +++ b/doc/build/changelog/unreleased_20/10717.rst @@ -0,0 +1,11 @@ +.. change:: + :tags: bug, postgresql + :tickets: 10717 + + Adjusted the asyncpg dialect such that when the ``terminate()`` method is + used to discard an invalidated connection, the dialect will first attempt + to gracefully close the conneciton using ``.close()`` with a timeout, if + the operation is proceeding within an async event loop context only. This + allows the asyncpg driver to attend to finalizing a ``TimeoutError`` + including being able to close a long-running query server side, which + otherwise can keep running after the program has exited. diff --git a/lib/sqlalchemy/dialects/postgresql/asyncpg.py b/lib/sqlalchemy/dialects/postgresql/asyncpg.py index d57c94a170..a8fcf15107 100644 --- a/lib/sqlalchemy/dialects/postgresql/asyncpg.py +++ b/lib/sqlalchemy/dialects/postgresql/asyncpg.py @@ -182,6 +182,7 @@ client using this setting passed to :func:`_asyncio.create_async_engine`:: from __future__ import annotations +import asyncio import collections import decimal import json as _py_json @@ -941,7 +942,23 @@ class AsyncAdapt_asyncpg_connection(AsyncAdapt_dbapi_connection): self.await_(self._connection.close()) def terminate(self): - self._connection.terminate() + if util.concurrency.in_greenlet(): + # in a greenlet; this is the connection was invalidated + # case. + try: + # try to gracefully close; see #10717 + # timeout added in asyncpg 0.14.0 December 2017 + self.await_(self._connection.close(timeout=2)) + except asyncio.TimeoutError: + # in the case where we are recycling an old connection + # that may have already been disconnected, close() will + # fail with the above timeout. in this case, terminate + # the connection without any further waiting. + # see issue #8419 + self._connection.terminate() + else: + # not in a greenlet; this is the gc cleanup case + self._connection.terminate() self._started = False @staticmethod diff --git a/lib/sqlalchemy/util/concurrency.py b/lib/sqlalchemy/util/concurrency.py index 084374040f..df5e03ae19 100644 --- a/lib/sqlalchemy/util/concurrency.py +++ b/lib/sqlalchemy/util/concurrency.py @@ -169,6 +169,11 @@ def _safe_cancel_awaitable(awaitable: Awaitable[Any]) -> None: awaitable.close() +def in_greenlet() -> bool: + current = _concurrency_shim.getcurrent() + return isinstance(current, _concurrency_shim._AsyncIoGreenlet) + + def await_only(awaitable: Awaitable[_T]) -> _T: """Awaits an async function in a sync method. -- 2.47.2