--- /dev/null
+.. change::
+ :tags: bug, sqlite
+ :tickets: 13039
+
+ Fixed issue in the aiosqlite driver where SQLAlchemy's setting of
+ aiosqlite's worker thread to "daemon" stopped working because the aiosqlite
+ architecture moved the location of the worker thread in version 0.22.0.
+ This "daemon" flag is necessary so that a program is able to exit if the
+ SQLite connection itself was not explicitly closed, which is particularly
+ likely with SQLAlchemy as it maintains SQLite connections in a connection
+ pool. While it's perfectly fine to call :meth:`.AsyncEngine.dispose`
+ before program exit, this is not historically or technically necessary for
+ any driver of any known backend, since a primary feature of relational
+ databases is durability. The change also implements support for
+ "terminate" with aiosqlite when using version version 0.22.1 or greater,
+ which implements a sync ``.stop()`` method.
import asyncio
from functools import partial
+from threading import Thread
from types import ModuleType
from typing import Any
from typing import cast
from ...connectors.asyncio import AsyncAdapt_dbapi_cursor
from ...connectors.asyncio import AsyncAdapt_dbapi_module
from ...connectors.asyncio import AsyncAdapt_dbapi_ss_cursor
+from ...connectors.asyncio import AsyncAdapt_terminate
from ...util.concurrency import await_
if TYPE_CHECKING:
__slots__ = ()
-class AsyncAdapt_aiosqlite_connection(AsyncAdapt_dbapi_connection):
+class AsyncAdapt_aiosqlite_connection(
+ AsyncAdapt_terminate, AsyncAdapt_dbapi_connection
+):
__slots__ = ()
_cursor_cls = AsyncAdapt_aiosqlite_cursor
else:
super()._handle_exception_no_connection(dbapi, error)
+ async def _terminate_graceful_close(self) -> None:
+ """Try to close connection gracefully"""
+ await self._connection.close()
+
+ def _terminate_force_close(self) -> None:
+ """Terminate the connection"""
+
+ # this was added in aiosqlite 0.22.1. if stop() is not present,
+ # the dialect should indicate has_terminate=False
+ try:
+ meth = self._connection.stop
+ except AttributeError as ae:
+ raise NotImplementedError(
+ "terminate_force_close() not implemented by this DBAPI shim"
+ ) from ae
+ else:
+ meth()
+
class AsyncAdapt_aiosqlite_dbapi(AsyncAdapt_dbapi_module):
def __init__(self, aiosqlite: ModuleType, sqlite: ModuleType):
self.aiosqlite = aiosqlite
self.sqlite = sqlite
self.paramstyle = "qmark"
+ self.has_stop = hasattr(aiosqlite.Connection, "stop")
self._init_dbapi_attributes()
def _init_dbapi_attributes(self) -> None:
connection = creator_fn(*arg, **kw)
else:
connection = self.aiosqlite.connect(*arg, **kw)
- # it's a Thread. you'll thank us later
- connection.daemon = True
- return AsyncAdapt_aiosqlite_connection(
- self,
- await_(connection),
- )
+ # aiosqlite uses a Thread. you'll thank us later
+ if isinstance(connection, Thread):
+ # Connection itself was a thread in version prior to 0.22
+ connection.daemon = True
+ else:
+ # in 0.22+ instead it contains a thread.
+ connection._thread.daemon = True
+
+ return AsyncAdapt_aiosqlite_connection(self, await_(connection))
class SQLiteExecutionContext_aiosqlite(SQLiteExecutionContext):
supports_statement_cache = True
is_async = True
+ has_terminate = True
supports_server_side_cursors = True
execution_ctx_cls = SQLiteExecutionContext_aiosqlite
+ def __init__(self, **kwargs: Any):
+ super().__init__(**kwargs)
+ if self.dbapi and not self.dbapi.has_stop:
+ self.has_terminate = False
+
@classmethod
def import_dbapi(cls) -> AsyncAdapt_aiosqlite_dbapi:
return AsyncAdapt_aiosqlite_dbapi(
) -> AsyncIODBAPIConnection:
return connection._connection # type: ignore[no-any-return]
+ def do_terminate(self, dbapi_connection: DBAPIConnection) -> None:
+ dbapi_connection.terminate()
+
dialect = SQLiteDialect_aiosqlite
asyncio=True, options={"sqlite_share_pool": True}
)
+ @testing.fixture
+ def adhoc_async_engine(self):
+ return engines.testing_engine(asyncio=True)
+
@testing.fixture
def async_connection(self, async_engine):
with async_engine.sync_engine.connect() as conn:
@testing.variation("simulate_gc", [True, False])
def test_appropriate_warning_for_gced_connection(
- self, async_engine, simulate_gc
+ self, adhoc_async_engine, simulate_gc
):
"""test #9237 which builds upon a not really complete solution
added for #8419."""
async def go():
- conn = await async_engine.connect()
+ conn = await adhoc_async_engine.connect()
await conn.begin()
await conn.execute(select(1))
pool_connection = await conn.get_raw_connection()
None, rec, pool, ref, echo, transaction_was_reset=False
)
- if async_engine.dialect.has_terminate:
+ if adhoc_async_engine.dialect.has_terminate:
expected_msg = (
"The garbage collector is trying to clean up.*which will "
"be terminated."
eq_(m.mock_calls, [])
+ @async_test
+ @testing.skip_if(lambda config: not config.db.dialect.has_terminate)
+ async def test_dbapi_terminate(self, adhoc_async_engine):
+
+ conn = await adhoc_async_engine.raw_connection()
+ dbapi_conn = conn.dbapi_connection
+ dbapi_conn.terminate()
+ conn.invalidate()
+
@async_test
async def test_statement_compile(self, async_engine):
stmt = str(select(1).compile(async_engine))