]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
try to gracefully close even in terminate
authorMike Bayer <mike_mp@zzzcomputing.com>
Thu, 30 Nov 2023 14:11:25 +0000 (09:11 -0500)
committerMike Bayer <mike_mp@zzzcomputing.com>
Thu, 30 Nov 2023 20:05:33 +0000 (15:05 -0500)
Adjusted the asyncpg dialect such that when the ``terminate()`` method is
used to discard an invalidated connection, the dialect will first attempt
to gracefully close the conneciton using ``.close()`` with a timeout, if
the operation is proceeding within an async event loop context only. This
allows the asyncpg driver to attend to finalizing a ``TimeoutError``
including being able to close a long-running query server side, which
otherwise can keep running after the program has exited.

Fixes: #10717
Change-Id: Iaba0aeb67873a7a2b3981d43f4eb663005057309

doc/build/changelog/unreleased_20/10717.rst [new file with mode: 0644]
lib/sqlalchemy/dialects/postgresql/asyncpg.py
lib/sqlalchemy/util/concurrency.py

diff --git a/doc/build/changelog/unreleased_20/10717.rst b/doc/build/changelog/unreleased_20/10717.rst
new file mode 100644 (file)
index 0000000..2cd9303
--- /dev/null
@@ -0,0 +1,11 @@
+.. change::
+    :tags: bug, postgresql
+    :tickets: 10717
+
+    Adjusted the asyncpg dialect such that when the ``terminate()`` method is
+    used to discard an invalidated connection, the dialect will first attempt
+    to gracefully close the conneciton using ``.close()`` with a timeout, if
+    the operation is proceeding within an async event loop context only. This
+    allows the asyncpg driver to attend to finalizing a ``TimeoutError``
+    including being able to close a long-running query server side, which
+    otherwise can keep running after the program has exited.
index d57c94a170f9c3d085be8cc4e8ba9a473b808b27..a8fcf15107cd7281233dd1055904a8de3017fce2 100644 (file)
@@ -182,6 +182,7 @@ client using this setting passed to :func:`_asyncio.create_async_engine`::
 
 from __future__ import annotations
 
+import asyncio
 import collections
 import decimal
 import json as _py_json
@@ -941,7 +942,23 @@ class AsyncAdapt_asyncpg_connection(AsyncAdapt_dbapi_connection):
         self.await_(self._connection.close())
 
     def terminate(self):
-        self._connection.terminate()
+        if util.concurrency.in_greenlet():
+            # in a greenlet; this is the connection was invalidated
+            # case.
+            try:
+                # try to gracefully close; see #10717
+                # timeout added in asyncpg 0.14.0 December 2017
+                self.await_(self._connection.close(timeout=2))
+            except asyncio.TimeoutError:
+                # in the case where we are recycling an old connection
+                # that may have already been disconnected, close() will
+                # fail with the above timeout.  in this case, terminate
+                # the connection without any further waiting.
+                # see issue #8419
+                self._connection.terminate()
+        else:
+            # not in a greenlet; this is the gc cleanup case
+            self._connection.terminate()
         self._started = False
 
     @staticmethod
index 084374040f82e944061111fc0fc3cfe0330c4a6c..df5e03ae19c26e8e4090ba13b436db8abfae99d4 100644 (file)
@@ -169,6 +169,11 @@ def _safe_cancel_awaitable(awaitable: Awaitable[Any]) -> None:
         awaitable.close()
 
 
+def in_greenlet() -> bool:
+    current = _concurrency_shim.getcurrent()
+    return isinstance(current, _concurrency_shim._AsyncIoGreenlet)
+
+
 def await_only(awaitable: Awaitable[_T]) -> _T:
     """Awaits an async function in a sync method.