From: Mike Bayer Date: Wed, 3 Jun 2026 13:46:19 +0000 (-0400) Subject: send execution options to connection also X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=01194229387ab3d5e43d8395ce368b858dcd2b69;p=thirdparty%2Fsqlalchemy%2Fsqlalchemy.git send execution options to connection also Session level :paramref:`_orm.Session.execution_options` now take effect for Core level SQL emitted by unit of work operations, in addition to their existing use within ORM statement executions. This is to provide for Core options such as :paramref:`_engine.Connection.execution_options.schema_translate_map` to be applicable to a :class:`.Session` overall. Fixes: #13346 Change-Id: Icb453b7925f1bc38f30538d7726d222842bc65bd --- diff --git a/doc/build/changelog/migration_21.rst b/doc/build/changelog/migration_21.rst index 166f80f9ec..881e79a5d6 100644 --- a/doc/build/changelog/migration_21.rst +++ b/doc/build/changelog/migration_21.rst @@ -89,6 +89,64 @@ all types of SQL execution. :ticket:`9809` +.. _change_13346: + +Session-level execution options applied to Connection at procurement time +-------------------------------------------------------------------------- + +Building on the session-level execution options feature introduced in +:ticket:`12659`, the :paramref:`_orm.Session.execution_options` parameter +now applies its options to the :class:`_engine.Connection` when it is first +procured for a transaction, in addition to being merged into explicit query +executions as before. This means that execution options such as +``schema_translate_map`` as well as custom user-defined options now take +effect for **all** operations within the session, including: + +* Flush operations (INSERT/UPDATE/DELETE emitted by the unit of work) +* Event hooks such as :meth:`_events.ConnectionEvents.before_cursor_execute` +* Eager loader queries + +Previously, session-level execution options were only applied to explicit calls +such as :meth:`_orm.Session.execute`, which meant that +``schema_translate_map`` set on the :class:`_orm.Session` would not take effect +for flush operations. The prior workaround was to set ``schema_translate_map`` +on the :class:`_engine.Engine` itself, which remains supported. + +The new behavior allows ``schema_translate_map`` to be set directly on the +:class:`_orm.Session`:: + + session = Session( + engine, + execution_options={"schema_translate_map": {None: "my_schema"}}, + ) + + # schema_translate_map takes effect for both queries and flushes + session.add(MyObject(data="some data")) + session.commit() + + results = session.scalars(select(MyObject)).all() + +Custom execution options that are consumed in event hooks such as +:meth:`_events.ConnectionEvents.before_cursor_execute` are also available +during flush operations:: + + session = Session(engine, execution_options={"my_audit_flag": True}) + + + @event.listens_for(engine, "before_cursor_execute") + def receive_before_cursor_execute( + conn, cursor, statement, parameters, context, executemany + ): + if context.execution_options.get("my_audit_flag"): + log.info("Executing: %s", statement) + + + session.add(SomeObject()) + session.flush() # before_cursor_execute sees my_audit_flag=True + +:ticket:`13346` + + .. _change_10050: ORM Relationship allows callable for back_populates diff --git a/doc/build/changelog/unreleased_21/13346.rst b/doc/build/changelog/unreleased_21/13346.rst new file mode 100644 index 0000000000..08210f85b0 --- /dev/null +++ b/doc/build/changelog/unreleased_21/13346.rst @@ -0,0 +1,14 @@ +.. change:: + :tags: usecase, orm + :tickets: 13346 + + Session level :paramref:`_orm.Session.execution_options` now take + effect for Core level SQL emitted by unit of work operations, in + addition to their existing use within ORM statement executions. + This is to provide for Core options such as + :paramref:`_engine.Connection.execution_options.schema_translate_map` + to be applicable to a :class:`.Session` overall. + + .. seealso:: + + :ref:`change_13346` diff --git a/doc/build/core/connections.rst b/doc/build/core/connections.rst index d13046305a..8d41d5d8a9 100644 --- a/doc/build/core/connections.rst +++ b/doc/build/core/connections.rst @@ -885,15 +885,34 @@ as the schema name is passed to these methods explicitly. .. tip:: To use the schema translation feature with the ORM :class:`_orm.Session`, - set this option at the level of the :class:`_engine.Engine`, then pass that engine - to the :class:`_orm.Session`. The :class:`_orm.Session` uses a new - :class:`_engine.Connection` for each transaction:: + pass ``schema_translate_map`` as part of the + :paramref:`_orm.Session.execution_options` parameter. The map will be + applied to the :class:`_engine.Connection` when it is first procured for + the transaction, so that it takes effect for all operations including + queries, flushes (INSERT/UPDATE/DELETE), and event hooks:: + + session = Session( + engine, + execution_options={"schema_translate_map": {None: "my_schema"}}, + ) + + # schema_translate_map is applied to both queries and flush operations + session.add(MyObject(data="some data")) + session.commit() + + The map may alternatively be set at the :class:`_engine.Engine` level, + which applies it to all connections and sessions that use that engine:: schema_engine = engine.execution_options(schema_translate_map={...}) session = Session(schema_engine) - ... + .. versionchanged:: 2.1.0b3 + Session-level execution options including ``schema_translate_map`` are + now applied to the :class:`_engine.Connection` at procurement time, + so that they take effect for flush operations. Previously, setting + ``schema_translate_map`` on the :class:`_orm.Session` required that + it be set at the :class:`_engine.Engine` level instead. .. warning:: diff --git a/lib/sqlalchemy/orm/session.py b/lib/sqlalchemy/orm/session.py index eae68db6e6..5b71b21437 100644 --- a/lib/sqlalchemy/orm/session.py +++ b/lib/sqlalchemy/orm/session.py @@ -1203,8 +1203,13 @@ class SessionTransaction(_StateChange, TransactionalContext): local_connect = True try: + conn_exec_opts: Dict[str, Any] = {} + if self.session.execution_options: + conn_exec_opts.update(self.session.execution_options) if execution_options: - conn = conn.execution_options(**execution_options) + conn_exec_opts.update(execution_options) + if conn_exec_opts: + conn = conn.execution_options(**conn_exec_opts) transaction: Transaction if self.session.twophase and self._parent is None: @@ -1622,14 +1627,27 @@ class Session(_SessionClassMethods, EventTarget): use of this extension within their own code. :param execution_options: optional dictionary of execution options - that will be applied to all calls to :meth:`_orm.Session.execute`, - :meth:`_orm.Session.scalars`, and similar. Execution options - present in statements as well as options passed to methods like - :meth:`_orm.Session.execute` explicitly take precedence over - the session-wide options. + that will be applied to the :class:`_engine.Connection` when first + procured for a transaction, as well as to all explicit query + executions such as :meth:`_orm.Session.execute`, + :meth:`_orm.Session.scalars`, and similar. This includes + flush (INSERT/UPDATE/DELETE) operations and is visible within + event hooks such as + :meth:`_events.ConnectionEvents.before_cursor_execute`. + + Execution options present in statements as well as options passed + to methods like :meth:`_orm.Session.execute` explicitly take + precedence over the session-wide options. .. versionadded:: 2.1 + .. versionchanged:: 2.1.0b3 + Session-level execution options are now applied to the + :class:`_engine.Connection` at procurement time, so that + they take effect for flush operations as well as explicit + query executions. Previously, options were only applied to + explicit calls such as :meth:`_orm.Session.execute`. + :param expire_on_commit: Defaults to ``True``. When ``True``, all instances will be fully expired after each :meth:`~.commit`, so that all attribute/object access subsequent to a completed diff --git a/test/orm/test_session.py b/test/orm/test_session.py index 065ca8145e..da85cef88d 100644 --- a/test/orm/test_session.py +++ b/test/orm/test_session.py @@ -863,6 +863,118 @@ class SessionStateTest(_fixtures.FixtureTest): for key, value in expected_opts.items(): eq_(gather_options[key], value) + @testing.variation("operation", ["insert", "update", "delete"]) + def test_execution_options_flush_before_cursor_execute( + self, + operation: testing.Variation, + ): + """test #13346 - custom session execution options are visible + in before_cursor_execute during flush operations""" + + users, User = self.tables.users, self.classes.User + self.mapper_registry.map_imperatively(User, users) + + sess = Session( + testing.db, + execution_options={ + "my_custom_opt": "my_value", + "my_other_opt": 42, + }, + ) + + gather_options = [] + + @event.listens_for(sess.get_bind(), "before_cursor_execute") + def before_cursor_execute( + conn, cursor, statement, parameters, context, executemany + ): + gather_options.append( + { + k: v + for k, v in context.execution_options.items() + if k in ("my_custom_opt", "my_other_opt") + } + ) + + expected = { + "my_custom_opt": "my_value", + "my_other_opt": 42, + } + + if operation.insert: + sess.add(User(name="u1")) + sess.flush() + elif operation.update: + sess.add(User(name="u1")) + sess.flush() + gather_options.clear() + u1 = sess.scalars(select(User)).first() + gather_options.clear() + u1.name = "u1modified" + sess.flush() + elif operation.delete: + sess.add(User(name="u1")) + sess.flush() + gather_options.clear() + u1 = sess.scalars(select(User)).first() + gather_options.clear() + sess.delete(u1) + sess.flush() + else: + operation.fail() + + sess.close() + + eq_(gather_options, [expected]) + + def test_execution_options_flush_and_execute(self): + """test #13346 - session execution options are visible in both + do_orm_execute (explicit queries) and before_cursor_execute + (flush and queries) for the same session""" + + users, User = self.tables.users, self.classes.User + self.mapper_registry.map_imperatively(User, users) + + sess = Session( + testing.db, + execution_options={ + "my_custom_opt": "my_value", + }, + ) + + cursor_options = [] + orm_options = [] + + @event.listens_for(sess.get_bind(), "before_cursor_execute") + def before_cursor_execute( + conn, cursor, statement, parameters, context, executemany + ): + cursor_options.append( + context.execution_options.get("my_custom_opt") + ) + + @event.listens_for(sess, "do_orm_execute") + def do_orm_execute(ctx: ORMExecuteState) -> None: + orm_options.append(ctx.execution_options.get("my_custom_opt")) + + sess.add(User(name="u1")) + sess.flush() + + # flush fires before_cursor_execute but not do_orm_execute + is_true(len(cursor_options) > 0) + eq_(cursor_options, ["my_value"] * len(cursor_options)) + eq_(orm_options, []) + + cursor_options.clear() + + # explicit execute fires both + sess.execute(select(User)) + is_true(len(cursor_options) > 0) + eq_(cursor_options, ["my_value"] * len(cursor_options)) + eq_(orm_options, ["my_value"]) + + sess.close() + @testing.combinations( ("default", None, {}, None), ("arg_true", True, {}, True), @@ -1607,6 +1719,122 @@ class SessionStateTest(_fixtures.FixtureTest): assertions.in_(u2, s) +class SessionSchemaTranslateTest( + fixtures.MappedTest, testing.AssertsExecutionResults +): + __requires__ = ("schemas",) + __sparse_driver_backend__ = True + run_inserts = None + run_setup_mappers = "each" + + @classmethod + def define_tables(cls, metadata): + Table( + "users", + metadata, + Column( + "id", + Integer, + primary_key=True, + test_needs_autoincrement=True, + ), + Column("name", String(30), nullable=False), + schema=config.test_schema, + ) + + @classmethod + def setup_classes(cls): + class User(cls.Basic): + pass + + @classmethod + def setup_mappers(cls): + User = cls.classes.User + user_table = Table( + "users", + sa.MetaData(), + Column( + "id", + Integer, + primary_key=True, + test_needs_autoincrement=True, + ), + Column("name", String(30), nullable=False), + schema="placeholder", + ) + cls.mapper_registry.map_imperatively(User, user_table) + + @testing.variation("operation", ["insert", "update", "delete"]) + def test_schema_translate_map_flush( + self, + operation: testing.Variation, + connection, + ): + """test #13346 - schema_translate_map on Session + execution_options works for flush operations""" + + User = self.classes.User + + sess = Session( + connection, + execution_options={ + "schema_translate_map": {"placeholder": config.test_schema}, + }, + ) + + if operation.insert: + sess.add(User(name="u1")) + sess.flush() + eq_( + sess.connection() + .execute( + sa.text(f"SELECT name FROM {config.test_schema}.users") + ) + .fetchall(), + [("u1",)], + ) + elif operation.update: + sess.add(User(name="u1")) + sess.flush() + u1 = sess.scalars( + select(User).execution_options( + schema_translate_map={"placeholder": config.test_schema} + ) + ).first() + u1.name = "u1modified" + sess.flush() + eq_( + sess.connection() + .execute( + sa.text(f"SELECT name FROM {config.test_schema}.users") + ) + .fetchall(), + [("u1modified",)], + ) + elif operation.delete: + sess.add(User(name="u1")) + sess.flush() + u1 = sess.scalars( + select(User).execution_options( + schema_translate_map={"placeholder": config.test_schema} + ) + ).first() + sess.delete(u1) + sess.flush() + eq_( + sess.connection() + .execute( + sa.text(f"SELECT name FROM {config.test_schema}.users") + ) + .fetchall(), + [], + ) + else: + operation.fail() + + sess.close() + + class DeferredRelationshipExpressionTest(_fixtures.FixtureTest): run_inserts = None run_deletes = "each"