]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
send execution options to connection also
authorMike Bayer <mike_mp@zzzcomputing.com>
Wed, 3 Jun 2026 13:46:19 +0000 (09:46 -0400)
committerMike Bayer <mike_mp@zzzcomputing.com>
Thu, 4 Jun 2026 17:08:24 +0000 (13:08 -0400)
Session level :paramref:`_orm.Session.execution_options` now take
effect for Core level SQL emitted by unit of work operations, in
addition to their existing use within ORM statement executions.
This is to provide for Core options such as
:paramref:`_engine.Connection.execution_options.schema_translate_map`
to be applicable to a :class:`.Session` overall.

Fixes: #13346
Change-Id: Icb453b7925f1bc38f30538d7726d222842bc65bd

doc/build/changelog/migration_21.rst
doc/build/changelog/unreleased_21/13346.rst [new file with mode: 0644]
doc/build/core/connections.rst
lib/sqlalchemy/orm/session.py
test/orm/test_session.py

index 166f80f9ec54e775987b38bd8726c04860d019dc..881e79a5d63ba6617bf2e9c913ed41cea7ef8048 100644 (file)
@@ -89,6 +89,64 @@ all types of SQL execution.
 :ticket:`9809`
 
 
+.. _change_13346:
+
+Session-level execution options applied to Connection at procurement time
+--------------------------------------------------------------------------
+
+Building on the session-level execution options feature introduced in
+:ticket:`12659`, the :paramref:`_orm.Session.execution_options` parameter
+now applies its options to the :class:`_engine.Connection` when it is first
+procured for a transaction, in addition to being merged into explicit query
+executions as before.  This means that execution options such as
+``schema_translate_map`` as well as custom user-defined options now take
+effect for **all** operations within the session, including:
+
+* Flush operations (INSERT/UPDATE/DELETE emitted by the unit of work)
+* Event hooks such as :meth:`_events.ConnectionEvents.before_cursor_execute`
+* Eager loader queries
+
+Previously, session-level execution options were only applied to explicit calls
+such as :meth:`_orm.Session.execute`, which meant that
+``schema_translate_map`` set on the :class:`_orm.Session` would not take effect
+for flush operations.  The prior workaround was to set ``schema_translate_map``
+on the :class:`_engine.Engine` itself, which remains supported.
+
+The new behavior allows ``schema_translate_map`` to be set directly on the
+:class:`_orm.Session`::
+
+    session = Session(
+        engine,
+        execution_options={"schema_translate_map": {None: "my_schema"}},
+    )
+
+    # schema_translate_map takes effect for both queries and flushes
+    session.add(MyObject(data="some data"))
+    session.commit()
+
+    results = session.scalars(select(MyObject)).all()
+
+Custom execution options that are consumed in event hooks such as
+:meth:`_events.ConnectionEvents.before_cursor_execute` are also available
+during flush operations::
+
+    session = Session(engine, execution_options={"my_audit_flag": True})
+
+
+    @event.listens_for(engine, "before_cursor_execute")
+    def receive_before_cursor_execute(
+        conn, cursor, statement, parameters, context, executemany
+    ):
+        if context.execution_options.get("my_audit_flag"):
+            log.info("Executing: %s", statement)
+
+
+    session.add(SomeObject())
+    session.flush()  # before_cursor_execute sees my_audit_flag=True
+
+:ticket:`13346`
+
+
 .. _change_10050:
 
 ORM Relationship allows callable for back_populates
diff --git a/doc/build/changelog/unreleased_21/13346.rst b/doc/build/changelog/unreleased_21/13346.rst
new file mode 100644 (file)
index 0000000..08210f8
--- /dev/null
@@ -0,0 +1,14 @@
+.. change::
+    :tags: usecase, orm
+    :tickets: 13346
+
+    Session level :paramref:`_orm.Session.execution_options` now take
+    effect for Core level SQL emitted by unit of work operations, in
+    addition to their existing use within ORM statement executions.
+    This is to provide for Core options such as
+    :paramref:`_engine.Connection.execution_options.schema_translate_map`
+    to be applicable to a :class:`.Session` overall.
+
+    .. seealso::
+
+        :ref:`change_13346`
index d13046305a28e1fb08eff56fff3a2b960c9f3010..8d41d5d8a9c267558443d7da721958c301f72780 100644 (file)
@@ -885,15 +885,34 @@ as the schema name is passed to these methods explicitly.
 .. tip::
 
   To use the schema translation feature with the ORM :class:`_orm.Session`,
-  set this option at the level of the :class:`_engine.Engine`, then pass that engine
-  to the :class:`_orm.Session`.  The :class:`_orm.Session` uses a new
-  :class:`_engine.Connection` for each transaction::
+  pass ``schema_translate_map`` as part of the
+  :paramref:`_orm.Session.execution_options` parameter.  The map will be
+  applied to the :class:`_engine.Connection` when it is first procured for
+  the transaction, so that it takes effect for all operations including
+  queries, flushes (INSERT/UPDATE/DELETE), and event hooks::
+
+      session = Session(
+          engine,
+          execution_options={"schema_translate_map": {None: "my_schema"}},
+      )
+
+      # schema_translate_map is applied to both queries and flush operations
+      session.add(MyObject(data="some data"))
+      session.commit()
+
+  The map may alternatively be set at the :class:`_engine.Engine` level,
+  which applies it to all connections and sessions that use that engine::
 
       schema_engine = engine.execution_options(schema_translate_map={...})
 
       session = Session(schema_engine)
 
