]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
Support aiosqlite 0.22.0+
authorFederico Caselli <cfederico87@gmail.com>
Wed, 24 Dec 2025 13:02:17 +0000 (14:02 +0100)
committerMike Bayer <mike_mp@zzzcomputing.com>
Thu, 1 Jan 2026 18:01:44 +0000 (13:01 -0500)
Fixed issue in the aiosqlite driver where SQLAlchemy's setting of
aiosqlite's worker thread to "daemon" stopped working because the aiosqlite
architecture moved the location of the worker thread in version 0.22.0.
This "daemon" flag is necessary so that a program is able to exit if the
SQLite connection itself was not explicitly closed, which is particularly
likely with SQLAlchemy as it maintains SQLite connections in a connection
pool.  While it's perfectly fine to call :meth:`.AsyncEngine.dispose`
before program exit, this is not historically or technically necessary for
any driver of any known backend, since a primary feature of relational
databases is durability.  The change also implements support for
"terminate" with aiosqlite when using version version 0.22.1 or greater,
which implements a sync ``.stop()`` method.

Fixes: #13039
Change-Id: I46efcbaab9dd028f673e113d5f6f2ceddfd133ca
(cherry picked from commit 380c234ce901416ca3c04453744f33d53cc4bd55)

doc/build/changelog/unreleased_20/13039.rst [new file with mode: 0644]
lib/sqlalchemy/dialects/sqlite/aiosqlite.py
test/ext/asyncio/test_engine.py

diff --git a/doc/build/changelog/unreleased_20/13039.rst b/doc/build/changelog/unreleased_20/13039.rst
new file mode 100644 (file)
index 0000000..296a8aa
--- /dev/null
@@ -0,0 +1,16 @@
+.. change::
+    :tags: bug, sqlite
+    :tickets: 13039
+
+    Fixed issue in the aiosqlite driver where SQLAlchemy's setting of
+    aiosqlite's worker thread to "daemon" stopped working because the aiosqlite
+    architecture moved the location of the worker thread in version 0.22.0.
+    This "daemon" flag is necessary so that a program is able to exit if the
+    SQLite connection itself was not explicitly closed, which is particularly
+    likely with SQLAlchemy as it maintains SQLite connections in a connection
+    pool.  While it's perfectly fine to call :meth:`.AsyncEngine.dispose`
+    before program exit, this is not historically or technically necessary for
+    any driver of any known backend, since a primary feature of relational
+    databases is durability.  The change also implements support for
+    "terminate" with aiosqlite when using version version 0.22.1 or greater,
+    which implements a sync ``.stop()`` method.
index 63cf8190b7c100824c651f351509109a3eccac53..7df7e7d8ef3861ad5ada4a491e010bf57e5f7634 100644 (file)
@@ -82,6 +82,7 @@ from __future__ import annotations
 import asyncio
 from collections import deque
 from functools import partial
+from threading import Thread
 from types import ModuleType
 from typing import Any
 from typing import cast
@@ -98,6 +99,7 @@ from .pysqlite import SQLiteDialect_pysqlite
 from ... import pool
 from ... import util
 from ...connectors.asyncio import AsyncAdapt_dbapi_module
+from ...connectors.asyncio import AsyncAdapt_terminate
 from ...engine import AdaptedConnection
 from ...util.concurrency import await_fallback
 from ...util.concurrency import await_only
@@ -251,7 +253,7 @@ class AsyncAdapt_aiosqlite_ss_cursor(AsyncAdapt_aiosqlite_cursor):
         return self.await_(self._cursor.fetchall())
 
 
-class AsyncAdapt_aiosqlite_connection(AdaptedConnection):
+class AsyncAdapt_aiosqlite_connection(AsyncAdapt_terminate, AdaptedConnection):
     await_ = staticmethod(await_only)
     __slots__ = ("dbapi",)
 
@@ -339,6 +341,24 @@ class AsyncAdapt_aiosqlite_connection(AdaptedConnection):
         else:
             raise error
 
+    async def _terminate_graceful_close(self) -> None:
+        """Try to close connection gracefully"""
+        await self._connection.close()
+
+    def _terminate_force_close(self) -> None:
+        """Terminate the connection"""
+
+        # this was added in aiosqlite 0.22.1.  if stop() is not present,
+        # the dialect should indicate has_terminate=False
+        try:
+            meth = self._connection.stop
+        except AttributeError as ae:
+            raise NotImplementedError(
+                "terminate_force_close() not implemented by this DBAPI shim"
+            ) from ae
+        else:
+            meth()
+
 
 class AsyncAdaptFallback_aiosqlite_connection(AsyncAdapt_aiosqlite_connection):
     __slots__ = ()
