From: Mike Bayer Date: Tue, 23 Aug 2022 13:28:06 +0000 (-0400) Subject: integrate connection.terminate() for supporting dialects X-Git-Tag: rel_2_0_0b1~92^2 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=776abf43d7404a3fa165588fd1e1e2d5ef9a9f04;p=thirdparty%2Fsqlalchemy%2Fsqlalchemy.git integrate connection.terminate() for supporting dialects Integrated support for asyncpg's ``terminate()`` method call for cases where the connection pool is recycling a possibly timed-out connection, where a connection is being garbage collected that wasn't gracefully closed, as well as when the connection has been invalidated. This allows asyncpg to abandon the connection without waiting for a response that may incur long timeouts. Fixes: #8419 Change-Id: Ia575af779d5733b483a72dff3690b8bbbad2bb05 --- diff --git a/doc/build/changelog/unreleased_14/8419.rst b/doc/build/changelog/unreleased_14/8419.rst new file mode 100644 index 0000000000..a095d858d2 --- /dev/null +++ b/doc/build/changelog/unreleased_14/8419.rst @@ -0,0 +1,10 @@ +.. change:: + :tags: bug, asyncio + :tickets: 8419 + + Integrated support for asyncpg's ``terminate()`` method call for cases + where the connection pool is recycling a possibly timed-out connection, + where a connection is being garbage collected that wasn't gracefully + closed, as well as when the connection has been invalidated. This allows + asyncpg to abandon the connection without waiting for a response that may + incur long timeouts. diff --git a/lib/sqlalchemy/dialects/postgresql/asyncpg.py b/lib/sqlalchemy/dialects/postgresql/asyncpg.py index 6888959f0a..a84bece4fd 100644 --- a/lib/sqlalchemy/dialects/postgresql/asyncpg.py +++ b/lib/sqlalchemy/dialects/postgresql/asyncpg.py @@ -793,6 +793,9 @@ class AsyncAdapt_asyncpg_connection(AdaptedConnection): self.await_(self._connection.close()) + def terminate(self): + self._connection.terminate() + class AsyncAdaptFallback_asyncpg_connection(AsyncAdapt_asyncpg_connection): __slots__ = () @@ -895,6 +898,7 @@ class PGDialect_asyncpg(PGDialect): supports_server_side_cursors = True render_bind_cast = True + has_terminate = True default_paramstyle = "format" supports_sane_multi_rowcount = False @@ -981,6 +985,9 @@ class PGDialect_asyncpg(PGDialect): def get_deferrable(self, connection): return connection.deferrable + def do_terminate(self, dbapi_connection) -> None: + dbapi_connection.terminate() + def create_connect_args(self, url): opts = url.translate_connect_args(username="user") diff --git a/lib/sqlalchemy/engine/default.py b/lib/sqlalchemy/engine/default.py index 80e687c329..9ad0ebbfc0 100644 --- a/lib/sqlalchemy/engine/default.py +++ b/lib/sqlalchemy/engine/default.py @@ -237,6 +237,8 @@ class DefaultDialect(Dialect): is_async = False + has_terminate = False + # TODO: this is not to be part of 2.0. implement rudimentary binary # literals for SQLite, PostgreSQL, MySQL only within # _Binary.literal_processor @@ -620,6 +622,9 @@ class DefaultDialect(Dialect): def do_commit(self, dbapi_connection): dbapi_connection.commit() + def do_terminate(self, dbapi_connection): + self.do_close(dbapi_connection) + def do_close(self, dbapi_connection): dbapi_connection.close() diff --git a/lib/sqlalchemy/engine/interfaces.py b/lib/sqlalchemy/engine/interfaces.py index 778c07592f..01b266d68a 100644 --- a/lib/sqlalchemy/engine/interfaces.py +++ b/lib/sqlalchemy/engine/interfaces.py @@ -966,6 +966,10 @@ class Dialect(EventTarget): is_async: bool """Whether or not this dialect is intended for asyncio use.""" + has_terminate: bool + """Whether or not this dialect has a separate "terminate" implementation + that does not block or require awaiting.""" + engine_config_types: Mapping[str, Any] """a mapping of string keys that can be in an engine config linked to type conversion functions. @@ -1784,6 +1788,23 @@ class Dialect(EventTarget): raise NotImplementedError() + def do_terminate(self, dbapi_connection: DBAPIConnection) -> None: + """Provide an implementation of ``connection.close()`` that tries as + much as possible to not block, given a DBAPI + connection. + + In the vast majority of cases this just calls .close(), however + for some asyncio dialects may call upon different API features. + + This hook is called by the :class:`_pool.Pool` + when a connection is being recycled or has been invalidated. + + .. versionadded:: 1.4.41 + + """ + + raise NotImplementedError() + def do_close(self, dbapi_connection: DBAPIConnection) -> None: """Provide an implementation of ``connection.close()``, given a DBAPI connection. diff --git a/lib/sqlalchemy/pool/base.py b/lib/sqlalchemy/pool/base.py index 51eb5e9f5e..41f2f03b27 100644 --- a/lib/sqlalchemy/pool/base.py +++ b/lib/sqlalchemy/pool/base.py @@ -72,6 +72,7 @@ class _ConnDialect: """ is_async = False + has_terminate = False def do_rollback(self, dbapi_connection: PoolProxiedConnection) -> None: dbapi_connection.rollback() @@ -79,6 +80,9 @@ class _ConnDialect: def do_commit(self, dbapi_connection: PoolProxiedConnection) -> None: dbapi_connection.commit() + def do_terminate(self, dbapi_connection: DBAPIConnection) -> None: + dbapi_connection.close() + def do_close(self, dbapi_connection: DBAPIConnection) -> None: dbapi_connection.close() @@ -310,10 +314,19 @@ class Pool(log.Identified, event.EventTarget): creator_fn = cast(_CreatorFnType, creator) return lambda rec: creator_fn() - def _close_connection(self, connection: DBAPIConnection) -> None: - self.logger.debug("Closing connection %r", connection) + def _close_connection( + self, connection: DBAPIConnection, *, terminate: bool = False + ) -> None: + self.logger.debug( + "%s connection %r", + "Hard-closing" if terminate else "Closing", + connection, + ) try: - self._dialect.do_close(connection) + if terminate: + self._dialect.do_terminate(connection) + else: + self._dialect.do_close(connection) except Exception: self.logger.error( "Exception closing connection %r", connection, exc_info=True @@ -742,7 +755,7 @@ class _ConnectionRecord(ConnectionPoolEntry): if soft: self._soft_invalidate_time = time.time() else: - self.__close() + self.__close(terminate=True) self.dbapi_connection = None def get_connection(self) -> DBAPIConnection: @@ -789,7 +802,7 @@ class _ConnectionRecord(ConnectionPoolEntry): recycle = True if recycle: - self.__close() + self.__close(terminate=True) self.info.clear() # type: ignore # our info is always present self.__connect() @@ -804,12 +817,14 @@ class _ConnectionRecord(ConnectionPoolEntry): or (self._soft_invalidate_time > self.starttime) ) - def __close(self) -> None: + def __close(self, *, terminate: bool = False) -> None: self.finalize_callback.clear() if self.__pool.dispatch.close: self.__pool.dispatch.close(self.dbapi_connection, self) assert self.dbapi_connection is not None - self.__pool._close_connection(self.dbapi_connection) + self.__pool._close_connection( + self.dbapi_connection, terminate=terminate + ) self.dbapi_connection = None def __connect(self) -> None: @@ -877,7 +892,9 @@ def _finalize_fairy( dbapi_connection = connection_record.dbapi_connection # null pool is not _is_asyncio but can be used also with async dialects - dont_restore_gced = pool._dialect.is_async + dont_restore_gced = ( + pool._dialect.is_async and not pool._dialect.has_terminate + ) if dont_restore_gced: detach = connection_record is None or is_gc_cleanup @@ -923,8 +940,9 @@ def _finalize_fairy( message = ( "The garbage collector is trying to clean up " f"connection {dbapi_connection!r}. This feature is " - "unsupported on async " - "dbapi, since no IO can be performed at this stage to " + "unsupported on asyncio " + 'dbapis that lack a "terminate" feature, since no ' + "IO can be performed at this stage to " "reset the connection. Please close out all " "connections when they are no longer used, calling " "``close()`` or using a context manager to " diff --git a/test/engine/test_logging.py b/test/engine/test_logging.py index c6fd856847..38e1c436c0 100644 --- a/test/engine/test_logging.py +++ b/test/engine/test_logging.py @@ -453,7 +453,7 @@ class PoolLoggingTest(fixtures.TestBase): "Connection %r checked out from pool", "Connection %r being returned to pool%s", "Connection %s rollback-on-return", - "Closing connection %r", + "%s connection %r", ] + (["Pool disposed. %s"] if dispose else []), ) diff --git a/test/engine/test_pool.py b/test/engine/test_pool.py index 2bbb976a84..473d462a31 100644 --- a/test/engine/test_pool.py +++ b/test/engine/test_pool.py @@ -87,10 +87,13 @@ class PoolTestBase(fixtures.TestBase): def _queuepool_dbapi_fixture(self, **kw): dbapi = MockDBAPI() _is_asyncio = kw.pop("_is_asyncio", False) + _has_terminate = kw.pop("_has_terminate", False) p = pool.QueuePool(creator=lambda: dbapi.connect("foo.db"), **kw) if _is_asyncio: p._is_asyncio = True p._dialect = _AsyncConnDialect() + if _has_terminate: + p._dialect.has_terminate = True return dbapi, p @@ -445,8 +448,10 @@ class PoolEventsTest(PoolTestBase): return p, canary - def _checkin_event_fixture(self, _is_asyncio=False): - p = self._queuepool_fixture(_is_asyncio=_is_asyncio) + def _checkin_event_fixture(self, _is_asyncio=False, _has_terminate=False): + p = self._queuepool_fixture( + _is_asyncio=_is_asyncio, _has_terminate=_has_terminate + ) canary = [] @event.listens_for(p, "checkin") @@ -721,9 +726,12 @@ class PoolEventsTest(PoolTestBase): assert canary.call_args_list[0][0][0] is dbapi_con assert canary.call_args_list[0][0][2] is exc - @testing.combinations((True,), (False,)) - def test_checkin_event_gc(self, detach_gced): - p, canary = self._checkin_event_fixture(_is_asyncio=detach_gced) + @testing.combinations((True,), (False,), argnames="is_asyncio") + @testing.combinations((True,), (False,), argnames="has_terminate") + def test_checkin_event_gc(self, is_asyncio, has_terminate): + p, canary = self._checkin_event_fixture( + _is_asyncio=is_asyncio, _has_terminate=has_terminate + ) c1 = p.connect() @@ -733,6 +741,8 @@ class PoolEventsTest(PoolTestBase): del c1 lazy_gc() + detach_gced = is_asyncio and not has_terminate + if detach_gced: # "close_detached" is not called because for asyncio the # connection is just lost.