-      ...
+  .. versionchanged:: 2.1.0b3
+     Session-level execution options including ``schema_translate_map`` are
+     now applied to the :class:`_engine.Connection` at procurement time,
+     so that they take effect for flush operations.  Previously, setting
+     ``schema_translate_map`` on the :class:`_orm.Session` required that
+     it be set at the :class:`_engine.Engine` level instead.
 
   .. warning::
 
index eae68db6e6e96ab030fff02dab62485aca92a707..5b71b214376327ea43be235b371302b35310cac5 100644 (file)
@@ -1203,8 +1203,13 @@ class SessionTransaction(_StateChange, TransactionalContext):
                     local_connect = True
 
             try:
+                conn_exec_opts: Dict[str, Any] = {}
+                if self.session.execution_options:
+                    conn_exec_opts.update(self.session.execution_options)
                 if execution_options:
-                    conn = conn.execution_options(**execution_options)
+                    conn_exec_opts.update(execution_options)
+                if conn_exec_opts:
+                    conn = conn.execution_options(**conn_exec_opts)
 
                 transaction: Transaction
                 if self.session.twophase and self._parent is None:
@@ -1622,14 +1627,27 @@ class Session(_SessionClassMethods, EventTarget):
               use of this extension within their own code.
 
         :param execution_options: optional dictionary of execution options
-           that will be applied to all calls to :meth:`_orm.Session.execute`,
-           :meth:`_orm.Session.scalars`, and similar.  Execution options
-           present in statements as well as options passed to methods like
-           :meth:`_orm.Session.execute` explicitly take precedence over
-           the session-wide options.
+           that will be applied to the :class:`_engine.Connection` when first
+           procured for a transaction, as well as to all explicit query
+           executions such as :meth:`_orm.Session.execute`,
+           :meth:`_orm.Session.scalars`, and similar.  This includes
+           flush (INSERT/UPDATE/DELETE) operations and is visible within
+           event hooks such as
+           :meth:`_events.ConnectionEvents.before_cursor_execute`.
+
+           Execution options present in statements as well as options passed
+           to methods like :meth:`_orm.Session.execute` explicitly take
+           precedence over the session-wide options.
 
            .. versionadded:: 2.1
 
+           .. versionchanged:: 2.1.0b3
+              Session-level execution options are now applied to the
+              :class:`_engine.Connection` at procurement time, so that
+              they take effect for flush operations as well as explicit
+              query executions. Previously, options were only applied to
+              explicit calls such as :meth:`_orm.Session.execute`.
+
         :param expire_on_commit:  Defaults to ``True``. When ``True``, all
            instances will be fully expired after each :meth:`~.commit`,
            so that all attribute/object access subsequent to a completed
index 065ca8145ea594fdc4af91dc5bfc7c9ae5306b18..da85cef88d945382fd34f61f48a7005564d999ee 100644 (file)
@@ -863,6 +863,118 @@ class SessionStateTest(_fixtures.FixtureTest):
         for key, value in expected_opts.items():
             eq_(gather_options[key], value)
 
