--- /dev/null
+.. change::
+ :tags: usecase, asyncio
+ :tickets: 12273
+
+ Generalize the terminate logic employed by the asyncpg dialect to reuse
+ it in the aiomysql and asyncmy dialect implementation.
from typing import NoReturn
from typing import Optional
from typing import Sequence
+from typing import Tuple
+from typing import Type
from typing import TYPE_CHECKING
from ..engine import AdaptedConnection
__slots__ = ()
await_ = staticmethod(await_fallback)
+
+
+class AsyncAdapt_terminate:
+ """Mixin for a AsyncAdapt_dbapi_connection to add terminate support."""
+
+ __slots__ = ()
+
+ def terminate(self) -> None:
+ if in_greenlet():
+ # in a greenlet; this is the connection was invalidated case.
+ try:
+ # try to gracefully close; see #10717
+ self.await_(asyncio.shield(self._terminate_graceful_close())) # type: ignore[attr-defined] # noqa: E501
+ except self._terminate_handled_exceptions() as e:
+ # in the case where we are recycling an old connection
+ # that may have already been disconnected, close() will
+ # fail. In this case, terminate
+ # the connection without any further waiting.
+ # see issue #8419
+ self._terminate_force_close()
+ if isinstance(e, asyncio.CancelledError):
+ # re-raise CancelledError if we were cancelled
+ raise
+ else:
+ # not in a greenlet; this is the gc cleanup case
+ self._terminate_force_close()
+
+ def _terminate_handled_exceptions(self) -> Tuple[Type[BaseException], ...]:
+ """Returns the exceptions that should be handled when
+ calling _graceful_close.
+ """
+ return (asyncio.TimeoutError, asyncio.CancelledError, OSError)
+
+ async def _terminate_graceful_close(self) -> None:
+ """Try to close connection gracefully"""
+ raise NotImplementedError
+
+ def _terminate_force_close(self) -> None:
+ """Terminate the connection"""
+ raise NotImplementedError
from ...connectors.asyncio import AsyncAdapt_dbapi_cursor
from ...connectors.asyncio import AsyncAdapt_dbapi_module
from ...connectors.asyncio import AsyncAdapt_dbapi_ss_cursor
+from ...connectors.asyncio import AsyncAdapt_terminate
from ...util.concurrency import await_fallback
from ...util.concurrency import await_only
)
-class AsyncAdapt_aiomysql_connection(AsyncAdapt_dbapi_connection):
+class AsyncAdapt_aiomysql_connection(
+ AsyncAdapt_terminate, AsyncAdapt_dbapi_connection
+):
__slots__ = ()
_cursor_cls = AsyncAdapt_aiomysql_cursor
def get_autocommit(self) -> bool:
return self._connection.get_autocommit() # type: ignore
- def terminate(self) -> None:
- # it's not awaitable.
- self._connection.close()
-
def close(self) -> None:
self.await_(self._connection.ensure_closed())
+ async def _terminate_graceful_close(self) -> None:
+ await self._connection.ensure_closed()
+
+ def _terminate_force_close(self) -> None:
+ # it's not awaitable.
+ self._connection.close()
+
class AsyncAdaptFallback_aiomysql_connection(AsyncAdapt_aiomysql_connection):
__slots__ = ()
from ...connectors.asyncio import AsyncAdapt_dbapi_cursor
from ...connectors.asyncio import AsyncAdapt_dbapi_module
from ...connectors.asyncio import AsyncAdapt_dbapi_ss_cursor
+from ...connectors.asyncio import AsyncAdapt_terminate
from ...util.concurrency import await_fallback
from ...util.concurrency import await_only
)
-class AsyncAdapt_asyncmy_connection(AsyncAdapt_dbapi_connection):
+class AsyncAdapt_asyncmy_connection(
+ AsyncAdapt_terminate, AsyncAdapt_dbapi_connection
+):
__slots__ = ()
_cursor_cls = AsyncAdapt_asyncmy_cursor
def get_autocommit(self) -> bool:
return self._connection.get_autocommit() # type: ignore
- def terminate(self) -> None:
- # it's not awaitable.
- self._connection.close()
-
def close(self) -> None:
self.await_(self._connection.ensure_closed())
+ async def _terminate_graceful_close(self) -> None:
+ await self._connection.ensure_closed()
+
+ def _terminate_force_close(self) -> None:
+ # it's not awaitable.
+ self._connection.close()
+
class AsyncAdaptFallback_asyncmy_connection(AsyncAdapt_asyncmy_connection):
__slots__ = ()
from ... import exc
from ... import pool
from ... import util
+from ...connectors.asyncio import AsyncAdapt_terminate
from ...engine import AdaptedConnection
from ...engine import processors
from ...sql import sqltypes
)
-class AsyncAdapt_asyncpg_connection(AdaptedConnection):
+class AsyncAdapt_asyncpg_connection(AsyncAdapt_terminate, AdaptedConnection):
__slots__ = (
"dbapi",
"isolation_level",
self.await_(self._connection.close())
- def terminate(self):
- if util.concurrency.in_greenlet():
- # in a greenlet; this is the connection was invalidated
- # case.
- try:
- # try to gracefully close; see #10717
- # timeout added in asyncpg 0.14.0 December 2017
- self.await_(asyncio.shield(self._connection.close(timeout=2)))
- except (
- asyncio.TimeoutError,
- asyncio.CancelledError,
- OSError,
- self.dbapi.asyncpg.PostgresError,
- ) as e:
- # in the case where we are recycling an old connection
- # that may have already been disconnected, close() will
- # fail with the above timeout. in this case, terminate
- # the connection without any further waiting.
- # see issue #8419
- self._connection.terminate()
- if isinstance(e, asyncio.CancelledError):
- # re-raise CancelledError if we were cancelled
- raise
- else:
- # not in a greenlet; this is the gc cleanup case
- self._connection.terminate()
+ def _terminate_handled_exceptions(self):
+ return super()._terminate_handled_exceptions() + (
+ self.dbapi.asyncpg.PostgresError,
+ )
+
+ async def _terminate_graceful_close(self) -> None:
+ # timeout added in asyncpg 0.14.0 December 2017
+ await self._connection.close(timeout=2)
+ self._started = False
+
+ def _terminate_force_close(self) -> None:
+ self._connection.terminate()
self._started = False
@staticmethod