From 27c0dafbeb26e6317663629cc61dd6fe2afb4d2a Mon Sep 17 00:00:00 2001 From: Mike Bayer Date: Tue, 26 Aug 2025 16:54:39 -0400 Subject: [PATCH] add session-wide execution_options Added support for per-session execution options that are merged into all queries executed within that session. The :class:`_orm.Session`, :class:`_orm.sessionmaker`, :class:`_orm.scoped_session`, :class:`_ext.asyncio.AsyncSession`, and :class:`_ext.asyncio.async_sessionmaker` constructors now accept an :paramref:`_orm.Session.execution_options` parameter that will be applied to all explicit query executions (e.g. using :meth:`_orm.Session.execute`, :meth:`_orm.Session.get`, :meth:`_orm.Session.scalars`) for that session instance. Fixes: #12659 Change-Id: I6e19e1567e0c04df32ba1d43baf420fb762f155c --- doc/build/changelog/unreleased_21/12659.rst | 13 +++ lib/sqlalchemy/ext/asyncio/scoping.py | 20 ++++ lib/sqlalchemy/ext/asyncio/session.py | 15 +++ lib/sqlalchemy/orm/scoping.py | 22 +++++ lib/sqlalchemy/orm/session.py | 67 +++++++++++-- test/ext/asyncio/test_session_py3k.py | 93 ++++++++++++++++++ test/orm/test_session.py | 100 ++++++++++++++++++++ tools/generate_proxy_methods.py | 16 +++- 8 files changed, 334 insertions(+), 12 deletions(-) create mode 100644 doc/build/changelog/unreleased_21/12659.rst diff --git a/doc/build/changelog/unreleased_21/12659.rst b/doc/build/changelog/unreleased_21/12659.rst new file mode 100644 index 0000000000..abee9e16f1 --- /dev/null +++ b/doc/build/changelog/unreleased_21/12659.rst @@ -0,0 +1,13 @@ +.. change:: + :tags: feature, orm + :tickets: 12659 + + Added support for per-session execution options that are merged into all + queries executed within that session. The :class:`_orm.Session`, + :class:`_orm.sessionmaker`, :class:`_orm.scoped_session`, + :class:`_ext.asyncio.AsyncSession`, and + :class:`_ext.asyncio.async_sessionmaker` constructors now accept an + :paramref:`_orm.Session.execution_options` parameter that will be applied + to all explicit query executions (e.g. using :meth:`_orm.Session.execute`, + :meth:`_orm.Session.get`, :meth:`_orm.Session.scalars`) for that session + instance. diff --git a/lib/sqlalchemy/ext/asyncio/scoping.py b/lib/sqlalchemy/ext/asyncio/scoping.py index 7730b7a52d..5a2064a230 100644 --- a/lib/sqlalchemy/ext/asyncio/scoping.py +++ b/lib/sqlalchemy/ext/asyncio/scoping.py @@ -114,6 +114,7 @@ _Ts = TypeVarTuple("_Ts") "autoflush", "no_autoflush", "info", + "execution_options", ], use_intermediate_variable=["get"], ) @@ -1571,6 +1572,25 @@ class async_scoped_session(Generic[_AS]): return self._proxied.info + @property + def execution_options(self) -> Any: + r"""Proxy for the :attr:`_orm.Session.execution_options` attribute + on behalf of the :class:`_asyncio.AsyncSession` class. + + .. container:: class_bases + + Proxied for the :class:`_asyncio.AsyncSession` class + on behalf of the :class:`_asyncio.scoping.async_scoped_session` class. + + + """ # noqa: E501 + + return self._proxied.execution_options + + @execution_options.setter + def execution_options(self, attr: Any) -> None: + self._proxied.execution_options = attr + @classmethod async def close_all(cls) -> None: r"""Close all :class:`_asyncio.AsyncSession` sessions. diff --git a/lib/sqlalchemy/ext/asyncio/session.py b/lib/sqlalchemy/ext/asyncio/session.py index 58958b7f10..adfc5adf29 100644 --- a/lib/sqlalchemy/ext/asyncio/session.py +++ b/lib/sqlalchemy/ext/asyncio/session.py @@ -55,6 +55,7 @@ if TYPE_CHECKING: from ...engine import RowMapping from ...engine import ScalarResult from ...engine.interfaces import _CoreAnyExecuteParams + from ...engine.interfaces import _ExecuteOptions from ...engine.interfaces import CoreExecuteOptionsParameter from ...event import dispatcher from ...orm._typing import _IdentityKeyType @@ -203,6 +204,7 @@ class AsyncAttrs: "autoflush", "no_autoflush", "info", + "execution_options", ], ) class AsyncSession(ReversibleProxy[Session]): @@ -1587,6 +1589,19 @@ class AsyncSession(ReversibleProxy[Session]): return self._proxied.info + @property + def execution_options(self) -> _ExecuteOptions: + r"""Proxy for the :attr:`_orm.Session.execution_options` attribute + on behalf of the :class:`_asyncio.AsyncSession` class. + + """ # noqa: E501 + + return self._proxied.execution_options + + @execution_options.setter + def execution_options(self, attr: _ExecuteOptions) -> None: + self._proxied.execution_options = attr + @classmethod def object_session(cls, instance: object) -> Optional[Session]: r"""Return the :class:`.Session` to which an object belongs. diff --git a/lib/sqlalchemy/orm/scoping.py b/lib/sqlalchemy/orm/scoping.py index 7bf77e20c7..f610948ef6 100644 --- a/lib/sqlalchemy/orm/scoping.py +++ b/lib/sqlalchemy/orm/scoping.py @@ -58,6 +58,7 @@ if TYPE_CHECKING: from ..engine import RowMapping from ..engine.interfaces import _CoreAnyExecuteParams from ..engine.interfaces import _CoreSingleExecuteParams + from ..engine.interfaces import _ExecuteOptions from ..engine.interfaces import CoreExecuteOptionsParameter from ..engine.result import ScalarResult from ..sql._typing import _ColumnsClauseArgument @@ -146,6 +147,7 @@ __all__ = ["scoped_session"] "autoflush", "no_autoflush", "info", + "execution_options", ], ) class scoped_session(Generic[_S]): @@ -774,6 +776,13 @@ class scoped_session(Generic[_S]): by :meth:`_engine.Connection.execution_options`, and may also provide additional options understood only in an ORM context. + The execution_options are passed along to methods like + :meth:`.Connection.execute` on :class:`.Connection` giving the + highest priority to execution_options that are passed to this + method explicitly, then the options that are present on the + statement object if any, and finally those options present + session-wide. + .. seealso:: :ref:`orm_queryguide_execution_options` - ORM-specific execution @@ -2145,6 +2154,19 @@ class scoped_session(Generic[_S]): return self._proxied.info + @property + def execution_options(self) -> _ExecuteOptions: + r"""Proxy for the :attr:`_orm.Session.execution_options` attribute + on behalf of the :class:`_orm.scoping.scoped_session` class. + + """ # noqa: E501 + + return self._proxied.execution_options + + @execution_options.setter + def execution_options(self, attr: _ExecuteOptions) -> None: + self._proxied.execution_options = attr + @classmethod def object_session(cls, instance: object) -> Optional[Session]: r"""Return the :class:`.Session` to which an object belongs. diff --git a/lib/sqlalchemy/orm/session.py b/lib/sqlalchemy/orm/session.py index 8c9a0daee8..100ef84fde 100644 --- a/lib/sqlalchemy/orm/session.py +++ b/lib/sqlalchemy/orm/session.py @@ -1484,6 +1484,7 @@ class Session(_SessionClassMethods, EventTarget): enable_baked_queries: bool twophase: bool join_transaction_mode: JoinTransactionMode + execution_options: _ExecuteOptions = util.EMPTY_DICT _query_cls: Type[Query[Any]] _close_state: _SessionCloseState @@ -1503,6 +1504,7 @@ class Session(_SessionClassMethods, EventTarget): autocommit: Literal[False] = False, join_transaction_mode: JoinTransactionMode = "conditional_savepoint", close_resets_only: Union[bool, _NoArg] = _NoArg.NO_ARG, + execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT, ): r"""Construct a new :class:`_orm.Session`. @@ -1598,6 +1600,15 @@ class Session(_SessionClassMethods, EventTarget): flag therefore only affects applications that are making explicit use of this extension within their own code. + :param execution_options: optional dictionary of execution options + that will be applied to all calls to :meth:`_orm.Session.execute`, + :meth:`_orm.Session.scalars`, and similar. Execution options + present in statements as well as options passed to methods like + :meth:`_orm.Session.execute` explicitly take precedence over + the session-wide options. + + .. versionadded:: 2.1 + :param expire_on_commit: Defaults to ``True``. When ``True``, all instances will be fully expired after each :meth:`~.commit`, so that all attribute/object access subsequent to a completed @@ -1759,6 +1770,10 @@ class Session(_SessionClassMethods, EventTarget): self.autoflush = autoflush self.expire_on_commit = expire_on_commit self.enable_baked_queries = enable_baked_queries + if execution_options: + self.execution_options = self.execution_options.union( + execution_options + ) # the idea is that at some point NO_ARG will warn that in the future # the default will switch to close_resets_only=False. @@ -2157,7 +2172,28 @@ class Session(_SessionClassMethods, EventTarget): compile_state_cls = None bind_arguments.setdefault("clause", statement) - execution_options = util.coerce_to_immutabledict(execution_options) + combined_execution_options: util.immutabledict[str, Any] = ( + util.coerce_to_immutabledict(execution_options) + ) + if self.execution_options: + # merge given execution options with session-wide execution + # options. if the statement also has execution_options, + # maintain priority of session.execution_options -> + # statement.execution_options -> method passed execution_options + # by omitting from the base execution options those keys that + # will come from the statement + if statement._execution_options: + combined_execution_options = util.immutabledict( + { + k: v + for k, v in self.execution_options.items() + if k not in statement._execution_options + } + ).union(combined_execution_options) + else: + combined_execution_options = self.execution_options.union( + combined_execution_options + ) if _parent_execute_state: events_todo = _parent_execute_state._remaining_events() @@ -2176,12 +2212,12 @@ class Session(_SessionClassMethods, EventTarget): # as "pre fetch" for DML, etc. ( statement, - execution_options, + combined_execution_options, ) = compile_state_cls.orm_pre_session_exec( self, statement, params, - execution_options, + combined_execution_options, bind_arguments, True, ) @@ -2190,7 +2226,7 @@ class Session(_SessionClassMethods, EventTarget): self, statement, params, - execution_options, + combined_execution_options, bind_arguments, compile_state_cls, events_todo, @@ -2207,7 +2243,7 @@ class Session(_SessionClassMethods, EventTarget): return fn_result statement = orm_exec_state.statement - execution_options = orm_exec_state.local_execution_options + combined_execution_options = orm_exec_state.local_execution_options if compile_state_cls is not None: # now run orm_pre_session_exec() "for real". if there were @@ -2217,12 +2253,12 @@ class Session(_SessionClassMethods, EventTarget): # autoflush will also be invoked in this step if enabled. ( statement, - execution_options, + combined_execution_options, ) = compile_state_cls.orm_pre_session_exec( self, statement, params, - execution_options, + combined_execution_options, bind_arguments, False, ) @@ -2238,7 +2274,9 @@ class Session(_SessionClassMethods, EventTarget): if TYPE_CHECKING: params = cast(_CoreSingleExecuteParams, params) return conn.scalar( - statement, params or {}, execution_options=execution_options + statement, + params or {}, + execution_options=combined_execution_options, ) if compile_state_cls: @@ -2247,14 +2285,14 @@ class Session(_SessionClassMethods, EventTarget): self, statement, params or {}, - execution_options, + combined_execution_options, bind_arguments, conn, ) ) else: result = conn.execute( - statement, params, execution_options=execution_options + statement, params, execution_options=combined_execution_options ) if _scalar_result: @@ -2332,6 +2370,13 @@ class Session(_SessionClassMethods, EventTarget): by :meth:`_engine.Connection.execution_options`, and may also provide additional options understood only in an ORM context. + The execution_options are passed along to methods like + :meth:`.Connection.execute` on :class:`.Connection` giving the + highest priority to execution_options that are passed to this + method explicitly, then the options that are present on the + statement object if any, and finally those options present + session-wide. + .. seealso:: :ref:`orm_queryguide_execution_options` - ORM-specific execution @@ -3876,6 +3921,8 @@ class Session(_SessionClassMethods, EventTarget): if options: statement = statement.options(*options) + if self.execution_options: + execution_options = self.execution_options.union(execution_options) return db_load_fn( self, statement, diff --git a/test/ext/asyncio/test_session_py3k.py b/test/ext/asyncio/test_session_py3k.py index 3ad10337b9..def178208b 100644 --- a/test/ext/asyncio/test_session_py3k.py +++ b/test/ext/asyncio/test_session_py3k.py @@ -170,6 +170,99 @@ class AsyncSessionTest(AsyncFixture): not_in(u1, s1) not_in(u2, s2) + @async_test + @testing.variation("session_type", ["plain", "sessionmaker"]) + @testing.variation("merge", [True, False]) + @testing.variation("method", ["scalar", "execute", "scalars", "get"]) + @testing.variation("add_statement_options", [True, False]) + async def test_execution_options( + self, + async_engine, + session_type: testing.Variation, + merge: testing.Variation, + method: testing.Variation, + add_statement_options: testing.Variation, + ): + User = self.classes.User + + session_execution_options = { + "populate_existing": True, + "autoflush": False, + "opt1": "z", + "opt5": "q", + } + + expected_opts = session_execution_options + + if add_statement_options: + statement_options = {"opt2": "w", "opt4": "y", "opt5": "w"} + expected_opts = {**expected_opts, **statement_options} + else: + statement_options = {} + + if merge: + query_opts = { + "compiled_cache": {}, + "opt1": "q", + "opt2": "p", + "opt3": "r", + "populate_existing": False, + } + expected_opts = {**expected_opts, **query_opts} + else: + query_opts = {} + + if session_type.plain: + sess = AsyncSession( + async_engine, execution_options=session_execution_options + ) + elif session_type.sessionmaker: + maker = async_sessionmaker( + async_engine, execution_options=session_execution_options + ) + sess = maker() + else: + session_type.fail() + + gather_options = {} + + @event.listens_for(sess.sync_session, "do_orm_execute") + def check(ctx) -> None: + assert not gather_options + gather_options.update(ctx.execution_options) + + if method.scalar: + statement = select(User).limit(1) + if add_statement_options: + statement = statement.execution_options(**statement_options) + await sess.scalar(statement, execution_options=query_opts) + elif method.execute: + statement = select(User).limit(1) + if add_statement_options: + statement = statement.execution_options(**statement_options) + await sess.execute(statement, execution_options=query_opts) + elif method.scalars: + statement = select(User).limit(1) + if add_statement_options: + statement = statement.execution_options(**statement_options) + await sess.scalars(statement, execution_options=query_opts) + elif method.get: + if add_statement_options: + await sess.get( + User, + 1, + execution_options={**statement_options, **query_opts}, + ) + else: + await sess.get(User, 1, execution_options=query_opts) + else: + method.fail() + + await sess.close() + + for key, value in expected_opts.items(): + eq_(gather_options[key], value) + class AsyncSessionQueryTest(AsyncFixture): @async_test diff --git a/test/orm/test_session.py b/test/orm/test_session.py index 7f61b6ce7b..5d1875bd80 100644 --- a/test/orm/test_session.py +++ b/test/orm/test_session.py @@ -721,6 +721,106 @@ class SessionStateTest(_fixtures.FixtureTest): s4 = maker2(info={"s4": 8}) eq_(s4.info, {"s4": 8}) + @testing.variation("session_type", ["plain", "sessionmaker"]) + @testing.variation("merge", [True, False]) + @testing.variation( + "method", ["scalar", "execute", "scalars", "get", "query"] + ) + @testing.variation("add_statement_options", [True, False]) + def test_execution_options( + self, + session_type: testing.Variation, + merge: testing.Variation, + method: testing.Variation, + add_statement_options: testing.Variation, + ): + users, User = self.tables.users, self.classes.User + self.mapper_registry.map_imperatively(User, users) + + session_execution_options = { + "populate_existing": True, + "autoflush": False, + "opt1": "z", + "opt5": "q", + } + + expected_opts = session_execution_options + + if add_statement_options: + statement_options = {"opt2": "w", "opt4": "y", "opt5": "w"} + expected_opts = {**expected_opts, **statement_options} + else: + statement_options = {} + + if merge: + query_opts = { + "compiled_cache": {}, + "opt1": "q", + "opt2": "p", + "opt3": "r", + "populate_existing": False, + } + expected_opts = {**expected_opts, **query_opts} + else: + query_opts = {} + + if session_type.plain: + sess = Session( + testing.db, execution_options=session_execution_options + ) + elif session_type.sessionmaker: + maker = sessionmaker( + testing.db, execution_options=session_execution_options + ) + sess = maker() + else: + session_type.fail() + + gather_options = {} + + @event.listens_for(sess, "do_orm_execute") + def check(ctx: ORMExecuteState) -> None: + assert not gather_options + gather_options.update(ctx.execution_options) + + if method.scalar: + statement = select(User).limit(1) + if add_statement_options: + statement = statement.execution_options(**statement_options) + sess.scalar(statement, execution_options=query_opts) + elif method.execute: + statement = select(User).limit(1) + if add_statement_options: + statement = statement.execution_options(**statement_options) + sess.execute(statement, execution_options=query_opts) + elif method.scalars: + statement = select(User).limit(1) + if add_statement_options: + statement = statement.execution_options(**statement_options) + sess.scalars(statement, execution_options=query_opts) + elif method.get: + if add_statement_options: + sess.get( + User, + 1, + execution_options={**statement_options, **query_opts}, + ) + else: + sess.get(User, 1, execution_options=query_opts) + elif method.query: + q = sess.query(User).limit(1) + if add_statement_options: + q = q.execution_options(**statement_options) + q = q.execution_options(**query_opts) + q.all() + else: + method.fail() + + sess.close() + + for key, value in expected_opts.items(): + eq_(gather_options[key], value) + def test_autocommit_kw_accepted_but_must_be_false(self): Session(autocommit=False) diff --git a/tools/generate_proxy_methods.py b/tools/generate_proxy_methods.py index b9f9d572b0..95bb1a4180 100644 --- a/tools/generate_proxy_methods.py +++ b/tools/generate_proxy_methods.py @@ -307,13 +307,26 @@ def process_class( "import annotations set up?" ) + existing_doc = None + if attr is not None: if isinstance(attr, property): readonly = attr.fset is None + existing_doc = attr.__doc__ elif isinstance(attr, langhelpers.generic_fn_descriptor): readonly = True - else: + existing_doc = attr.__doc__ + elif hasattr(attr, "__get__"): readonly = not hasattr(attr, "__set__") + existing_doc = attr.__doc__ + else: + # not a descriptor + readonly = False + + else: + readonly = False + + if existing_doc: doc = textwrap.indent( inject_docstring_text( attr.__doc__, @@ -330,7 +343,6 @@ def process_class( " ", ).lstrip() else: - readonly = False doc = ( f"Proxy for the :attr:`{sphinx_symbol}.{name}` " "attribute \n" -- 2.47.3