+    @testing.variation("operation", ["insert", "update", "delete"])
+    def test_execution_options_flush_before_cursor_execute(
+        self,
+        operation: testing.Variation,
+    ):
+        """test #13346 - custom session execution options are visible
+        in before_cursor_execute during flush operations"""
+
+        users, User = self.tables.users, self.classes.User
+        self.mapper_registry.map_imperatively(User, users)
+
+        sess = Session(
+            testing.db,
+            execution_options={
+                "my_custom_opt": "my_value",
+                "my_other_opt": 42,
+            },
+        )
+
+        gather_options = []
+
+        @event.listens_for(sess.get_bind(), "before_cursor_execute")
+        def before_cursor_execute(
+            conn, cursor, statement, parameters, context, executemany
+        ):
+            gather_options.append(
+                {
+                    k: v
+                    for k, v in context.execution_options.items()
+                    if k in ("my_custom_opt", "my_other_opt")
+                }
+            )
+
+        expected = {
+            "my_custom_opt": "my_value",
+            "my_other_opt": 42,
+        }
+
+        if operation.insert:
+            sess.add(User(name="u1"))
+            sess.flush()
+        elif operation.update:
+            sess.add(User(name="u1"))
+            sess.flush()
+            gather_options.clear()
+            u1 = sess.scalars(select(User)).first()
+            gather_options.clear()
+            u1.name = "u1modified"
+            sess.flush()
+        elif operation.delete:
+            sess.add(User(name="u1"))
+            sess.flush()
+            gather_options.clear()
+            u1 = sess.scalars(select(User)).first()
+            gather_options.clear()
+            sess.delete(u1)
+            sess.flush()
+        else:
+            operation.fail()
+
+        sess.close()
+
+        eq_(gather_options, [expected])
+
+    def test_execution_options_flush_and_execute(self):
+        """test #13346 - session execution options are visible in both
+        do_orm_execute (explicit queries) and before_cursor_execute
+        (flush and queries) for the same session"""
+
+        users, User = self.tables.users, self.classes.User
+        self.mapper_registry.map_imperatively(User, users)
+
+        sess = Session(
+            testing.db,
+            execution_options={
+                "my_custom_opt": "my_value",
+            },
+        )
+
+        cursor_options = []
+        orm_options = []
+
+        @event.listens_for(sess.get_bind(), "before_cursor_execute")
+        def before_cursor_execute(
+            conn, cursor, statement, parameters, context, executemany
+        ):
+            cursor_options.append(
+                context.execution_options.get("my_custom_opt")
+            )
+
+        @event.listens_for(sess, "do_orm_execute")
+        def do_orm_execute(ctx: ORMExecuteState) -> None:
+            orm_options.append(ctx.execution_options.get("my_custom_opt"))
+
+        sess.add(User(name="u1"))
+        sess.flush()
+
+        # flush fires before_cursor_execute but not do_orm_execute
+        is_true(len(cursor_options) > 0)
+        eq_(cursor_options, ["my_value"] * len(cursor_options))
+        eq_(orm_options, [])
+
+        cursor_options.clear()
+
+        # explicit execute fires both
+        sess.execute(select(User))
+        is_true(len(cursor_options) > 0)
+        eq_(cursor_options, ["my_value"] * len(cursor_options))
+        eq_(orm_options, ["my_value"])
+
+        sess.close()
+
     @testing.combinations(
         ("default", None, {}, None),
         ("arg_true", True, {}, True),
@@ -1607,6 +1719,122 @@ class SessionStateTest(_fixtures.FixtureTest):
             assertions.in_(u2, s)
 
 
+class SessionSchemaTranslateTest(
+    fixtures.MappedTest, testing.AssertsExecutionResults
+):
+    __requires__ = ("schemas",)
+    __sparse_driver_backend__ = True
+    run_inserts = None
+    run_setup_mappers = "each"
+
+    @classmethod
+    def define_tables(cls, metadata):
+        Table(
+            "users",
+            metadata,
+            Column(
+                "id",
+                Integer,
+                primary_key=True,
+                test_needs_autoincrement=True,
+            ),
+            Column("name", String(30), nullable=False),
+            schema=config.test_schema,
+        )
+
+    @classmethod
+    def setup_classes(cls):
+        class User(cls.Basic):
+            pass
+
+    @classmethod
+    def setup_mappers(cls):
+        User = cls.classes.User
+        user_table = Table(
+            "users",
+            sa.MetaData(),
+            Column(
+                "id",
+                Integer,
+                primary_key=True,
+                test_needs_autoincrement=True,
+            ),
+            Column("name", String(30), nullable=False),
+            schema="placeholder",
+        )
+        cls.mapper_registry.map_imperatively(User, user_table)
+
+    @testing.variation("operation", ["insert", "update", "delete"])
+    def test_schema_translate_map_flush(
+        self,
+        operation: testing.Variation,
+        connection,
+    ):
+        """test #13346 - schema_translate_map on Session
+        execution_options works for flush operations"""
+
+        User = self.classes.User
+
+        sess = Session(
+            connection,
+            execution_options={
+                "schema_translate_map": {"placeholder": config.test_schema},
+            },
+        )
+
+        if operation.insert:
+            sess.add(User(name="u1"))
+            sess.flush()
+            eq_(
+                sess.connection()
+                .execute(
+                    sa.text(f"SELECT name FROM {config.test_schema}.users")
+                )
+                .fetchall(),
+                [("u1",)],
+            )
+        elif operation.update:
+            sess.add(User(name="u1"))
+            sess.flush()
+            u1 = sess.scalars(
+                select(User).execution_options(
+                    schema_translate_map={"placeholder": config.test_schema}
+                )
+            ).first()
+            u1.name = "u1modified"
+            sess.flush()
+            eq_(
+                sess.connection()
+                .execute(
+                    sa.text(f"SELECT name FROM {config.test_schema}.users")
+                )
+                .fetchall(),
+                [("u1modified",)],
+            )
+        elif operation.delete:
+            sess.add(User(name="u1"))
+            sess.flush()
+            u1 = sess.scalars(
+                select(User).execution_options(
+                    schema_translate_map={"placeholder": config.test_schema}
+                )
+            ).first()
+            sess.delete(u1)
+            sess.flush()
+            eq_(
+                sess.connection()
+                .execute(
+                    sa.text(f"SELECT name FROM {config.test_schema}.users")
+                )
+                .fetchall(),
+                [],
+            )
+        else:
+            operation.fail()
+
+        sess.close()
+
+
 class DeferredRelationshipExpressionTest(_fixtures.FixtureTest):
     run_inserts = None
     run_deletes = "each"