--- /dev/null
+.. change::
+ :tags: bug, orm
+ :tickets: 9973
+
+ Additional hardening and documentation for the ORM :class:`_orm.Session`
+ "state change" system, which detects concurrent use of
+ :class:`_orm.Session` and :class:`_asyncio.AsyncSession` objects; an
+ additional check is added within the process to acquire connections from
+ the underlying engine, which is a critical section with regards to internal
+ connection management.
Object Relational Mapping
-------------------------
+.. _error_isce:
+
+IllegalStateChangeError and concurrency exceptions
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+SQLAlchemy 2.0 introduced a new system described at :ref:`change_7433`, which
+proactively detects concurrent methods being invoked on an individual instance of
+the :class:`_orm.Session`
+object and by extension the :class:`_asyncio.AsyncSession` proxy object.
+These concurrent access calls typically, though not exclusively, would occur
+when a single instance of :class:`_orm.Session` is shared among multiple
+concurrent threads without such access being synchronized, or similarly
+when a single instance of :class:`_asyncio.AsyncSession` is shared among
+multiple concurrent tasks (such as when using a function like ``asyncio.gather()``).
+These use patterns are not the appropriate use of these objects, where without
+the proactive warning system SQLAlchemy implements would still otherwise produce
+invalid state within the objects, producing hard-to-debug errors including
+driver-level errors on the database connections themselves.
+
+Instances of :class:`_orm.Session` and :class:`_asyncio.AsyncSession` are
+**mutable, stateful objects with no built-in synchronization** of method calls,
+and represent a **single, ongoing database transaction** upon a single database
+connection at a time for a particular :class:`.Engine` or :class:`.AsyncEngine`
+to which the object is bound (note that these objects both support being bound
+to multiple engines at once, however in this case there will still be only one
+connection per engine in play within the scope of a transaction). A single
+database transaction is not an appropriate target for concurrent SQL commands;
+instead, an application that runs concurrent database operations should use
+concurrent transactions. For these objects then it follows that the appropriate
+pattern is :class:`_orm.Session` per thread, or :class:`_asyncio.AsyncSession`
+per task.
+
+For more background on concurrency see the section
+:ref:`session_faq_threadsafe`.
+
+
.. _error_bhk3:
Parent instance <x> is not bound to a Session; (lazy load/deferred load/refresh/etc.) operation cannot proceed
---------------
Using :term:`2.0 style` querying, the :class:`_asyncio.AsyncSession` class
-provides full ORM functionality. Within the default mode of use, special care
-must be taken to avoid :term:`lazy loading` or other expired-attribute access
-involving ORM relationships and column attributes; the next
-section :ref:`asyncio_orm_avoid_lazyloads` details this. The example below
-illustrates a complete example including mapper and session configuration::
+provides full ORM functionality.
+
+Within the default mode of use, special care must be taken to avoid :term:`lazy
+loading` or other expired-attribute access involving ORM relationships and
+column attributes; the next section :ref:`asyncio_orm_avoid_lazyloads` details
+this.
+
+.. warning::
+
+ A single instance of :class:`_asyncio.AsyncSession` is **not safe for
+ use in multiple, concurrent tasks**. See the sections
+ :ref:`asyncio_concurrency` and :ref:`session_faq_threadsafe` for background.
+
+The example below illustrates a complete example including mapper and session
+configuration::
from __future__ import annotations
:meth:`_asyncio.AsyncSession.close` method.
+.. _asyncio_concurrency:
+
+Using AsyncSession with Concurrent Tasks
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+The :class:`_asyncio.AsyncSession` object is a **mutable, stateful object**
+which represents a **single, stateful database transaction in progress**. Using
+concurrent tasks with asyncio, with APIs such as ``asyncio.gather()`` for
+example, should use a **separate** :class:`_asyncio.AsyncSession` **per individual
+task**.
+
+See the section :ref:`session_faq_threadsafe` for a general description of
+the :class:`_orm.Session` and :class:`_asyncio.AsyncSession` with regards to
+how they should be used with concurrent workloads.
+
.. _asyncio_orm_avoid_lazyloads:
Preventing Implicit IO when Using AsyncSession
.. _session_faq_threadsafe:
-Is the session thread-safe?
-~~~~~~~~~~~~~~~~~~~~~~~~~~~
-
-The :class:`.Session` is very much intended to be used in a
-**non-concurrent** fashion, which usually means in only one thread at a
-time.
-
-The :class:`.Session` should be used in such a way that one
-instance exists for a single series of operations within a single
-transaction. One expedient way to get this effect is by associating
-a :class:`.Session` with the current thread (see :ref:`unitofwork_contextual`
-for background). Another is to use a pattern
-where the :class:`.Session` is passed between functions and is otherwise
-not shared with other threads.
-
-The bigger point is that you should not *want* to use the session
-with multiple concurrent threads. That would be like having everyone at a
-restaurant all eat from the same plate. The session is a local "workspace"
-that you use for a specific set of tasks; you don't want to, or need to,
-share that session with other threads who are doing some other task.
-
-Making sure the :class:`.Session` is only used in a single concurrent thread at a time
-is called a "share nothing" approach to concurrency. But actually, not
-sharing the :class:`.Session` implies a more significant pattern; it
-means not just the :class:`.Session` object itself, but
-also **all objects that are associated with that Session**, must be kept within
-the scope of a single concurrent thread. The set of mapped
-objects associated with a :class:`.Session` are essentially proxies for data
-within database rows accessed over a database connection, and so just like
-the :class:`.Session` itself, the whole
-set of objects is really just a large-scale proxy for a database connection
-(or connections). Ultimately, it's mostly the DBAPI connection itself that
-we're keeping away from concurrent access; but since the :class:`.Session`
-and all the objects associated with it are all proxies for that DBAPI connection,
-the entire graph is essentially not safe for concurrent access.
-
-If there are in fact multiple threads participating
-in the same task, then you may consider sharing the session and its objects between
-those threads; however, in this extremely unusual scenario the application would
-need to ensure that a proper locking scheme is implemented so that there isn't
-*concurrent* access to the :class:`.Session` or its state. A more common approach
-to this situation is to maintain a single :class:`.Session` per concurrent thread,
-but to instead *copy* objects from one :class:`.Session` to another, often
-using the :meth:`.Session.merge` method to copy the state of an object into
-a new object local to a different :class:`.Session`.
+Is the Session thread-safe? Is AsyncSession safe to share in concurrent tasks?
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+The :class:`.Session` is a **mutable, stateful** object that represents a **single
+database transaction**. An instance of :class:`.Session` therefore **cannot
+be shared among concurrent threads or asyncio tasks without careful
+synchronization**. The :class:`.Session` is intended to be used in a
+**non-concurrent** fashion, that is, a particular instance of :class:`.Session`
+should be used in only one thread or task at a time.
+
+When using the :class:`_asyncio.AsyncSession` object from SQLAlchemy's
+:ref:`asyncio <asyncio_toplevel>` extension, this object is only a thin proxy
+on top of a :class:`_orm.Session`, and the same rules apply; it is an
+**unsynchronized, mutable, stateful object**, so it is **not** safe to use a single
+instance of :class:`_asyncio.AsyncSession` in multiple asyncio tasks at once.
+
+An instance of :class:`.Session` or :class:`_asyncio.AsyncSession` represents a
+single logical database transaction, referencing only a single
+:class:`_engine.Connection` at a time for a particular :class:`.Engine` or
+:class:`.AsyncEngine` to which the object is bound (note that these objects
+both support being bound to multiple engines at once, however in this case
+there will still be only one connection per engine in play within the
+scope of a transaction).
+
+A database connection within a transaction is also a stateful object that is
+intended to be operated upon in a non-concurrent, sequential fashion. Commands
+are issued on the connection in a sequence, which are handled by the database
+server in the exact order in which they are emitted. As the
+:class:`_orm.Session` emits commands upon this connection and receives results,
+the :class:`_orm.Session` itself is transitioning through internal state
+changes that align with the state of commands and data present on this
+connection; states which include if a transaction were begun, committed, or
+rolled back, what SAVEPOINTs if any are in play, as well as fine-grained
+synchronization of the state of individual database rows with local ORM-mapped
+objects.
+
+When designing database applications for concurrency, the appropriate model is
+that each concurrent task / thread works with its own database transaction.
+This is why when discussing the issue of database concurrency, the standard
+terminology used is **multiple, concurrent transactions**. Within traditional
+RDMS there is no analogue for a single database transaction that is receiving
+and processing multiple commands concurrently.
+
+The concurrency model for SQLAlchemy's :class:`_orm.Session` and
+:class:`_asyncio.AsyncSession` is therefore **Session per thread, AsyncSession per
+task**. An application that uses multiple threads, or multiple tasks in
+asyncio such as when using an API like ``asyncio.gather()`` would want to ensure
+that each thread has its own :class:`_orm.Session`, each asyncio task
+has its own :class:`_asyncio.AsyncSession`.
+
+The best way to ensure this use is by using the :ref:`standard context manager
+pattern <session_getting>` locally within the top level Python function that
+is inside the thread or task, which will ensure the lifespan of the
+:class:`_orm.Session` or :class:`_asyncio.AsyncSession` is maintained within
+a local scope.
+
+For applications that benefit from having a "global" :class:`.Session`
+where it's not an option to pass the :class:`.Session` object to specific
+functions and methods which require it, the :class:`.scoped_session`
+approach can provide for a "thread local" :class:`.Session` object;
+see the section :ref:`unitofwork_contextual` for background. Within
+the asyncio context, the :class:`.async_scoped_session`
+object is the asyncio analogue for :class:`.scoped_session`, however is more
+challenging to configure as it requires a custom "context" function.
+
The :class:`_asyncio.AsyncSession` is a proxy for a traditional
:class:`_orm.Session` instance.
+ The :class:`_asyncio.AsyncSession` is **not safe for use in concurrent
+ tasks.**. See :ref:`session_faq_threadsafe` for background.
+
.. versionadded:: 1.4
To use an :class:`_asyncio.AsyncSession` with custom :class:`_orm.Session`
COMMITTED = 3
DEACTIVE = 4
CLOSED = 5
+ PROVISIONING_CONNECTION = 6
# backwards compatibility
-ACTIVE, PREPARED, COMMITTED, DEACTIVE, CLOSED = tuple(SessionTransactionState)
+ACTIVE, PREPARED, COMMITTED, DEACTIVE, CLOSED, PROVISIONING_CONNECTION = tuple(
+ SessionTransactionState
+)
class ORMExecuteState(util.MemoizedSlots):
)
elif state is SessionTransactionState.CLOSED:
raise sa_exc.ResourceClosedError("This transaction is closed")
+ elif state is SessionTransactionState.PROVISIONING_CONNECTION:
+ raise sa_exc.InvalidRequestError(
+ "This session is provisioning a new connection; concurrent "
+ "operations are not permitted",
+ code="isce",
+ )
else:
raise sa_exc.InvalidRequestError(
f"This session is in '{state.name.lower()}' state; no "
)
return self._connections[bind][0]
+ self._state = SessionTransactionState.PROVISIONING_CONNECTION
+
local_connect = False
should_commit = True
- if self._parent:
- conn = self._parent._connection_for_bind(bind, execution_options)
- if not self.nested:
- return conn
- else:
- if isinstance(bind, engine.Connection):
- conn = bind
- if conn.engine in self._connections:
- raise sa_exc.InvalidRequestError(
- "Session already has a Connection associated for the "
- "given Connection's Engine"
- )
- else:
- conn = bind.connect()
- local_connect = True
-
try:
- if execution_options:
- conn = conn.execution_options(**execution_options)
-
- transaction: Transaction
- if self.session.twophase and self._parent is None:
- # TODO: shouldn't we only be here if not
- # conn.in_transaction() ?
- # if twophase is set and conn.in_transaction(), validate
- # that it is in fact twophase.
- transaction = conn.begin_twophase()
- elif self.nested:
- transaction = conn.begin_nested()
- elif conn.in_transaction():
- join_transaction_mode = self.session.join_transaction_mode
-
- if join_transaction_mode == "conditional_savepoint":
- if conn.in_nested_transaction():
- join_transaction_mode = "create_savepoint"
- else:
- join_transaction_mode = "rollback_only"
+ if self._parent:
+ conn = self._parent._connection_for_bind(
+ bind, execution_options
+ )
+ if not self.nested:
+ return conn
+ else:
+ if isinstance(bind, engine.Connection):
+ conn = bind
+ if conn.engine in self._connections:
+ raise sa_exc.InvalidRequestError(
+ "Session already has a Connection associated "
+ "for the given Connection's Engine"
+ )
+ else:
+ conn = bind.connect()
+ local_connect = True
- if join_transaction_mode in (
- "control_fully",
- "rollback_only",
- ):
- if conn.in_nested_transaction():
- transaction = conn._get_required_nested_transaction()
- else:
- transaction = conn._get_required_transaction()
- if join_transaction_mode == "rollback_only":
- should_commit = False
- elif join_transaction_mode == "create_savepoint":
+ try:
+ if execution_options:
+ conn = conn.execution_options(**execution_options)
+
+ transaction: Transaction
+ if self.session.twophase and self._parent is None:
+ # TODO: shouldn't we only be here if not
+ # conn.in_transaction() ?
+ # if twophase is set and conn.in_transaction(), validate
+ # that it is in fact twophase.
+ transaction = conn.begin_twophase()
+ elif self.nested:
transaction = conn.begin_nested()
+ elif conn.in_transaction():
+ join_transaction_mode = self.session.join_transaction_mode
+
+ if join_transaction_mode == "conditional_savepoint":
+ if conn.in_nested_transaction():
+ join_transaction_mode = "create_savepoint"
+ else:
+ join_transaction_mode = "rollback_only"
+
+ if join_transaction_mode in (
+ "control_fully",
+ "rollback_only",
+ ):
+ if conn.in_nested_transaction():
+ transaction = (
+ conn._get_required_nested_transaction()
+ )
+ else:
+ transaction = conn._get_required_transaction()
+ if join_transaction_mode == "rollback_only":
+ should_commit = False
+ elif join_transaction_mode == "create_savepoint":
+ transaction = conn.begin_nested()
+ else:
+ assert False, join_transaction_mode
else:
- assert False, join_transaction_mode
+ transaction = conn.begin()
+ except:
+ # connection will not not be associated with this Session;
+ # close it immediately so that it isn't closed under GC
+ if local_connect:
+ conn.close()
+ raise
else:
- transaction = conn.begin()
- except:
- # connection will not not be associated with this Session;
- # close it immediately so that it isn't closed under GC
- if local_connect:
- conn.close()
- raise
- else:
- bind_is_connection = isinstance(bind, engine.Connection)
+ bind_is_connection = isinstance(bind, engine.Connection)
- self._connections[conn] = self._connections[conn.engine] = (
- conn,
- transaction,
- should_commit,
- not bind_is_connection,
- )
- self.session.dispatch.after_begin(self.session, self, conn)
- return conn
+ self._connections[conn] = self._connections[conn.engine] = (
+ conn,
+ transaction,
+ should_commit,
+ not bind_is_connection,
+ )
+ self.session.dispatch.after_begin(self.session, self, conn)
+ return conn
+ finally:
+ self._state = SessionTransactionState.ACTIVE
def prepare(self) -> None:
if self._parent is not None or not self.session.twophase:
class Session(_SessionClassMethods, EventTarget):
"""Manages persistence operations for ORM-mapped objects.
+ The :class:`_orm.Session` is **not safe for use in concurrent threads.**.
+ See :ref:`session_faq_threadsafe` for background.
+
The Session's usage paradigm is described at :doc:`/orm/session`.
autocommit: Literal[False] = False,
join_transaction_mode: JoinTransactionMode = "conditional_savepoint",
):
- r"""Construct a new Session.
+ r"""Construct a new :class:`_orm.Session`.
See also the :class:`.sessionmaker` function which is used to
generate a :class:`.Session`-producing callable with a given
) -> NoReturn:
raise sa_exc.IllegalStateChangeError(
f"Can't run operation '{operation_name}()' when Session "
- f"is in state {state!r}"
+ f"is in state {state!r}",
+ code="isce",
)
@classmethod
f"Method '{fn.__name__}()' can't be called here; "
f"method '{existing_fn.__name__}()' is already "
f"in progress and this would cause an unexpected "
- f"state change to {moves_to!r}"
+ f"state change to {moves_to!r}",
+ code="isce",
)
else:
raise sa_exc.IllegalStateChangeError(
f"Cant run operation '{fn.__name__}()' here; "
f"will move to state {moves_to!r} where we are "
- f"expecting {next_state!r}"
+ f"expecting {next_state!r}",
+ code="isce",
)
self._current_fn = fn
raise sa_exc.IllegalStateChangeError(
f"Method '{fn.__name__}()' failed to "
"change state "
- f"to {moves_to!r} as expected"
+ f"to {moves_to!r} as expected",
+ code="isce",
)
elif existing_fn:
raise sa_exc.IllegalStateChangeError(
"running, "
f"method '{fn.__name__}()' caused an "
"unexpected "
- f"state change to {self._state!r}"
+ f"state change to {self._state!r}",
+ code="isce",
)
else:
raise sa_exc.IllegalStateChangeError(
f"Method '{fn.__name__}()' caused an unexpected "
- f"state change to {self._state!r}"
+ f"state change to {self._state!r}",
+ code="isce",
)
finally:
else:
if self._state is not expected:
raise sa_exc.IllegalStateChangeError(
- f"Unexpected state change to {self._state!r}"
+ f"Unexpected state change to {self._state!r}", code="isce"
)
finally:
self._next_state = _StateChangeStates.CHANGE_IN_PROGRESS