]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
try to gracefully close even in terminate
authorMike Bayer <mike_mp@zzzcomputing.com>
Thu, 30 Nov 2023 14:11:25 +0000 (09:11 -0500)
committerMike Bayer <mike_mp@zzzcomputing.com>
Thu, 30 Nov 2023 20:08:41 +0000 (15:08 -0500)
Adjusted the asyncpg dialect such that when the ``terminate()`` method is
used to discard an invalidated connection, the dialect will first attempt
to gracefully close the conneciton using ``.close()`` with a timeout, if
the operation is proceeding within an async event loop context only. This
allows the asyncpg driver to attend to finalizing a ``TimeoutError``
including being able to close a long-running query server side, which
otherwise can keep running after the program has exited.

Fixes: #10717
Change-Id: Iaba0aeb67873a7a2b3981d43f4eb663005057309
(cherry picked from commit e70a0b0a0e52945e5b588b5cffec619a3f3e78a1)

doc/build/changelog/unreleased_20/10717.rst [new file with mode: 0644]
lib/sqlalchemy/dialects/postgresql/asyncpg.py
lib/sqlalchemy/util/_concurrency_py3k.py
lib/sqlalchemy/util/concurrency.py

diff --git a/doc/build/changelog/unreleased_20/10717.rst b/doc/build/changelog/unreleased_20/10717.rst
new file mode 100644 (file)
index 0000000..2cd9303
--- /dev/null
@@ -0,0 +1,11 @@
+.. change::
+    :tags: bug, postgresql
+    :tickets: 10717
+
+    Adjusted the asyncpg dialect such that when the ``terminate()`` method is
+    used to discard an invalidated connection, the dialect will first attempt
+    to gracefully close the conneciton using ``.close()`` with a timeout, if
+    the operation is proceeding within an async event loop context only. This
+    allows the asyncpg driver to attend to finalizing a ``TimeoutError``
+    including being able to close a long-running query server side, which
+    otherwise can keep running after the program has exited.
index ca35bf96075d249d679243e9df922bdd2f4cfc28..a116a470a7cbf00386fd95779bc791bea064072f 100644 (file)
@@ -894,7 +894,23 @@ class AsyncAdapt_asyncpg_connection(AdaptedConnection):
         self.await_(self._connection.close())
 
     def terminate(self):
-        self._connection.terminate()
+        if util.concurrency.in_greenlet():
+            # in a greenlet; this is the connection was invalidated
+            # case.
+            try:
+                # try to gracefully close; see #10717
+                # timeout added in asyncpg 0.14.0 December 2017
+                self.await_(self._connection.close(timeout=2))
+            except asyncio.TimeoutError:
+                # in the case where we are recycling an old connection
+                # that may have already been disconnected, close() will
+                # fail with the above timeout.  in this case, terminate
+                # the connection without any further waiting.
+                # see issue #8419
+                self._connection.terminate()
+        else:
+            # not in a greenlet; this is the gc cleanup case
+            self._connection.terminate()
         self._started = False
 
     @staticmethod
index 71d10a68579902689dc3ff75c0976c36d02bd99e..83201dd95c764835dbdd2877e55ae0aad90e18ce 100644 (file)
@@ -99,6 +99,11 @@ def _safe_cancel_awaitable(awaitable: Awaitable[Any]) -> None:
         awaitable.close()
 
 
+def in_greenlet() -> bool:
+    current = getcurrent()
+    return isinstance(current, _AsyncIoGreenlet)
+
+
 def await_only(awaitable: Awaitable[_T]) -> _T:
     """Awaits an async function in a sync method.
 
index 53a70070b765c0f840f9de68892be02a3185eb6c..1141cbc165a1af21f7428d7a5893daee268357dc 100644 (file)
@@ -22,6 +22,7 @@ else:
     have_greenlet = True
     from ._concurrency_py3k import await_only as await_only
     from ._concurrency_py3k import await_fallback as await_fallback
+    from ._concurrency_py3k import in_greenlet as in_greenlet
     from ._concurrency_py3k import greenlet_spawn as greenlet_spawn
     from ._concurrency_py3k import is_exit_exception as is_exit_exception
     from ._concurrency_py3k import AsyncAdaptedLock as AsyncAdaptedLock
@@ -56,6 +57,9 @@ if not typing.TYPE_CHECKING and not have_greenlet:
     def await_fallback(thing):  # type: ignore  # noqa: F811
         return thing
 
+    def in_greenlet():  # type: ignore  # noqa: F811
+        _not_implemented()
+
     def greenlet_spawn(fn, *args, **kw):  # type: ignore  # noqa: F811
         _not_implemented()