integrate connection.terminate() for supporting dialects
author     Mike Bayer <mike_mp@zzzcomputing.com>
           Tue, 23 Aug 2022 13:28:06 +0000 (09:28 -0400)
committer  Mike Bayer <mike_mp@zzzcomputing.com>
           Wed, 24 Aug 2022 17:02:23 +0000 (13:02 -0400)
Integrated support for asyncpg's ``terminate()`` method call for cases
where the connection pool is recycling a possibly timed-out connection,
where a connection is being garbage collected that wasn't gracefully
closed, as well as when the connection has been invalidated. This allows
asyncpg to abandon the connection without waiting for a response that may
incur long timeouts.

Fixes: #8419
Change-Id: Ia575af779d5733b483a72dff3690b8bbbad2bb05

doc/build/changelog/unreleased_14/8419.rst [new file with mode: 0644]
lib/sqlalchemy/dialects/postgresql/asyncpg.py
lib/sqlalchemy/engine/default.py
lib/sqlalchemy/engine/interfaces.py
lib/sqlalchemy/pool/base.py
test/engine/test_logging.py
test/engine/test_pool.py
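
For orientation, a minimal sketch of the dialect-level contract this commit introduces; the dialect name and the driver connection's terminate() call are illustrative stand-ins modeled on the asyncpg changes below, not part of the patch:

    # Hypothetical dialect for illustration only; "AsyncDriverDialect" and the
    # driver-side terminate() are assumptions mirroring the asyncpg diff below.
    from sqlalchemy.engine.default import DefaultDialect

    class AsyncDriverDialect(DefaultDialect):
        is_async = True
        has_terminate = True  # advertise a non-blocking "terminate" path to the pool

        def do_terminate(self, dbapi_connection):
            # drop the connection without waiting for a server response
            dbapi_connection.terminate()

    # Dialects that override nothing keep the defaults added in default.py below:
    # has_terminate = False, and do_terminate() falls back to do_close().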

diff --git a/doc/build/changelog/unreleased_14/8419.rst b/doc/build/changelog/unreleased_14/8419.rst
new file mode 100644
index 0000000..a095d85
--- /dev/null
+++ b/doc/build/changelog/unreleased_14/8419.rst
@@ -0,0 +1,10 @@
+.. change::
+    :tags: bug, asyncio
+    :tickets: 8419
+
+    Integrated support for asyncpg's ``terminate()`` method call for cases
+    where the connection pool is recycling a possibly timed-out connection,
+    where a connection is being garbage collected that wasn't gracefully
+    closed, as well as when the connection has been invalidated. This allows
+    asyncpg to abandon the connection without waiting for a response that may
+    incur long timeouts.
diff --git a/lib/sqlalchemy/dialects/postgresql/asyncpg.py b/lib/sqlalchemy/dialects/postgresql/asyncpg.py
index 6888959f0a27e1ae4fc1372a29a533cfa2e3755d..a84bece4fd46fa7f5afd3fa62314308e69d7de31 100644
@@ -793,6 +793,9 @@ class AsyncAdapt_asyncpg_connection(AdaptedConnection):
 
         self.await_(self._connection.close())
 
+    def terminate(self):
+        self._connection.terminate()
+
 
 class AsyncAdaptFallback_asyncpg_connection(AsyncAdapt_asyncpg_connection):
     __slots__ = ()
@@ -895,6 +898,7 @@ class PGDialect_asyncpg(PGDialect):
     supports_server_side_cursors = True
 
     render_bind_cast = True
+    has_terminate = True
 
     default_paramstyle = "format"
     supports_sane_multi_rowcount = False
@@ -981,6 +985,9 @@ class PGDialect_asyncpg(PGDialect):
     def get_deferrable(self, connection):
         return connection.deferrable
 
+    def do_terminate(self, dbapi_connection) -> None:
+        dbapi_connection.terminate()
+
     def create_connect_args(self, url):
         opts = url.translate_connect_args(username="user")
 
diff --git a/lib/sqlalchemy/engine/default.py b/lib/sqlalchemy/engine/default.py
index 80e687c3296ac0b3100d7e6bc61123b65ea8f926..9ad0ebbfc05f84665ce77794e9a9d84f903326d8 100644
@@ -237,6 +237,8 @@ class DefaultDialect(Dialect):
 
     is_async = False
 
+    has_terminate = False
+
     # TODO: this is not to be part of 2.0.  implement rudimentary binary
     # literals for SQLite, PostgreSQL, MySQL only within
     # _Binary.literal_processor
@@ -620,6 +622,9 @@ class DefaultDialect(Dialect):
     def do_commit(self, dbapi_connection):
         dbapi_connection.commit()
 
+    def do_terminate(self, dbapi_connection):
+        self.do_close(dbapi_connection)
+
     def do_close(self, dbapi_connection):
         dbapi_connection.close()
 
diff --git a/lib/sqlalchemy/engine/interfaces.py b/lib/sqlalchemy/engine/interfaces.py
index 778c07592f5400cbdcc7e67b97f4c05b2176d6c7..01b266d68ace6aea27381b21dc6845345cfff12f 100644
@@ -966,6 +966,10 @@ class Dialect(EventTarget):
     is_async: bool
     """Whether or not this dialect is intended for asyncio use."""
 
+    has_terminate: bool
+    """Whether or not this dialect has a separate "terminate" implementation
+    that does not block or require awaiting."""
+
     engine_config_types: Mapping[str, Any]
     """a mapping of string keys that can be in an engine config linked to
     type conversion functions.
@@ -1784,6 +1788,23 @@ class Dialect(EventTarget):
 
         raise NotImplementedError()
 
+    def do_terminate(self, dbapi_connection: DBAPIConnection) -> None:
+        """Provide an implementation of ``connection.close()`` that tries as
+        much as possible to not block, given a DBAPI
+        connection.
+
+        In the vast majority of cases this just calls .close(), however
+        for some asyncio dialects may call upon different API features.
+
+        This hook is called by the :class:`_pool.Pool`
+        when a connection is being recycled or has been invalidated.
+
+        .. versionadded:: 1.4.41
+
+        """
+
+        raise NotImplementedError()
+
     def do_close(self, dbapi_connection: DBAPIConnection) -> None:
         """Provide an implementation of ``connection.close()``, given a DBAPI
         connection.
diff --git a/lib/sqlalchemy/pool/base.py b/lib/sqlalchemy/pool/base.py
index 51eb5e9f5e29f40a599f9bfc9eda6da84c442a36..41f2f03b278eddeddc3c5a1be304827168879139 100644
@@ -72,6 +72,7 @@ class _ConnDialect:
     """
 
     is_async = False
+    has_terminate = False
 
     def do_rollback(self, dbapi_connection: PoolProxiedConnection) -> None:
         dbapi_connection.rollback()
@@ -79,6 +80,9 @@ class _ConnDialect:
     def do_commit(self, dbapi_connection: PoolProxiedConnection) -> None:
         dbapi_connection.commit()
 
+    def do_terminate(self, dbapi_connection: DBAPIConnection) -> None:
+        dbapi_connection.close()
+
     def do_close(self, dbapi_connection: DBAPIConnection) -> None:
         dbapi_connection.close()
 
@@ -310,10 +314,19 @@ class Pool(log.Identified, event.EventTarget):
             creator_fn = cast(_CreatorFnType, creator)
             return lambda rec: creator_fn()
 
-    def _close_connection(self, connection: DBAPIConnection) -> None:
-        self.logger.debug("Closing connection %r", connection)
+    def _close_connection(
+        self, connection: DBAPIConnection, *, terminate: bool = False
+    ) -> None:
+        self.logger.debug(
+            "%s connection %r",
+            "Hard-closing" if terminate else "Closing",
+            connection,
+        )
         try:
-            self._dialect.do_close(connection)
+            if terminate:
+                self._dialect.do_terminate(connection)
+            else:
+                self._dialect.do_close(connection)
         except Exception:
             self.logger.error(
                 "Exception closing connection %r", connection, exc_info=True
@@ -742,7 +755,7 @@ class _ConnectionRecord(ConnectionPoolEntry):
         if soft:
             self._soft_invalidate_time = time.time()
         else:
-            self.__close()
+            self.__close(terminate=True)
             self.dbapi_connection = None
 
     def get_connection(self) -> DBAPIConnection:
@@ -789,7 +802,7 @@ class _ConnectionRecord(ConnectionPoolEntry):
             recycle = True
 
         if recycle:
-            self.__close()
+            self.__close(terminate=True)
             self.info.clear()  # type: ignore  # our info is always present
 
             self.__connect()
@@ -804,12 +817,14 @@ class _ConnectionRecord(ConnectionPoolEntry):
             or (self._soft_invalidate_time > self.starttime)
         )
 
-    def __close(self) -> None:
+    def __close(self, *, terminate: bool = False) -> None:
         self.finalize_callback.clear()
         if self.__pool.dispatch.close:
             self.__pool.dispatch.close(self.dbapi_connection, self)
         assert self.dbapi_connection is not None
-        self.__pool._close_connection(self.dbapi_connection)
+        self.__pool._close_connection(
+            self.dbapi_connection, terminate=terminate
+        )
         self.dbapi_connection = None
 
     def __connect(self) -> None:
@@ -877,7 +892,9 @@ def _finalize_fairy(
         dbapi_connection = connection_record.dbapi_connection
 
     # null pool is not _is_asyncio but can be used also with async dialects
-    dont_restore_gced = pool._dialect.is_async
+    dont_restore_gced = (
+        pool._dialect.is_async and not pool._dialect.has_terminate
+    )
 
     if dont_restore_gced:
         detach = connection_record is None or is_gc_cleanup
@@ -923,8 +940,9 @@ def _finalize_fairy(
                     message = (
                         "The garbage collector is trying to clean up "
                         f"connection {dbapi_connection!r}. This feature is "
-                        "unsupported on async "
-                        "dbapi, since no IO can be performed at this stage to "
+                        "unsupported on asyncio "
+                        'dbapis that lack a "terminate" feature, since no '
+                        "IO can be performed at this stage to "
                         "reset the connection. Please close out all "
                         "connections when they are no longer used, calling "
                         "``close()`` or using a context manager to "
diff --git a/test/engine/test_logging.py b/test/engine/test_logging.py
index c6fd8568471e1ca169cd5d68b6f83c996d60e852..38e1c436c0c79863f397d40bbb0ecafd5d152f9a 100644
@@ -453,7 +453,7 @@ class PoolLoggingTest(fixtures.TestBase):
                 "Connection %r checked out from pool",
                 "Connection %r being returned to pool%s",
                 "Connection %s rollback-on-return",
-                "Closing connection %r",
+                "%s connection %r",
             ]
             + (["Pool disposed. %s"] if dispose else []),
         )
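
A hedged usage sketch of the observable effect (the connection URL and pool_recycle value are placeholders, not part of this commit): when a pooled asyncpg connection is recycled or invalidated, the pool now logs "Hard-closing connection ..." and routes through do_terminate() rather than an awaited close():

    # Placeholders: URL and pool_recycle value are illustrative only.
    from sqlalchemy.ext.asyncio import create_async_engine

    engine = create_async_engine(
        "postgresql+asyncpg://scott:tiger@localhost/test",
        pool_recycle=1800,  # recycle connections older than 30 minutes
    )
    # When a connection is found to be past pool_recycle, or has been invalidated
    # after an error, _ConnectionRecord.__close(terminate=True) leads to
    # Pool._close_connection(..., terminate=True), which calls
    # PGDialect_asyncpg.do_terminate() and thus asyncpg's Connection.terminate(),
    # abandoning the connection without waiting on a server response.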
diff --git a/test/engine/test_pool.py b/test/engine/test_pool.py
index 2bbb976a84c50086d9ac07e5e8ee0c73958f6da3..473d462a3173dea40de5b5363c043e13c5a50fc4 100644
@@ -87,10 +87,13 @@ class PoolTestBase(fixtures.TestBase):
     def _queuepool_dbapi_fixture(self, **kw):
         dbapi = MockDBAPI()
         _is_asyncio = kw.pop("_is_asyncio", False)
+        _has_terminate = kw.pop("_has_terminate", False)
         p = pool.QueuePool(creator=lambda: dbapi.connect("foo.db"), **kw)
         if _is_asyncio:
             p._is_asyncio = True
             p._dialect = _AsyncConnDialect()
+        if _has_terminate:
+            p._dialect.has_terminate = True
         return dbapi, p
 
 
@@ -445,8 +448,10 @@ class PoolEventsTest(PoolTestBase):
 
         return p, canary
 
-    def _checkin_event_fixture(self, _is_asyncio=False):
-        p = self._queuepool_fixture(_is_asyncio=_is_asyncio)
+    def _checkin_event_fixture(self, _is_asyncio=False, _has_terminate=False):
+        p = self._queuepool_fixture(
+            _is_asyncio=_is_asyncio, _has_terminate=_has_terminate
+        )
         canary = []
 
         @event.listens_for(p, "checkin")
@@ -721,9 +726,12 @@ class PoolEventsTest(PoolTestBase):
         assert canary.call_args_list[0][0][0] is dbapi_con
         assert canary.call_args_list[0][0][2] is exc
 
-    @testing.combinations((True,), (False,))
-    def test_checkin_event_gc(self, detach_gced):
-        p, canary = self._checkin_event_fixture(_is_asyncio=detach_gced)
+    @testing.combinations((True,), (False,), argnames="is_asyncio")
+    @testing.combinations((True,), (False,), argnames="has_terminate")
+    def test_checkin_event_gc(self, is_asyncio, has_terminate):
+        p, canary = self._checkin_event_fixture(
+            _is_asyncio=is_asyncio, _has_terminate=has_terminate
+        )
 
         c1 = p.connect()
 
@@ -733,6 +741,8 @@ class PoolEventsTest(PoolTestBase):
         del c1
         lazy_gc()
 
+        detach_gced = is_asyncio and not has_terminate
+
         if detach_gced:
             # "close_detached" is not called because for asyncio the
             # connection is just lost.