]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
Handle mappings passed to ``execution_options``.
authorFederico Caselli <cfederico87@gmail.com>
Thu, 26 Aug 2021 20:00:33 +0000 (22:00 +0200)
committerFederico Caselli <cfederico87@gmail.com>
Thu, 26 Aug 2021 20:00:58 +0000 (22:00 +0200)
Fixed a bug in :meth:`_asyncio.AsyncSession.execute` and
:meth:`_asyncio.AsyncSession.stream` that required ``execution_options``
to be an instance of ``immutabledict`` when defined. It now
correctly accepts any mapping.

Fixes: #6943
Change-Id: Ic09de480dc2da1b0bdce25acb60b8f01371971f9

doc/build/changelog/unreleased_14/6943.rst [new file with mode: 0644]
lib/sqlalchemy/ext/asyncio/session.py
lib/sqlalchemy/orm/persistence.py
lib/sqlalchemy/orm/session.py
test/ext/asyncio/test_session_py3k.py

diff --git a/doc/build/changelog/unreleased_14/6943.rst b/doc/build/changelog/unreleased_14/6943.rst
new file mode 100644 (file)
index 0000000..4b980d0
--- /dev/null
@@ -0,0 +1,8 @@
+.. change::
+    :tags: bug, asyncio
+    :tickets: 6943
+
+    Fixed a bug in :meth:`_asyncio.AsyncSession.execute` and
+    :meth:`_asyncio.AsyncSession.stream` that required ``execution_options``
+    to be an instance of ``immutabledict`` when defined. It now
+    correctly accepts any mapping.
index a10621eef356ffae301d86343c73c5c6a8ae18b3..5c6e7f5a7c23cedfacc046862c62d2d554591811 100644 (file)
@@ -14,6 +14,9 @@ from ...orm import Session
 from ...orm import state as _instance_state
 from ...util.concurrency import greenlet_spawn
 
+_EXECUTE_OPTIONS = util.immutabledict({"prebuffer_rows": True})
+_STREAM_OPTIONS = util.immutabledict({"stream_results": True})
+
 
 @util.create_proxy_methods(
     Session,
@@ -140,7 +143,12 @@ class AsyncSession(ReversibleProxy):
         """Execute a statement and return a buffered
         :class:`_engine.Result` object."""
 
-        execution_options = execution_options.union({"prebuffer_rows": True})
+        if execution_options:
+            execution_options = util.immutabledict(execution_options).union(
+                _EXECUTE_OPTIONS
+            )
+        else:
+            execution_options = _EXECUTE_OPTIONS
 
         return await greenlet_spawn(
             self.sync_session.execute,
@@ -205,7 +213,12 @@ class AsyncSession(ReversibleProxy):
         """Execute a statement and return a streaming
         :class:`_asyncio.AsyncResult` object."""
 
-        execution_options = execution_options.union({"stream_results": True})
+        if execution_options:
+            execution_options = util.immutabledict(execution_options).union(
+                _STREAM_OPTIONS
+            )
+        else:
+            execution_options = _STREAM_OPTIONS
 
         result = await greenlet_spawn(
             self.sync_session.execute,
index 4747d0bbacd12aac2f45fc06e20bcbe88ff969ae..fd484b52b30df3d690df97aefb0f5480bba94037 100644 (file)
@@ -1833,7 +1833,7 @@ class BulkUDCompileState(CompileState):
         return (
             statement,
             util.immutabledict(execution_options).union(
-                dict(_sa_orm_update_options=update_options)
+                {"_sa_orm_update_options": update_options}
             ),
         )
 
index af803a1b036dbcfc39365a8309dd9ecd7ba7056f..0bdd5cc959d4ee568cc469001b2f9bba366351b9 100644 (file)
@@ -1581,7 +1581,7 @@ class Session(_SessionClassMethods):
         :param execution_options: optional dictionary of execution options,
          which will be associated with the statement execution.  This
          dictionary can provide a subset of the options that are accepted
-         by :meth:`_future.Connection.execution_options`, and may also
+         by :meth:`_engine.Connection.execution_options`, and may also
          provide additional options understood only in an ORM context.
 
         :param bind_arguments: dictionary of additional arguments to determine
index 0883cb026d1b1c8aec46ebb50604c8b786f4a04b..ebedfedbfba0926a0ab3607d54bc0669c0e50f4d 100644 (file)
@@ -65,7 +65,10 @@ class AsyncSessionTest(AsyncFixture):
 
 class AsyncSessionQueryTest(AsyncFixture):
     @async_test
-    async def test_execute(self, async_session):
+    @testing.combinations(
+        {}, dict(execution_options={"logging_token": "test"}), argnames="kw"
+    )
+    async def test_execute(self, async_session, kw):
         User = self.classes.User
 
         stmt = (
@@ -74,7 +77,7 @@ class AsyncSessionQueryTest(AsyncFixture):
             .order_by(User.id)
         )
 
-        result = await async_session.execute(stmt)
+        result = await async_session.execute(stmt, **kw)
         eq_(result.scalars().all(), self.static.user_address_result)
 
     @async_test
@@ -103,7 +106,10 @@ class AsyncSessionQueryTest(AsyncFixture):
 
     @async_test
     @testing.requires.independent_cursors
-    async def test_stream_partitions(self, async_session):
+    @testing.combinations(
+        {}, dict(execution_options={"logging_token": "test"}), argnames="kw"
+    )
+    async def test_stream_partitions(self, async_session, kw):
         User = self.classes.User
 
         stmt = (
@@ -112,7 +118,7 @@ class AsyncSessionQueryTest(AsyncFixture):
             .order_by(User.id)
         )
 
-        result = await async_session.stream(stmt)
+        result = await async_session.stream(stmt, **kw)
 
         assert_result = []
         async for partition in result.scalars().partitions(3):