--- /dev/null
+.. change::
+ :tags: bug, asyncio
+ :tickets: 8419
+
+ Integrated support for asyncpg's ``terminate()`` method call for cases
+ where the connection pool is recycling a possibly timed-out connection,
+ where a connection is being garbage collected that wasn't gracefully
+ closed, as well as when the connection has been invalidated. This allows
+ asyncpg to abandon the connection without waiting for a response that may
+ incur long timeouts.
self.await_(self._connection.close())
+ def terminate(self):
+ self._connection.terminate()
+
class AsyncAdaptFallback_asyncpg_connection(AsyncAdapt_asyncpg_connection):
__slots__ = ()
supports_server_side_cursors = True
supports_unicode_binds = True
+ has_terminate = True
default_paramstyle = "format"
supports_sane_multi_rowcount = False
def get_deferrable(self, connection):
return connection.deferrable
+ def do_terminate(self, dbapi_connection) -> None:
+ dbapi_connection.terminate()
+
def create_connect_args(self, url):
opts = url.translate_connect_args(username="user")
CACHING_DISABLED = CACHING_DISABLED
NO_CACHE_KEY = NO_CACHE_KEY
NO_DIALECT_SUPPORT = NO_DIALECT_SUPPORT
+ has_terminate = False
@util.deprecated_params(
convert_unicode=(
def do_commit(self, dbapi_connection):
dbapi_connection.commit()
+ def do_terminate(self, dbapi_connection):
+ self.do_close(dbapi_connection)
+
def do_close(self, dbapi_connection):
dbapi_connection.close()
raise NotImplementedError()
+ def do_terminate(self, dbapi_connection):
+ """Provide an implementation of ``connection.close()`` that tries as
+ much as possible to not block, given a DBAPI
+ connection.
+
+ In the vast majority of cases this just calls .close(), however
+ for some asyncio dialects may call upon different API features.
+
+ This hook is called by the :class:`_pool.Pool`
+ when a connection is being recycled or has been invalidated.
+
+ .. versionadded:: 1.4.41
+
+ """
+
+ raise NotImplementedError()
+
def do_close(self, dbapi_connection):
"""Provide an implementation of ``connection.close()``, given a DBAPI
connection.
"""
is_async = False
+ has_terminate = False
def do_rollback(self, dbapi_connection):
dbapi_connection.rollback()
def do_commit(self, dbapi_connection):
dbapi_connection.commit()
+ def do_terminate(self, dbapi_connection):
+ dbapi_connection.close()
+
def do_close(self, dbapi_connection):
dbapi_connection.close()
else:
return lambda crec: creator()
- def _close_connection(self, connection):
- self.logger.debug("Closing connection %r", connection)
-
+ def _close_connection(self, connection, terminate=False):
+ self.logger.debug(
+ "%s connection %r",
+ "Hard-closing" if terminate else "Closing",
+ connection,
+ )
try:
- self._dialect.do_close(connection)
+ if terminate:
+ self._dialect.do_terminate(connection)
+ else:
+ self._dialect.do_close(connection)
except Exception:
self.logger.error(
"Exception closing connection %r", connection, exc_info=True
if soft:
self._soft_invalidate_time = time.time()
else:
- self.__close()
+ self.__close(terminate=True)
self.dbapi_connection = None
def get_connection(self):
recycle = True
if recycle:
- self.__close()
+ self.__close(terminate=True)
self.info.clear()
self.__connect()
or (self._soft_invalidate_time > self.starttime)
)
- def __close(self):
+ def __close(self, terminate=False):
self.finalize_callback.clear()
if self.__pool.dispatch.close:
self.__pool.dispatch.close(self.dbapi_connection, self)
- self.__pool._close_connection(self.dbapi_connection)
+ self.__pool._close_connection(
+ self.dbapi_connection, terminate=terminate
+ )
self.dbapi_connection = None
def __connect(self):
dbapi_connection = connection_record.dbapi_connection
# null pool is not _is_asyncio but can be used also with async dialects
- dont_restore_gced = pool._dialect.is_async
+ dont_restore_gced = (
+ pool._dialect.is_async and not pool._dialect.has_terminate
+ )
if dont_restore_gced:
detach = not connection_record or ref
else:
message = (
"The garbage collector is trying to clean up "
- "connection %r. This feature is unsupported on async "
- "dbapi, since no IO can be performed at this stage to "
+ "connection %r. This feature is unsupported on "
+ "unsupported on asyncio "
+ 'dbapis that lack a "terminate" feature, '
+ "since no IO can be performed at this stage to "
"reset the connection. Please close out all "
"connections when they are no longer used, calling "
"``close()`` or using a context manager to "
"Connection %r checked out from pool",
"Connection %r being returned to pool%s",
"Connection %s rollback-on-return",
- "Closing connection %r",
+ "%s connection %r",
]
+ (["Pool disposed. %s"] if dispose else []),
)
def _queuepool_dbapi_fixture(self, **kw):
dbapi = MockDBAPI()
_is_asyncio = kw.pop("_is_asyncio", False)
+ _has_terminate = kw.pop("_has_terminate", False)
p = pool.QueuePool(creator=lambda: dbapi.connect("foo.db"), **kw)
if _is_asyncio:
p._is_asyncio = True
p._dialect = _AsyncConnDialect()
+ if _has_terminate:
+ p._dialect.has_terminate = True
return dbapi, p
return p, canary
- def _checkin_event_fixture(self, _is_asyncio=False):
- p = self._queuepool_fixture(_is_asyncio=_is_asyncio)
+ def _checkin_event_fixture(self, _is_asyncio=False, _has_terminate=False):
+ p = self._queuepool_fixture(
+ _is_asyncio=_is_asyncio, _has_terminate=_has_terminate
+ )
canary = []
@event.listens_for(p, "checkin")
assert canary.call_args_list[0][0][0] is dbapi_con
assert canary.call_args_list[0][0][2] is exc
- @testing.combinations((True, testing.requires.python3), (False,))
- def test_checkin_event_gc(self, detach_gced):
- p, canary = self._checkin_event_fixture(_is_asyncio=detach_gced)
+ @testing.combinations((True,), (False,), argnames="is_asyncio")
+ @testing.combinations((True,), (False,), argnames="has_terminate")
+ @testing.requires.python3
+ def test_checkin_event_gc(self, is_asyncio, has_terminate):
+ p, canary = self._checkin_event_fixture(
+ _is_asyncio=is_asyncio, _has_terminate=has_terminate
+ )
c1 = p.connect()
del c1
lazy_gc()
+ detach_gced = is_asyncio and not has_terminate
+
if detach_gced:
# "close_detached" is not called because for asyncio the
# connection is just lost.