--- /dev/null
+.. change::
+ :tags: feature, orm
+ :tickets: 12659
+
+ Added support for per-session execution options that are merged into all
+ queries executed within that session. The :class:`_orm.Session`,
+ :class:`_orm.sessionmaker`, :class:`_orm.scoped_session`,
+ :class:`_ext.asyncio.AsyncSession`, and
+ :class:`_ext.asyncio.async_sessionmaker` constructors now accept an
+ :paramref:`_orm.Session.execution_options` parameter that will be applied
+ to all explicit query executions (e.g. using :meth:`_orm.Session.execute`,
+ :meth:`_orm.Session.get`, :meth:`_orm.Session.scalars`) for that session
+ instance.
"autoflush",
"no_autoflush",
"info",
+ "execution_options",
],
use_intermediate_variable=["get"],
)
return self._proxied.info
+ @property
+ def execution_options(self) -> Any:
+ r"""Proxy for the :attr:`_orm.Session.execution_options` attribute
+ on behalf of the :class:`_asyncio.AsyncSession` class.
+
+ .. container:: class_bases
+
+ Proxied for the :class:`_asyncio.AsyncSession` class
+ on behalf of the :class:`_asyncio.scoping.async_scoped_session` class.
+
+
+ """ # noqa: E501
+
+ return self._proxied.execution_options
+
+ @execution_options.setter
+ def execution_options(self, attr: Any) -> None:
+ self._proxied.execution_options = attr
+
@classmethod
async def close_all(cls) -> None:
r"""Close all :class:`_asyncio.AsyncSession` sessions.
from ...engine import RowMapping
from ...engine import ScalarResult
from ...engine.interfaces import _CoreAnyExecuteParams
+ from ...engine.interfaces import _ExecuteOptions
from ...engine.interfaces import CoreExecuteOptionsParameter
from ...event import dispatcher
from ...orm._typing import _IdentityKeyType
"autoflush",
"no_autoflush",
"info",
+ "execution_options",
],
)
class AsyncSession(ReversibleProxy[Session]):
return self._proxied.info
+ @property
+ def execution_options(self) -> _ExecuteOptions:
+ r"""Proxy for the :attr:`_orm.Session.execution_options` attribute
+ on behalf of the :class:`_asyncio.AsyncSession` class.
+
+ """ # noqa: E501
+
+ return self._proxied.execution_options
+
+ @execution_options.setter
+ def execution_options(self, attr: _ExecuteOptions) -> None:
+ self._proxied.execution_options = attr
+
@classmethod
def object_session(cls, instance: object) -> Optional[Session]:
r"""Return the :class:`.Session` to which an object belongs.
from ..engine import RowMapping
from ..engine.interfaces import _CoreAnyExecuteParams
from ..engine.interfaces import _CoreSingleExecuteParams
+ from ..engine.interfaces import _ExecuteOptions
from ..engine.interfaces import CoreExecuteOptionsParameter
from ..engine.result import ScalarResult
from ..sql._typing import _ColumnsClauseArgument
"autoflush",
"no_autoflush",
"info",
+ "execution_options",
],
)
class scoped_session(Generic[_S]):
by :meth:`_engine.Connection.execution_options`, and may also
provide additional options understood only in an ORM context.
+ The execution_options are passed along to methods like
+ :meth:`.Connection.execute` on :class:`.Connection` giving the
+ highest priority to execution_options that are passed to this
+ method explicitly, then the options that are present on the
+ statement object if any, and finally those options present
+ session-wide.
+
.. seealso::
:ref:`orm_queryguide_execution_options` - ORM-specific execution
return self._proxied.info
+ @property
+ def execution_options(self) -> _ExecuteOptions:
+ r"""Proxy for the :attr:`_orm.Session.execution_options` attribute
+ on behalf of the :class:`_orm.scoping.scoped_session` class.
+
+ """ # noqa: E501
+
+ return self._proxied.execution_options
+
+ @execution_options.setter
+ def execution_options(self, attr: _ExecuteOptions) -> None:
+ self._proxied.execution_options = attr
+
@classmethod
def object_session(cls, instance: object) -> Optional[Session]:
r"""Return the :class:`.Session` to which an object belongs.
enable_baked_queries: bool
twophase: bool
join_transaction_mode: JoinTransactionMode
+ execution_options: _ExecuteOptions = util.EMPTY_DICT
_query_cls: Type[Query[Any]]
_close_state: _SessionCloseState
autocommit: Literal[False] = False,
join_transaction_mode: JoinTransactionMode = "conditional_savepoint",
close_resets_only: Union[bool, _NoArg] = _NoArg.NO_ARG,
+ execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
):
r"""Construct a new :class:`_orm.Session`.
flag therefore only affects applications that are making explicit
use of this extension within their own code.
+ :param execution_options: optional dictionary of execution options
+ that will be applied to all calls to :meth:`_orm.Session.execute`,
+ :meth:`_orm.Session.scalars`, and similar. Execution options
+ present in statements as well as options passed to methods like
+ :meth:`_orm.Session.execute` explicitly take precedence over
+ the session-wide options.
+
+ .. versionadded:: 2.1
+
:param expire_on_commit: Defaults to ``True``. When ``True``, all
instances will be fully expired after each :meth:`~.commit`,
so that all attribute/object access subsequent to a completed
self.autoflush = autoflush
self.expire_on_commit = expire_on_commit
self.enable_baked_queries = enable_baked_queries
+ if execution_options:
+ self.execution_options = self.execution_options.union(
+ execution_options
+ )
# the idea is that at some point NO_ARG will warn that in the future
# the default will switch to close_resets_only=False.
compile_state_cls = None
bind_arguments.setdefault("clause", statement)
- execution_options = util.coerce_to_immutabledict(execution_options)
+ combined_execution_options: util.immutabledict[str, Any] = (
+ util.coerce_to_immutabledict(execution_options)
+ )
+ if self.execution_options:
+ # merge given execution options with session-wide execution
+ # options. if the statement also has execution_options,
+ # maintain priority of session.execution_options ->
+ # statement.execution_options -> method passed execution_options
+ # by omitting from the base execution options those keys that
+ # will come from the statement
+ if statement._execution_options:
+ combined_execution_options = util.immutabledict(
+ {
+ k: v
+ for k, v in self.execution_options.items()
+ if k not in statement._execution_options
+ }
+ ).union(combined_execution_options)
+ else:
+ combined_execution_options = self.execution_options.union(
+ combined_execution_options
+ )
if _parent_execute_state:
events_todo = _parent_execute_state._remaining_events()
# as "pre fetch" for DML, etc.
(
statement,
- execution_options,
+ combined_execution_options,
) = compile_state_cls.orm_pre_session_exec(
self,
statement,
params,
- execution_options,
+ combined_execution_options,
bind_arguments,
True,
)
self,
statement,
params,
- execution_options,
+ combined_execution_options,
bind_arguments,
compile_state_cls,
events_todo,
return fn_result
statement = orm_exec_state.statement
- execution_options = orm_exec_state.local_execution_options
+ combined_execution_options = orm_exec_state.local_execution_options
if compile_state_cls is not None:
# now run orm_pre_session_exec() "for real". if there were
# autoflush will also be invoked in this step if enabled.
(
statement,
- execution_options,
+ combined_execution_options,
) = compile_state_cls.orm_pre_session_exec(
self,
statement,
params,
- execution_options,
+ combined_execution_options,
bind_arguments,
False,
)
if TYPE_CHECKING:
params = cast(_CoreSingleExecuteParams, params)
return conn.scalar(
- statement, params or {}, execution_options=execution_options
+ statement,
+ params or {},
+ execution_options=combined_execution_options,
)
if compile_state_cls:
self,
statement,
params or {},
- execution_options,
+ combined_execution_options,
bind_arguments,
conn,
)
)
else:
result = conn.execute(
- statement, params, execution_options=execution_options
+ statement, params, execution_options=combined_execution_options
)
if _scalar_result:
by :meth:`_engine.Connection.execution_options`, and may also
provide additional options understood only in an ORM context.
+ The execution_options are passed along to methods like
+ :meth:`.Connection.execute` on :class:`.Connection` giving the
+ highest priority to execution_options that are passed to this
+ method explicitly, then the options that are present on the
+ statement object if any, and finally those options present
+ session-wide.
+
.. seealso::
:ref:`orm_queryguide_execution_options` - ORM-specific execution
if options:
statement = statement.options(*options)
+ if self.execution_options:
+ execution_options = self.execution_options.union(execution_options)
return db_load_fn(
self,
statement,
not_in(u1, s1)
not_in(u2, s2)
+ @async_test
+ @testing.variation("session_type", ["plain", "sessionmaker"])
+ @testing.variation("merge", [True, False])
+ @testing.variation("method", ["scalar", "execute", "scalars", "get"])
+ @testing.variation("add_statement_options", [True, False])
+ async def test_execution_options(
+ self,
+ async_engine,
+ session_type: testing.Variation,
+ merge: testing.Variation,
+ method: testing.Variation,
+ add_statement_options: testing.Variation,
+ ):
+ User = self.classes.User
+
+ session_execution_options = {
+ "populate_existing": True,
+ "autoflush": False,
+ "opt1": "z",
+ "opt5": "q",
+ }
+
+ expected_opts = session_execution_options
+
+ if add_statement_options:
+ statement_options = {"opt2": "w", "opt4": "y", "opt5": "w"}
+ expected_opts = {**expected_opts, **statement_options}
+ else:
+ statement_options = {}
+
+ if merge:
+ query_opts = {
+ "compiled_cache": {},
+ "opt1": "q",
+ "opt2": "p",
+ "opt3": "r",
+ "populate_existing": False,
+ }
+ expected_opts = {**expected_opts, **query_opts}
+ else:
+ query_opts = {}
+
+ if session_type.plain:
+ sess = AsyncSession(
+ async_engine, execution_options=session_execution_options
+ )
+ elif session_type.sessionmaker:
+ maker = async_sessionmaker(
+ async_engine, execution_options=session_execution_options
+ )
+ sess = maker()
+ else:
+ session_type.fail()
+
+ gather_options = {}
+
+ @event.listens_for(sess.sync_session, "do_orm_execute")
+ def check(ctx) -> None:
+ assert not gather_options
+ gather_options.update(ctx.execution_options)
+
+ if method.scalar:
+ statement = select(User).limit(1)
+ if add_statement_options:
+ statement = statement.execution_options(**statement_options)
+ await sess.scalar(statement, execution_options=query_opts)
+ elif method.execute:
+ statement = select(User).limit(1)
+ if add_statement_options:
+ statement = statement.execution_options(**statement_options)
+ await sess.execute(statement, execution_options=query_opts)
+ elif method.scalars:
+ statement = select(User).limit(1)
+ if add_statement_options:
+ statement = statement.execution_options(**statement_options)
+ await sess.scalars(statement, execution_options=query_opts)
+ elif method.get:
+ if add_statement_options:
+ await sess.get(
+ User,
+ 1,
+ execution_options={**statement_options, **query_opts},
+ )
+ else:
+ await sess.get(User, 1, execution_options=query_opts)
+ else:
+ method.fail()
+
+ await sess.close()
+
+ for key, value in expected_opts.items():
+ eq_(gather_options[key], value)
+
class AsyncSessionQueryTest(AsyncFixture):
@async_test
s4 = maker2(info={"s4": 8})
eq_(s4.info, {"s4": 8})
+ @testing.variation("session_type", ["plain", "sessionmaker"])
+ @testing.variation("merge", [True, False])
+ @testing.variation(
+ "method", ["scalar", "execute", "scalars", "get", "query"]
+ )
+ @testing.variation("add_statement_options", [True, False])
+ def test_execution_options(
+ self,
+ session_type: testing.Variation,
+ merge: testing.Variation,
+ method: testing.Variation,
+ add_statement_options: testing.Variation,
+ ):
+ users, User = self.tables.users, self.classes.User
+ self.mapper_registry.map_imperatively(User, users)
+
+ session_execution_options = {
+ "populate_existing": True,
+ "autoflush": False,
+ "opt1": "z",
+ "opt5": "q",
+ }
+
+ expected_opts = session_execution_options
+
+ if add_statement_options:
+ statement_options = {"opt2": "w", "opt4": "y", "opt5": "w"}
+ expected_opts = {**expected_opts, **statement_options}
+ else:
+ statement_options = {}
+
+ if merge:
+ query_opts = {
+ "compiled_cache": {},
+ "opt1": "q",
+ "opt2": "p",
+ "opt3": "r",
+ "populate_existing": False,
+ }
+ expected_opts = {**expected_opts, **query_opts}
+ else:
+ query_opts = {}
+
+ if session_type.plain:
+ sess = Session(
+ testing.db, execution_options=session_execution_options
+ )
+ elif session_type.sessionmaker:
+ maker = sessionmaker(
+ testing.db, execution_options=session_execution_options
+ )
+ sess = maker()
+ else:
+ session_type.fail()
+
+ gather_options = {}
+
+ @event.listens_for(sess, "do_orm_execute")
+ def check(ctx: ORMExecuteState) -> None:
+ assert not gather_options
+ gather_options.update(ctx.execution_options)
+
+ if method.scalar:
+ statement = select(User).limit(1)
+ if add_statement_options:
+ statement = statement.execution_options(**statement_options)
+ sess.scalar(statement, execution_options=query_opts)
+ elif method.execute:
+ statement = select(User).limit(1)
+ if add_statement_options:
+ statement = statement.execution_options(**statement_options)
+ sess.execute(statement, execution_options=query_opts)
+ elif method.scalars:
+ statement = select(User).limit(1)
+ if add_statement_options:
+ statement = statement.execution_options(**statement_options)
+ sess.scalars(statement, execution_options=query_opts)
+ elif method.get:
+ if add_statement_options:
+ sess.get(
+ User,
+ 1,
+ execution_options={**statement_options, **query_opts},
+ )
+ else:
+ sess.get(User, 1, execution_options=query_opts)
+ elif method.query:
+ q = sess.query(User).limit(1)
+ if add_statement_options:
+ q = q.execution_options(**statement_options)
+ q = q.execution_options(**query_opts)
+ q.all()
+ else:
+ method.fail()
+
+ sess.close()
+
+ for key, value in expected_opts.items():
+ eq_(gather_options[key], value)
+
def test_autocommit_kw_accepted_but_must_be_false(self):
Session(autocommit=False)
"import annotations set up?"
)
+ existing_doc = None
+
if attr is not None:
if isinstance(attr, property):
readonly = attr.fset is None
+ existing_doc = attr.__doc__
elif isinstance(attr, langhelpers.generic_fn_descriptor):
readonly = True
- else:
+ existing_doc = attr.__doc__
+ elif hasattr(attr, "__get__"):
readonly = not hasattr(attr, "__set__")
+ existing_doc = attr.__doc__
+ else:
+ # not a descriptor
+ readonly = False
+
+ else:
+ readonly = False
+
+ if existing_doc:
doc = textwrap.indent(
inject_docstring_text(
attr.__doc__,
" ",
).lstrip()
else:
- readonly = False
doc = (
f"Proxy for the :attr:`{sphinx_symbol}.{name}` "
"attribute \n"