]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
integrate connection.terminate() for supporting dialects
authorMike Bayer <mike_mp@zzzcomputing.com>
Tue, 23 Aug 2022 13:28:06 +0000 (09:28 -0400)
committerMike Bayer <mike_mp@zzzcomputing.com>
Tue, 23 Aug 2022 13:48:00 +0000 (09:48 -0400)
Integrated support for asyncpg's ``terminate()`` method call for cases
where the connection pool is recycling a possibly timed-out connection,
where a connection is being garbage collected that wasn't gracefully
closed, as well as when the connection has been invalidated. This allows
asyncpg to abandon the connection without waiting for a response that may
incur long timeouts.

Fixes: #8419
Change-Id: Ia575af779d5733b483a72dff3690b8bbbad2bb05
(cherry picked from commit 3b7e621aa728d9b01dbac4150e13ea2ef6af35a3)

doc/build/changelog/unreleased_14/8419.rst [new file with mode: 0644]
lib/sqlalchemy/dialects/postgresql/asyncpg.py
lib/sqlalchemy/engine/default.py
lib/sqlalchemy/engine/interfaces.py
lib/sqlalchemy/pool/base.py
test/engine/test_logging.py
test/engine/test_pool.py

diff --git a/doc/build/changelog/unreleased_14/8419.rst b/doc/build/changelog/unreleased_14/8419.rst
new file mode 100644 (file)
index 0000000..a095d85
--- /dev/null
@@ -0,0 +1,10 @@
+.. change::
+    :tags: bug, asyncio
+    :tickets: 8419
+
+    Integrated support for asyncpg's ``terminate()`` method call for cases
+    where the connection pool is recycling a possibly timed-out connection,
+    where a connection is being garbage collected that wasn't gracefully
+    closed, as well as when the connection has been invalidated. This allows
+    asyncpg to abandon the connection without waiting for a response that may
+    incur long timeouts.
index 305ad46a32ba4efc9c70d373b2895ac03a5ea062..39b0f544cb4af98ecda71f70d1af498cef9a9f35 100644 (file)
@@ -748,6 +748,9 @@ class AsyncAdapt_asyncpg_connection(AdaptedConnection):
 
         self.await_(self._connection.close())
 
+    def terminate(self):
+        self._connection.terminate()
+
 
 class AsyncAdaptFallback_asyncpg_connection(AsyncAdapt_asyncpg_connection):
     __slots__ = ()
@@ -891,6 +894,7 @@ class PGDialect_asyncpg(PGDialect):
     supports_server_side_cursors = True
 
     supports_unicode_binds = True
+    has_terminate = True
 
     default_paramstyle = "format"
     supports_sane_multi_rowcount = False
@@ -987,6 +991,9 @@ class PGDialect_asyncpg(PGDialect):
     def get_deferrable(self, connection):
         return connection.deferrable
 
+    def do_terminate(self, dbapi_connection) -> None:
+        dbapi_connection.terminate()
+
     def create_connect_args(self, url):
         opts = url.translate_connect_args(username="user")
 
index 268a2d60930d460320f6585b5b749aedaddee927..6b58c44696b7c5b4db9f51bc3078f7f8e5ffb1b1 100644 (file)
@@ -231,6 +231,7 @@ class DefaultDialect(interfaces.Dialect):
     CACHING_DISABLED = CACHING_DISABLED
     NO_CACHE_KEY = NO_CACHE_KEY
     NO_DIALECT_SUPPORT = NO_DIALECT_SUPPORT