@@ -351,6 +371,7 @@ class AsyncAdapt_aiosqlite_dbapi(AsyncAdapt_dbapi_module):
         self.aiosqlite = aiosqlite
         self.sqlite = sqlite
         self.paramstyle = "qmark"
+        self.has_stop = hasattr(aiosqlite.Connection, "stop")
         self._init_dbapi_attributes()
 
     def _init_dbapi_attributes(self) -> None:
@@ -380,8 +401,14 @@ class AsyncAdapt_aiosqlite_dbapi(AsyncAdapt_dbapi_module):
             connection = creator_fn(*arg, **kw)
         else:
             connection = self.aiosqlite.connect(*arg, **kw)
-            # it's a Thread.   you'll thank us later
-            connection.daemon = True
+
+            # aiosqlite uses a Thread.   you'll thank us later
+            if isinstance(connection, Thread):
+                # Connection itself was a thread in version prior to 0.22
+                connection.daemon = True
+            else:
+                # in 0.22+ instead it contains a thread.
+                connection._thread.daemon = True
 
         if util.asbool(async_fallback):
             return AsyncAdaptFallback_aiosqlite_connection(
@@ -405,11 +432,17 @@ class SQLiteDialect_aiosqlite(SQLiteDialect_pysqlite):
     supports_statement_cache = True
 
     is_async = True
+    has_terminate = True
 
     supports_server_side_cursors = True
 
     execution_ctx_cls = SQLiteExecutionContext_aiosqlite
 
+    def __init__(self, **kwargs: Any):
+        super().__init__(**kwargs)
+        if self.dbapi and not self.dbapi.has_stop:
+            self.has_terminate = False
+
     @classmethod
     def import_dbapi(cls) -> AsyncAdapt_aiosqlite_dbapi:
         return AsyncAdapt_aiosqlite_dbapi(
@@ -442,5 +475,8 @@ class SQLiteDialect_aiosqlite(SQLiteDialect_pysqlite):
     ) -> AsyncIODBAPIConnection:
         return connection._connection  # type: ignore[no-any-return]
 
+    def do_terminate(self, dbapi_connection: DBAPIConnection) -> None:
+        dbapi_connection.terminate()
+
 
 dialect = SQLiteDialect_aiosqlite
index fef3a7f0b7c078edad1566392c03131dc61587e1..bb901970e296562a5003f219f5561edef566d1cc 100644 (file)
@@ -186,6 +186,10 @@ class EngineFixture(AsyncFixture, fixtures.TablesTest):
             asyncio=True, options={"sqlite_share_pool": True}
         )
 
+    @testing.fixture
+    def adhoc_async_engine(self):
+        return engines.testing_engine(asyncio=True)
+
     @testing.fixture
     def async_connection(self, async_engine):
         with async_engine.sync_engine.connect() as conn:
@@ -349,13 +353,13 @@ class AsyncEngineTest(EngineFixture):
 
     @testing.variation("simulate_gc", [True, False])
     def test_appropriate_warning_for_gced_connection(
-        self, async_engine, simulate_gc
+        self, adhoc_async_engine, simulate_gc
     ):
         """test #9237 which builds upon a not really complete solution
         added for #8419."""
 
         async def go():
-            conn = await async_engine.connect()
+            conn = await adhoc_async_engine.connect()
             await conn.begin()
             await conn.execute(select(1))
             pool_connection = await conn.get_raw_connection()
@@ -385,7 +389,7 @@ class AsyncEngineTest(EngineFixture):
                     None, rec, pool, ref, echo, transaction_was_reset=False
                 )
 
-            if async_engine.dialect.has_terminate:
+            if adhoc_async_engine.dialect.has_terminate:
                 expected_msg = (
                     "The garbage collector is trying to clean up.*which will "
                     "be terminated."
@@ -408,6 +412,15 @@ class AsyncEngineTest(EngineFixture):
 
             eq_(m.mock_calls, [])
 
+    @async_test
+    @testing.skip_if(lambda config: not config.db.dialect.has_terminate)
+    async def test_dbapi_terminate(self, adhoc_async_engine):
+
+        conn = await adhoc_async_engine.raw_connection()
+        dbapi_conn = conn.dbapi_connection
+        dbapi_conn.terminate()
+        conn.invalidate()
+
     @async_test
     async def test_statement_compile(self, async_engine):
         stmt = str(select(1).compile(async_engine))