--- /dev/null
+.. change::
+ :tags: bug, typing
+ :tickets: 10182
+
+ Fixed issue in :class:`_orm.Session` and :class:`_asyncio.AsyncSession`
+ methods such as :meth:`_orm.Session.connection` where the
+ :paramref:`_orm.Session.connection.execution_options` parameter were
+ hardcoded to an internal type that is not user-facing.
from ...engine import Row
from ...engine import RowMapping
from ...engine.interfaces import _CoreAnyExecuteParams
- from ...engine.interfaces import _ExecuteOptions
+ from ...engine.interfaces import CoreExecuteOptionsParameter
from ...engine.result import ScalarResult
from ...orm._typing import _IdentityKeyType
from ...orm._typing import _O
async def connection(
self,
bind_arguments: Optional[_BindArguments] = None,
- execution_options: Optional[_ExecuteOptions] = None,
+ execution_options: Optional[CoreExecuteOptionsParameter] = None,
**kw: Any,
) -> AsyncConnection:
r"""Return a :class:`_asyncio.AsyncConnection` object corresponding to
from ...engine import RowMapping
from ...engine import ScalarResult
from ...engine.interfaces import _CoreAnyExecuteParams
- from ...engine.interfaces import _ExecuteOptions
+ from ...engine.interfaces import CoreExecuteOptionsParameter
from ...event import dispatcher
from ...orm._typing import _IdentityKeyType
from ...orm._typing import _O
async def connection(
self,
bind_arguments: Optional[_BindArguments] = None,
- execution_options: Optional[_ExecuteOptions] = None,
+ execution_options: Optional[CoreExecuteOptionsParameter] = None,
**kw: Any,
) -> AsyncConnection:
r"""Return a :class:`_asyncio.AsyncConnection` object corresponding to
from ..engine import RowMapping
from ..engine.interfaces import _CoreAnyExecuteParams
from ..engine.interfaces import _CoreSingleExecuteParams
- from ..engine.interfaces import _ExecuteOptions
+ from ..engine.interfaces import CoreExecuteOptionsParameter
from ..engine.result import ScalarResult
from ..sql._typing import _ColumnsClauseArgument
from ..sql._typing import _T0
def connection(
self,
bind_arguments: Optional[_BindArguments] = None,
- execution_options: Optional[_ExecuteOptions] = None,
+ execution_options: Optional[CoreExecuteOptionsParameter] = None,
) -> Connection:
r"""Return a :class:`_engine.Connection` object corresponding to this
:class:`.Session` object's transactional state.
from ..engine.interfaces import _CoreAnyExecuteParams
from ..engine.interfaces import _CoreSingleExecuteParams
from ..engine.interfaces import _ExecuteOptions
+ from ..engine.interfaces import CoreExecuteOptionsParameter
from ..engine.result import ScalarResult
from ..event import _InstanceLevelDispatch
from ..sql._typing import _ColumnsClauseArgument
def _connection_for_bind(
self,
bind: _SessionBind,
- execution_options: Optional[_ExecuteOptions],
+ execution_options: Optional[CoreExecuteOptionsParameter],
) -> Connection:
if bind in self._connections:
if execution_options:
def connection(
self,
bind_arguments: Optional[_BindArguments] = None,
- execution_options: Optional[_ExecuteOptions] = None,
+ execution_options: Optional[CoreExecuteOptionsParameter] = None,
) -> Connection:
r"""Return a :class:`_engine.Connection` object corresponding to this
:class:`.Session` object's transactional state.
def _connection_for_bind(
self,
engine: _SessionBind,
- execution_options: Optional[_ExecuteOptions] = None,
+ execution_options: Optional[CoreExecuteOptionsParameter] = None,
**kw: Any,
) -> Connection:
TransactionalContext._trans_ctx_check(self)
await ss.refresh(u1)
await ss.refresh(u1, with_for_update=True)
+
+
+def test_exec_options() -> None:
+ """test #10182"""
+
+ session = Session()
+
+ session.connection(
+ execution_options={"isolation_level": "REPEATABLE READ"}
+ )
+
+ scoped = scoped_session(sessionmaker())
+
+ scoped.connection(execution_options={"isolation_level": "REPEATABLE READ"})
+
+
+async def async_test_exec_options() -> None:
+ """test #10182"""
+
+ session = AsyncSession()
+
+ await session.connection(
+ execution_options={"isolation_level": "REPEATABLE READ"}
+ )
+
+ scoped = async_scoped_session(
+ async_sessionmaker(), scopefunc=asyncio.current_task
+ )
+
+ await scoped.connection(
+ execution_options={"isolation_level": "REPEATABLE READ"}
+ )