+    has_terminate = False
 
     @util.deprecated_params(
         convert_unicode=(
@@ -684,6 +685,9 @@ class DefaultDialect(interfaces.Dialect):
     def do_commit(self, dbapi_connection):
         dbapi_connection.commit()
 
+    def do_terminate(self, dbapi_connection):
+        self.do_close(dbapi_connection)
+
     def do_close(self, dbapi_connection):
         dbapi_connection.close()
 
index 4f2524aef2dfa4155ab657592e7bc8798ccd4b41..4e0ab8e72fd586ae13e18aa536695c74cefc41ab 100644 (file)
@@ -583,6 +583,23 @@ class Dialect(object):
 
         raise NotImplementedError()
 
+    def do_terminate(self, dbapi_connection):
+        """Provide an implementation of ``connection.close()`` that tries as
+        much as possible to not block, given a DBAPI
+        connection.
+
+        In the vast majority of cases this just calls .close(), however
+        for some asyncio dialects may call upon different API features.
+
+        This hook is called by the :class:`_pool.Pool`
+        when a connection is being recycled or has been invalidated.
+
+        .. versionadded:: 1.4.41
+
+        """
+
+        raise NotImplementedError()
+
     def do_close(self, dbapi_connection):
         """Provide an implementation of ``connection.close()``, given a DBAPI
         connection.
index cde28c2fb021363efa108e0505759ab51da9a2e5..9f16c654334e271eecf0c06a72811b9bf5544358 100644 (file)
@@ -36,6 +36,7 @@ class _ConnDialect(object):
     """
 
     is_async = False
+    has_terminate = False
 
     def do_rollback(self, dbapi_connection):
         dbapi_connection.rollback()
@@ -43,6 +44,9 @@ class _ConnDialect(object):
     def do_commit(self, dbapi_connection):
         dbapi_connection.commit()
 
+    def do_terminate(self, dbapi_connection):
+        dbapi_connection.close()
+
     def do_close(self, dbapi_connection):
         dbapi_connection.close()
 
@@ -240,11 +244,17 @@ class Pool(log.Identified):
         else:
             return lambda crec: creator()
 
-    def _close_connection(self, connection):
-        self.logger.debug("Closing connection %r", connection)
-
+    def _close_connection(self, connection, terminate=False):
+        self.logger.debug(
+            "%s connection %r",
+            "Hard-closing" if terminate else "Closing",
+            connection,
+        )
         try:
-            self._dialect.do_close(connection)
+            if terminate:
+                self._dialect.do_terminate(connection)
+            else:
+                self._dialect.do_close(connection)
         except Exception:
             self.logger.error(
                 "Exception closing connection %r", connection, exc_info=True
@@ -584,7 +594,7 @@ class _ConnectionRecord(object):
         if soft:
             self._soft_invalidate_time = time.time()
         else:
-            self.__close()
+            self.__close(terminate=True)
             self.dbapi_connection = None
 
     def get_connection(self):
@@ -630,7 +640,7 @@ class _ConnectionRecord(object):
             recycle = True
 
         if recycle:
-            self.__close()
+            self.__close(terminate=True)
             self.info.clear()
 
             self.__connect()
@@ -643,11 +653,13 @@ class _ConnectionRecord(object):
             or (self._soft_invalidate_time > self.starttime)
         )
 
-    def __close(self):
+    def __close(self, terminate=False):
         self.finalize_callback.clear()
         if self.__pool.dispatch.close:
             self.__pool.dispatch.close(self.dbapi_connection, self)
-        self.__pool._close_connection(self.dbapi_connection)
+        self.__pool._close_connection(
+            self.dbapi_connection, terminate=terminate
+        )
         self.dbapi_connection = None
 
     def __connect(self):
@@ -709,7 +721,9 @@ def _finalize_fairy(
         dbapi_connection = connection_record.dbapi_connection
 
     # null pool is not _is_asyncio but can be used also with async dialects
-    dont_restore_gced = pool._dialect.is_async
+    dont_restore_gced = (
+        pool._dialect.is_async and not pool._dialect.has_terminate
+    )
 
     if dont_restore_gced:
         detach = not connection_record or ref
@@ -751,8 +765,10 @@ def _finalize_fairy(
                 else:
                     message = (
                         "The garbage collector is trying to clean up "
-                        "connection %r. This feature is unsupported on async "
-                        "dbapi, since no IO can be performed at this stage to "
+                        "connection %r. This feature is unsupported on "
+                        "unsupported on asyncio "
+                        'dbapis that lack a "terminate" feature, '
+                        "since no IO can be performed at this stage to "
                         "reset the connection. Please close out all "
                         "connections when they are no longer used, calling "
                         "``close()`` or using a context manager to "
index 7a0ed6e7934bfe5c9fbfee2d6d4ab0849a5c3a0e..5b0d6c762e26de3313a71d80fde9d3729b57c067 100644 (file)
@@ -468,7 +468,7 @@ class PoolLoggingTest(fixtures.TestBase):
                 "Connection %r checked out from pool",
                 "Connection %r being returned to pool%s",
                 "Connection %s rollback-on-return",
-                "Closing connection %r",
+                "%s connection %r",
             ]
             + (["Pool disposed. %s"] if dispose else []),
         )
index 320a9bb585409334da29886a77d7997b5f575568..879369a9ffd344d0ce709cabe141bceceb894e78 100644 (file)
@@ -92,10 +92,13 @@ class PoolTestBase(fixtures.TestBase):
     def _queuepool_dbapi_fixture(self, **kw):
         dbapi = MockDBAPI()
         _is_asyncio = kw.pop("_is_asyncio", False)
+        _has_terminate = kw.pop("_has_terminate", False)
         p = pool.QueuePool(creator=lambda: dbapi.connect("foo.db"), **kw)
         if _is_asyncio:
             p._is_asyncio = True
             p._dialect = _AsyncConnDialect()
+        if _has_terminate:
+            p._dialect.has_terminate = True
         return dbapi, p
 
 
@@ -468,8 +471,10 @@ class PoolEventsTest(PoolTestBase):
 
         return p, canary
 
-    def _checkin_event_fixture(self, _is_asyncio=False):
-        p = self._queuepool_fixture(_is_asyncio=_is_asyncio)
+    def _checkin_event_fixture(self, _is_asyncio=False, _has_terminate=False):
+        p = self._queuepool_fixture(
+            _is_asyncio=_is_asyncio, _has_terminate=_has_terminate
+        )
         canary = []
 
         @event.listens_for(p, "checkin")
@@ -744,9 +749,13 @@ class PoolEventsTest(PoolTestBase):
         assert canary.call_args_list[0][0][0] is dbapi_con
         assert canary.call_args_list[0][0][2] is exc
 
-    @testing.combinations((True, testing.requires.python3), (False,))
-    def test_checkin_event_gc(self, detach_gced):
-        p, canary = self._checkin_event_fixture(_is_asyncio=detach_gced)
+    @testing.combinations((True,), (False,), argnames="is_asyncio")
+    @testing.combinations((True,), (False,), argnames="has_terminate")
+    @testing.requires.python3
+    def test_checkin_event_gc(self, is_asyncio, has_terminate):
+        p, canary = self._checkin_event_fixture(
+            _is_asyncio=is_asyncio, _has_terminate=has_terminate
+        )
 
         c1 = p.connect()
 
@@ -756,6 +765,8 @@ class PoolEventsTest(PoolTestBase):
         del c1
         lazy_gc()
 
+        detach_gced = is_asyncio and not has_terminate
+
         if detach_gced:
             # "close_detached" is not called because for asyncio the
             # connection is just lost.