From: Mike Bayer Date: Tue, 23 Aug 2022 13:28:06 +0000 (-0400) Subject: integrate connection.terminate() for supporting dialects X-Git-Tag: rel_1_4_41~14 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=1c43ad66478905c8d6cf40d9c818fc1ceeb4efbb;p=thirdparty%2Fsqlalchemy%2Fsqlalchemy.git integrate connection.terminate() for supporting dialects Integrated support for asyncpg's ``terminate()`` method call for cases where the connection pool is recycling a possibly timed-out connection, where a connection is being garbage collected that wasn't gracefully closed, as well as when the connection has been invalidated. This allows asyncpg to abandon the connection without waiting for a response that may incur long timeouts. Fixes: #8419 Change-Id: Ia575af779d5733b483a72dff3690b8bbbad2bb05 (cherry picked from commit 3b7e621aa728d9b01dbac4150e13ea2ef6af35a3) --- diff --git a/doc/build/changelog/unreleased_14/8419.rst b/doc/build/changelog/unreleased_14/8419.rst new file mode 100644 index 0000000000..a095d858d2 --- /dev/null +++ b/doc/build/changelog/unreleased_14/8419.rst @@ -0,0 +1,10 @@ +.. change:: + :tags: bug, asyncio + :tickets: 8419 + + Integrated support for asyncpg's ``terminate()`` method call for cases + where the connection pool is recycling a possibly timed-out connection, + where a connection is being garbage collected that wasn't gracefully + closed, as well as when the connection has been invalidated. This allows + asyncpg to abandon the connection without waiting for a response that may + incur long timeouts. diff --git a/lib/sqlalchemy/dialects/postgresql/asyncpg.py b/lib/sqlalchemy/dialects/postgresql/asyncpg.py index 305ad46a32..39b0f544cb 100644 --- a/lib/sqlalchemy/dialects/postgresql/asyncpg.py +++ b/lib/sqlalchemy/dialects/postgresql/asyncpg.py @@ -748,6 +748,9 @@ class AsyncAdapt_asyncpg_connection(AdaptedConnection): self.await_(self._connection.close()) + def terminate(self): + self._connection.terminate() + class AsyncAdaptFallback_asyncpg_connection(AsyncAdapt_asyncpg_connection): __slots__ = () @@ -891,6 +894,7 @@ class PGDialect_asyncpg(PGDialect): supports_server_side_cursors = True supports_unicode_binds = True + has_terminate = True default_paramstyle = "format" supports_sane_multi_rowcount = False @@ -987,6 +991,9 @@ class PGDialect_asyncpg(PGDialect): def get_deferrable(self, connection): return connection.deferrable + def do_terminate(self, dbapi_connection) -> None: + dbapi_connection.terminate() + def create_connect_args(self, url): opts = url.translate_connect_args(username="user") diff --git a/lib/sqlalchemy/engine/default.py b/lib/sqlalchemy/engine/default.py index 268a2d6093..6b58c44696 100644 --- a/lib/sqlalchemy/engine/default.py +++ b/lib/sqlalchemy/engine/default.py @@ -231,6 +231,7 @@ class DefaultDialect(interfaces.Dialect): CACHING_DISABLED = CACHING_DISABLED NO_CACHE_KEY = NO_CACHE_KEY NO_DIALECT_SUPPORT = NO_DIALECT_SUPPORT + has_terminate = False @util.deprecated_params( convert_unicode=( @@ -684,6 +685,9 @@ class DefaultDialect(interfaces.Dialect): def do_commit(self, dbapi_connection): dbapi_connection.commit() + def do_terminate(self, dbapi_connection): + self.do_close(dbapi_connection) + def do_close(self, dbapi_connection): dbapi_connection.close() diff --git a/lib/sqlalchemy/engine/interfaces.py b/lib/sqlalchemy/engine/interfaces.py index 4f2524aef2..4e0ab8e72f 100644 --- a/lib/sqlalchemy/engine/interfaces.py +++ b/lib/sqlalchemy/engine/interfaces.py @@ -583,6 +583,23 @@ class Dialect(object): raise NotImplementedError() + def do_terminate(self, dbapi_connection): + """Provide an implementation of ``connection.close()`` that tries as + much as possible to not block, given a DBAPI + connection. + + In the vast majority of cases this just calls .close(), however + for some asyncio dialects may call upon different API features. + + This hook is called by the :class:`_pool.Pool` + when a connection is being recycled or has been invalidated. + + .. versionadded:: 1.4.41 + + """ + + raise NotImplementedError() + def do_close(self, dbapi_connection): """Provide an implementation of ``connection.close()``, given a DBAPI connection. diff --git a/lib/sqlalchemy/pool/base.py b/lib/sqlalchemy/pool/base.py index cde28c2fb0..9f16c65433 100644 --- a/lib/sqlalchemy/pool/base.py +++ b/lib/sqlalchemy/pool/base.py @@ -36,6 +36,7 @@ class _ConnDialect(object): """ is_async = False + has_terminate = False def do_rollback(self, dbapi_connection): dbapi_connection.rollback() @@ -43,6 +44,9 @@ class _ConnDialect(object): def do_commit(self, dbapi_connection): dbapi_connection.commit() + def do_terminate(self, dbapi_connection): + dbapi_connection.close() + def do_close(self, dbapi_connection): dbapi_connection.close() @@ -240,11 +244,17 @@ class Pool(log.Identified): else: return lambda crec: creator() - def _close_connection(self, connection): - self.logger.debug("Closing connection %r", connection) - + def _close_connection(self, connection, terminate=False): + self.logger.debug( + "%s connection %r", + "Hard-closing" if terminate else "Closing", + connection, + ) try: - self._dialect.do_close(connection) + if terminate: + self._dialect.do_terminate(connection) + else: + self._dialect.do_close(connection) except Exception: self.logger.error( "Exception closing connection %r", connection, exc_info=True @@ -584,7 +594,7 @@ class _ConnectionRecord(object): if soft: self._soft_invalidate_time = time.time() else: - self.__close() + self.__close(terminate=True) self.dbapi_connection = None def get_connection(self): @@ -630,7 +640,7 @@ class _ConnectionRecord(object): recycle = True if recycle: - self.__close() + self.__close(terminate=True) self.info.clear() self.__connect() @@ -643,11 +653,13 @@ class _ConnectionRecord(object): or (self._soft_invalidate_time > self.starttime) ) - def __close(self): + def __close(self, terminate=False): self.finalize_callback.clear() if self.__pool.dispatch.close: self.__pool.dispatch.close(self.dbapi_connection, self) - self.__pool._close_connection(self.dbapi_connection) + self.__pool._close_connection( + self.dbapi_connection, terminate=terminate + ) self.dbapi_connection = None def __connect(self): @@ -709,7 +721,9 @@ def _finalize_fairy( dbapi_connection = connection_record.dbapi_connection # null pool is not _is_asyncio but can be used also with async dialects - dont_restore_gced = pool._dialect.is_async + dont_restore_gced = ( + pool._dialect.is_async and not pool._dialect.has_terminate + ) if dont_restore_gced: detach = not connection_record or ref @@ -751,8 +765,10 @@ def _finalize_fairy( else: message = ( "The garbage collector is trying to clean up " - "connection %r. This feature is unsupported on async " - "dbapi, since no IO can be performed at this stage to " + "connection %r. This feature is unsupported on " + "unsupported on asyncio " + 'dbapis that lack a "terminate" feature, ' + "since no IO can be performed at this stage to " "reset the connection. Please close out all " "connections when they are no longer used, calling " "``close()`` or using a context manager to " diff --git a/test/engine/test_logging.py b/test/engine/test_logging.py index 7a0ed6e793..5b0d6c762e 100644 --- a/test/engine/test_logging.py +++ b/test/engine/test_logging.py @@ -468,7 +468,7 @@ class PoolLoggingTest(fixtures.TestBase): "Connection %r checked out from pool", "Connection %r being returned to pool%s", "Connection %s rollback-on-return", - "Closing connection %r", + "%s connection %r", ] + (["Pool disposed. %s"] if dispose else []), ) diff --git a/test/engine/test_pool.py b/test/engine/test_pool.py index 320a9bb585..879369a9ff 100644 --- a/test/engine/test_pool.py +++ b/test/engine/test_pool.py @@ -92,10 +92,13 @@ class PoolTestBase(fixtures.TestBase): def _queuepool_dbapi_fixture(self, **kw): dbapi = MockDBAPI() _is_asyncio = kw.pop("_is_asyncio", False) + _has_terminate = kw.pop("_has_terminate", False) p = pool.QueuePool(creator=lambda: dbapi.connect("foo.db"), **kw) if _is_asyncio: p._is_asyncio = True p._dialect = _AsyncConnDialect() + if _has_terminate: + p._dialect.has_terminate = True return dbapi, p @@ -468,8 +471,10 @@ class PoolEventsTest(PoolTestBase): return p, canary - def _checkin_event_fixture(self, _is_asyncio=False): - p = self._queuepool_fixture(_is_asyncio=_is_asyncio) + def _checkin_event_fixture(self, _is_asyncio=False, _has_terminate=False): + p = self._queuepool_fixture( + _is_asyncio=_is_asyncio, _has_terminate=_has_terminate + ) canary = [] @event.listens_for(p, "checkin") @@ -744,9 +749,13 @@ class PoolEventsTest(PoolTestBase): assert canary.call_args_list[0][0][0] is dbapi_con assert canary.call_args_list[0][0][2] is exc - @testing.combinations((True, testing.requires.python3), (False,)) - def test_checkin_event_gc(self, detach_gced): - p, canary = self._checkin_event_fixture(_is_asyncio=detach_gced) + @testing.combinations((True,), (False,), argnames="is_asyncio") + @testing.combinations((True,), (False,), argnames="has_terminate") + @testing.requires.python3 + def test_checkin_event_gc(self, is_asyncio, has_terminate): + p, canary = self._checkin_event_fixture( + _is_asyncio=is_asyncio, _has_terminate=has_terminate + ) c1 = p.connect() @@ -756,6 +765,8 @@ class PoolEventsTest(PoolTestBase): del c1 lazy_gc() + detach_gced = is_asyncio and not has_terminate + if detach_gced: # "close_detached" is not called because for asyncio the # connection is just lost.