--- /dev/null
+.. change::
+ :tags: bug, asyncio
+ :tickets: 8419
+
+ Integrated support for asyncpg's ``terminate()`` method call for cases
+ where the connection pool is recycling a possibly timed-out connection,
+ where a connection is being garbage collected that wasn't gracefully
+ closed, as well as when the connection has been invalidated. This allows
+ asyncpg to abandon the connection without waiting for a response that may
+ incur long timeouts.
self.await_(self._connection.close())
+ def terminate(self):
+ self._connection.terminate()
+
class AsyncAdaptFallback_asyncpg_connection(AsyncAdapt_asyncpg_connection):
__slots__ = ()
supports_server_side_cursors = True
render_bind_cast = True
+ has_terminate = True
default_paramstyle = "format"
supports_sane_multi_rowcount = False
def get_deferrable(self, connection):
return connection.deferrable
+ def do_terminate(self, dbapi_connection) -> None:
+ dbapi_connection.terminate()
+
def create_connect_args(self, url):
opts = url.translate_connect_args(username="user")
is_async = False
+ has_terminate = False
+
# TODO: this is not to be part of 2.0. implement rudimentary binary
# literals for SQLite, PostgreSQL, MySQL only within
# _Binary.literal_processor
def do_commit(self, dbapi_connection):
dbapi_connection.commit()
+ def do_terminate(self, dbapi_connection):
+ self.do_close(dbapi_connection)
+
def do_close(self, dbapi_connection):
dbapi_connection.close()
is_async: bool
"""Whether or not this dialect is intended for asyncio use."""
+ has_terminate: bool
+ """Whether or not this dialect has a separate "terminate" implementation
+ that does not block or require awaiting."""
+
engine_config_types: Mapping[str, Any]
"""a mapping of string keys that can be in an engine config linked to
type conversion functions.
raise NotImplementedError()
+ def do_terminate(self, dbapi_connection: DBAPIConnection) -> None:
+ """Provide an implementation of ``connection.close()`` that tries as
+ much as possible to not block, given a DBAPI
+ connection.
+
+ In the vast majority of cases this just calls .close(), however
+ for some asyncio dialects may call upon different API features.
+
+ This hook is called by the :class:`_pool.Pool`
+ when a connection is being recycled or has been invalidated.
+
+ .. versionadded:: 1.4.41
+
+ """
+
+ raise NotImplementedError()
+
def do_close(self, dbapi_connection: DBAPIConnection) -> None:
"""Provide an implementation of ``connection.close()``, given a DBAPI
connection.
"""
is_async = False
+ has_terminate = False
def do_rollback(self, dbapi_connection: PoolProxiedConnection) -> None:
dbapi_connection.rollback()
def do_commit(self, dbapi_connection: PoolProxiedConnection) -> None:
dbapi_connection.commit()
+ def do_terminate(self, dbapi_connection: DBAPIConnection) -> None:
+ dbapi_connection.close()
+
def do_close(self, dbapi_connection: DBAPIConnection) -> None:
dbapi_connection.close()
creator_fn = cast(_CreatorFnType, creator)
return lambda rec: creator_fn()
- def _close_connection(self, connection: DBAPIConnection) -> None:
- self.logger.debug("Closing connection %r", connection)
+ def _close_connection(
+ self, connection: DBAPIConnection, *, terminate: bool = False
+ ) -> None:
+ self.logger.debug(
+ "%s connection %r",
+ "Hard-closing" if terminate else "Closing",
+ connection,
+ )
try:
- self._dialect.do_close(connection)
+ if terminate:
+ self._dialect.do_terminate(connection)
+ else:
+ self._dialect.do_close(connection)
except Exception:
self.logger.error(
"Exception closing connection %r", connection, exc_info=True
if soft:
self._soft_invalidate_time = time.time()
else:
- self.__close()
+ self.__close(terminate=True)
self.dbapi_connection = None
def get_connection(self) -> DBAPIConnection:
recycle = True
if recycle:
- self.__close()
+ self.__close(terminate=True)
self.info.clear() # type: ignore # our info is always present
self.__connect()
or (self._soft_invalidate_time > self.starttime)
)
- def __close(self) -> None:
+ def __close(self, *, terminate: bool = False) -> None:
self.finalize_callback.clear()
if self.__pool.dispatch.close:
self.__pool.dispatch.close(self.dbapi_connection, self)
assert self.dbapi_connection is not None
- self.__pool._close_connection(self.dbapi_connection)
+ self.__pool._close_connection(
+ self.dbapi_connection, terminate=terminate
+ )
self.dbapi_connection = None
def __connect(self) -> None:
dbapi_connection = connection_record.dbapi_connection
# null pool is not _is_asyncio but can be used also with async dialects
- dont_restore_gced = pool._dialect.is_async
+ dont_restore_gced = (
+ pool._dialect.is_async and not pool._dialect.has_terminate
+ )
if dont_restore_gced:
detach = connection_record is None or is_gc_cleanup
message = (
"The garbage collector is trying to clean up "
f"connection {dbapi_connection!r}. This feature is "
- "unsupported on async "
- "dbapi, since no IO can be performed at this stage to "
+ "unsupported on asyncio "
+ 'dbapis that lack a "terminate" feature, since no '
+ "IO can be performed at this stage to "
"reset the connection. Please close out all "
"connections when they are no longer used, calling "
"``close()`` or using a context manager to "
"Connection %r checked out from pool",
"Connection %r being returned to pool%s",
"Connection %s rollback-on-return",
- "Closing connection %r",
+ "%s connection %r",
]
+ (["Pool disposed. %s"] if dispose else []),
)
def _queuepool_dbapi_fixture(self, **kw):
dbapi = MockDBAPI()
_is_asyncio = kw.pop("_is_asyncio", False)
+ _has_terminate = kw.pop("_has_terminate", False)
p = pool.QueuePool(creator=lambda: dbapi.connect("foo.db"), **kw)
if _is_asyncio:
p._is_asyncio = True
p._dialect = _AsyncConnDialect()
+ if _has_terminate:
+ p._dialect.has_terminate = True
return dbapi, p
return p, canary
- def _checkin_event_fixture(self, _is_asyncio=False):
- p = self._queuepool_fixture(_is_asyncio=_is_asyncio)
+ def _checkin_event_fixture(self, _is_asyncio=False, _has_terminate=False):
+ p = self._queuepool_fixture(
+ _is_asyncio=_is_asyncio, _has_terminate=_has_terminate
+ )
canary = []
@event.listens_for(p, "checkin")
assert canary.call_args_list[0][0][0] is dbapi_con
assert canary.call_args_list[0][0][2] is exc
- @testing.combinations((True,), (False,))
- def test_checkin_event_gc(self, detach_gced):
- p, canary = self._checkin_event_fixture(_is_asyncio=detach_gced)
+ @testing.combinations((True,), (False,), argnames="is_asyncio")
+ @testing.combinations((True,), (False,), argnames="has_terminate")
+ def test_checkin_event_gc(self, is_asyncio, has_terminate):
+ p, canary = self._checkin_event_fixture(
+ _is_asyncio=is_asyncio, _has_terminate=has_terminate
+ )
c1 = p.connect()
del c1
lazy_gc()
+ detach_gced = is_asyncio and not has_terminate
+
if detach_gced:
# "close_detached" is not called because for asyncio the
# connection is just lost.