From: Mike Bayer Date: Mon, 19 Jun 2023 15:54:35 +0000 (-0400) Subject: track state change within _connection_for_bind() X-Git-Tag: rel_2_0_17~6 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=70e0f4c0fae1f7410a55c19e1282d4b4a179adc9;p=thirdparty%2Fsqlalchemy%2Fsqlalchemy.git track state change within _connection_for_bind() Additional hardening and documentation for the ORM :class:`_orm.Session` "state change" system, which detects concurrent use of :class:`_orm.Session` and :class:`_asyncio.AsyncSession` objects; an additional check is added within the process to acquire connections from the underlying engine, which is a critical section with regards to internal connection management. Change-Id: I6893c53e016218ff6cfb39709179ca4bf73a2414 Fixes: #9973 --- diff --git a/doc/build/changelog/unreleased_20/9973.rst b/doc/build/changelog/unreleased_20/9973.rst new file mode 100644 index 0000000000..c9d74d27e0 --- /dev/null +++ b/doc/build/changelog/unreleased_20/9973.rst @@ -0,0 +1,10 @@ +.. change:: + :tags: bug, orm + :tickets: 9973 + + Additional hardening and documentation for the ORM :class:`_orm.Session` + "state change" system, which detects concurrent use of + :class:`_orm.Session` and :class:`_asyncio.AsyncSession` objects; an + additional check is added within the process to acquire connections from + the underlying engine, which is a critical section with regards to internal + connection management. diff --git a/doc/build/errors.rst b/doc/build/errors.rst index e02dfcb3fa..bcfb43efb7 100644 --- a/doc/build/errors.rst +++ b/doc/build/errors.rst @@ -835,6 +835,42 @@ and instead keep the SQL construction as explicit as possible. Object Relational Mapping ------------------------- +.. _error_isce: + +IllegalStateChangeError and concurrency exceptions +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +SQLAlchemy 2.0 introduced a new system described at :ref:`change_7433`, which +proactively detects concurrent methods being invoked on an individual instance of +the :class:`_orm.Session` +object and by extension the :class:`_asyncio.AsyncSession` proxy object. +These concurrent access calls typically, though not exclusively, would occur +when a single instance of :class:`_orm.Session` is shared among multiple +concurrent threads without such access being synchronized, or similarly +when a single instance of :class:`_asyncio.AsyncSession` is shared among +multiple concurrent tasks (such as when using a function like ``asyncio.gather()``). +These use patterns are not the appropriate use of these objects, where without +the proactive warning system SQLAlchemy implements would still otherwise produce +invalid state within the objects, producing hard-to-debug errors including +driver-level errors on the database connections themselves. + +Instances of :class:`_orm.Session` and :class:`_asyncio.AsyncSession` are +**mutable, stateful objects with no built-in synchronization** of method calls, +and represent a **single, ongoing database transaction** upon a single database +connection at a time for a particular :class:`.Engine` or :class:`.AsyncEngine` +to which the object is bound (note that these objects both support being bound +to multiple engines at once, however in this case there will still be only one +connection per engine in play within the scope of a transaction). A single +database transaction is not an appropriate target for concurrent SQL commands; +instead, an application that runs concurrent database operations should use +concurrent transactions. For these objects then it follows that the appropriate +pattern is :class:`_orm.Session` per thread, or :class:`_asyncio.AsyncSession` +per task. + +For more background on concurrency see the section +:ref:`session_faq_threadsafe`. + + .. _error_bhk3: Parent instance is not bound to a Session; (lazy load/deferred load/refresh/etc.) operation cannot proceed diff --git a/doc/build/orm/extensions/asyncio.rst b/doc/build/orm/extensions/asyncio.rst index 016a3d908e..e4782f2bbb 100644 --- a/doc/build/orm/extensions/asyncio.rst +++ b/doc/build/orm/extensions/asyncio.rst @@ -140,11 +140,21 @@ Synopsis - ORM --------------- Using :term:`2.0 style` querying, the :class:`_asyncio.AsyncSession` class -provides full ORM functionality. Within the default mode of use, special care -must be taken to avoid :term:`lazy loading` or other expired-attribute access -involving ORM relationships and column attributes; the next -section :ref:`asyncio_orm_avoid_lazyloads` details this. The example below -illustrates a complete example including mapper and session configuration:: +provides full ORM functionality. + +Within the default mode of use, special care must be taken to avoid :term:`lazy +loading` or other expired-attribute access involving ORM relationships and +column attributes; the next section :ref:`asyncio_orm_avoid_lazyloads` details +this. + +.. warning:: + + A single instance of :class:`_asyncio.AsyncSession` is **not safe for + use in multiple, concurrent tasks**. See the sections + :ref:`asyncio_concurrency` and :ref:`session_faq_threadsafe` for background. + +The example below illustrates a complete example including mapper and session +configuration:: from __future__ import annotations @@ -264,6 +274,21 @@ the end of the block; this is equivalent to calling the :meth:`_asyncio.AsyncSession.close` method. +.. _asyncio_concurrency: + +Using AsyncSession with Concurrent Tasks +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +The :class:`_asyncio.AsyncSession` object is a **mutable, stateful object** +which represents a **single, stateful database transaction in progress**. Using +concurrent tasks with asyncio, with APIs such as ``asyncio.gather()`` for +example, should use a **separate** :class:`_asyncio.AsyncSession` **per individual +task**. + +See the section :ref:`session_faq_threadsafe` for a general description of +the :class:`_orm.Session` and :class:`_asyncio.AsyncSession` with regards to +how they should be used with concurrent workloads. + .. _asyncio_orm_avoid_lazyloads: Preventing Implicit IO when Using AsyncSession diff --git a/doc/build/orm/session_basics.rst b/doc/build/orm/session_basics.rst index 33a58e87c7..6cdb58e1a2 100644 --- a/doc/build/orm/session_basics.rst +++ b/doc/build/orm/session_basics.rst @@ -944,48 +944,68 @@ The newer :ref:`core_inspection_toplevel` system can also be used:: .. _session_faq_threadsafe: -Is the session thread-safe? -~~~~~~~~~~~~~~~~~~~~~~~~~~~ - -The :class:`.Session` is very much intended to be used in a -**non-concurrent** fashion, which usually means in only one thread at a -time. - -The :class:`.Session` should be used in such a way that one -instance exists for a single series of operations within a single -transaction. One expedient way to get this effect is by associating -a :class:`.Session` with the current thread (see :ref:`unitofwork_contextual` -for background). Another is to use a pattern -where the :class:`.Session` is passed between functions and is otherwise -not shared with other threads. - -The bigger point is that you should not *want* to use the session -with multiple concurrent threads. That would be like having everyone at a -restaurant all eat from the same plate. The session is a local "workspace" -that you use for a specific set of tasks; you don't want to, or need to, -share that session with other threads who are doing some other task. - -Making sure the :class:`.Session` is only used in a single concurrent thread at a time -is called a "share nothing" approach to concurrency. But actually, not -sharing the :class:`.Session` implies a more significant pattern; it -means not just the :class:`.Session` object itself, but -also **all objects that are associated with that Session**, must be kept within -the scope of a single concurrent thread. The set of mapped -objects associated with a :class:`.Session` are essentially proxies for data -within database rows accessed over a database connection, and so just like -the :class:`.Session` itself, the whole -set of objects is really just a large-scale proxy for a database connection -(or connections). Ultimately, it's mostly the DBAPI connection itself that -we're keeping away from concurrent access; but since the :class:`.Session` -and all the objects associated with it are all proxies for that DBAPI connection, -the entire graph is essentially not safe for concurrent access. - -If there are in fact multiple threads participating -in the same task, then you may consider sharing the session and its objects between -those threads; however, in this extremely unusual scenario the application would -need to ensure that a proper locking scheme is implemented so that there isn't -*concurrent* access to the :class:`.Session` or its state. A more common approach -to this situation is to maintain a single :class:`.Session` per concurrent thread, -but to instead *copy* objects from one :class:`.Session` to another, often -using the :meth:`.Session.merge` method to copy the state of an object into -a new object local to a different :class:`.Session`. +Is the Session thread-safe? Is AsyncSession safe to share in concurrent tasks? +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +The :class:`.Session` is a **mutable, stateful** object that represents a **single +database transaction**. An instance of :class:`.Session` therefore **cannot +be shared among concurrent threads or asyncio tasks without careful +synchronization**. The :class:`.Session` is intended to be used in a +**non-concurrent** fashion, that is, a particular instance of :class:`.Session` +should be used in only one thread or task at a time. + +When using the :class:`_asyncio.AsyncSession` object from SQLAlchemy's +:ref:`asyncio ` extension, this object is only a thin proxy +on top of a :class:`_orm.Session`, and the same rules apply; it is an +**unsynchronized, mutable, stateful object**, so it is **not** safe to use a single +instance of :class:`_asyncio.AsyncSession` in multiple asyncio tasks at once. + +An instance of :class:`.Session` or :class:`_asyncio.AsyncSession` represents a +single logical database transaction, referencing only a single +:class:`_engine.Connection` at a time for a particular :class:`.Engine` or +:class:`.AsyncEngine` to which the object is bound (note that these objects +both support being bound to multiple engines at once, however in this case +there will still be only one connection per engine in play within the +scope of a transaction). + +A database connection within a transaction is also a stateful object that is +intended to be operated upon in a non-concurrent, sequential fashion. Commands +are issued on the connection in a sequence, which are handled by the database +server in the exact order in which they are emitted. As the +:class:`_orm.Session` emits commands upon this connection and receives results, +the :class:`_orm.Session` itself is transitioning through internal state +changes that align with the state of commands and data present on this +connection; states which include if a transaction were begun, committed, or +rolled back, what SAVEPOINTs if any are in play, as well as fine-grained +synchronization of the state of individual database rows with local ORM-mapped +objects. + +When designing database applications for concurrency, the appropriate model is +that each concurrent task / thread works with its own database transaction. +This is why when discussing the issue of database concurrency, the standard +terminology used is **multiple, concurrent transactions**. Within traditional +RDMS there is no analogue for a single database transaction that is receiving +and processing multiple commands concurrently. + +The concurrency model for SQLAlchemy's :class:`_orm.Session` and +:class:`_asyncio.AsyncSession` is therefore **Session per thread, AsyncSession per +task**. An application that uses multiple threads, or multiple tasks in +asyncio such as when using an API like ``asyncio.gather()`` would want to ensure +that each thread has its own :class:`_orm.Session`, each asyncio task +has its own :class:`_asyncio.AsyncSession`. + +The best way to ensure this use is by using the :ref:`standard context manager +pattern ` locally within the top level Python function that +is inside the thread or task, which will ensure the lifespan of the +:class:`_orm.Session` or :class:`_asyncio.AsyncSession` is maintained within +a local scope. + +For applications that benefit from having a "global" :class:`.Session` +where it's not an option to pass the :class:`.Session` object to specific +functions and methods which require it, the :class:`.scoped_session` +approach can provide for a "thread local" :class:`.Session` object; +see the section :ref:`unitofwork_contextual` for background. Within +the asyncio context, the :class:`.async_scoped_session` +object is the asyncio analogue for :class:`.scoped_session`, however is more +challenging to configure as it requires a custom "context" function. + diff --git a/lib/sqlalchemy/ext/asyncio/session.py b/lib/sqlalchemy/ext/asyncio/session.py index 6cab56baa9..ceeaff4d90 100644 --- a/lib/sqlalchemy/ext/asyncio/session.py +++ b/lib/sqlalchemy/ext/asyncio/session.py @@ -203,6 +203,9 @@ class AsyncSession(ReversibleProxy[Session]): The :class:`_asyncio.AsyncSession` is a proxy for a traditional :class:`_orm.Session` instance. + The :class:`_asyncio.AsyncSession` is **not safe for use in concurrent + tasks.**. See :ref:`session_faq_threadsafe` for background. + .. versionadded:: 1.4 To use an :class:`_asyncio.AsyncSession` with custom :class:`_orm.Session` diff --git a/lib/sqlalchemy/orm/session.py b/lib/sqlalchemy/orm/session.py index dd0cfb7908..8d19e2374e 100644 --- a/lib/sqlalchemy/orm/session.py +++ b/lib/sqlalchemy/orm/session.py @@ -251,10 +251,13 @@ class SessionTransactionState(_StateChangeState): COMMITTED = 3 DEACTIVE = 4 CLOSED = 5 + PROVISIONING_CONNECTION = 6 # backwards compatibility -ACTIVE, PREPARED, COMMITTED, DEACTIVE, CLOSED = tuple(SessionTransactionState) +ACTIVE, PREPARED, COMMITTED, DEACTIVE, CLOSED, PROVISIONING_CONNECTION = tuple( + SessionTransactionState +) class ORMExecuteState(util.MemoizedSlots): @@ -919,6 +922,12 @@ class SessionTransaction(_StateChange, TransactionalContext): ) elif state is SessionTransactionState.CLOSED: raise sa_exc.ResourceClosedError("This transaction is closed") + elif state is SessionTransactionState.PROVISIONING_CONNECTION: + raise sa_exc.InvalidRequestError( + "This session is provisioning a new connection; concurrent " + "operations are not permitted", + code="isce", + ) else: raise sa_exc.InvalidRequestError( f"This session is in '{state.name.lower()}' state; no " @@ -1090,80 +1099,89 @@ class SessionTransaction(_StateChange, TransactionalContext): ) return self._connections[bind][0] + self._state = SessionTransactionState.PROVISIONING_CONNECTION + local_connect = False should_commit = True - if self._parent: - conn = self._parent._connection_for_bind(bind, execution_options) - if not self.nested: - return conn - else: - if isinstance(bind, engine.Connection): - conn = bind - if conn.engine in self._connections: - raise sa_exc.InvalidRequestError( - "Session already has a Connection associated for the " - "given Connection's Engine" - ) - else: - conn = bind.connect() - local_connect = True - try: - if execution_options: - conn = conn.execution_options(**execution_options) - - transaction: Transaction - if self.session.twophase and self._parent is None: - # TODO: shouldn't we only be here if not - # conn.in_transaction() ? - # if twophase is set and conn.in_transaction(), validate - # that it is in fact twophase. - transaction = conn.begin_twophase() - elif self.nested: - transaction = conn.begin_nested() - elif conn.in_transaction(): - join_transaction_mode = self.session.join_transaction_mode - - if join_transaction_mode == "conditional_savepoint": - if conn.in_nested_transaction(): - join_transaction_mode = "create_savepoint" - else: - join_transaction_mode = "rollback_only" + if self._parent: + conn = self._parent._connection_for_bind( + bind, execution_options + ) + if not self.nested: + return conn + else: + if isinstance(bind, engine.Connection): + conn = bind + if conn.engine in self._connections: + raise sa_exc.InvalidRequestError( + "Session already has a Connection associated " + "for the given Connection's Engine" + ) + else: + conn = bind.connect() + local_connect = True - if join_transaction_mode in ( - "control_fully", - "rollback_only", - ): - if conn.in_nested_transaction(): - transaction = conn._get_required_nested_transaction() - else: - transaction = conn._get_required_transaction() - if join_transaction_mode == "rollback_only": - should_commit = False - elif join_transaction_mode == "create_savepoint": + try: + if execution_options: + conn = conn.execution_options(**execution_options) + + transaction: Transaction + if self.session.twophase and self._parent is None: + # TODO: shouldn't we only be here if not + # conn.in_transaction() ? + # if twophase is set and conn.in_transaction(), validate + # that it is in fact twophase. + transaction = conn.begin_twophase() + elif self.nested: transaction = conn.begin_nested() + elif conn.in_transaction(): + join_transaction_mode = self.session.join_transaction_mode + + if join_transaction_mode == "conditional_savepoint": + if conn.in_nested_transaction(): + join_transaction_mode = "create_savepoint" + else: + join_transaction_mode = "rollback_only" + + if join_transaction_mode in ( + "control_fully", + "rollback_only", + ): + if conn.in_nested_transaction(): + transaction = ( + conn._get_required_nested_transaction() + ) + else: + transaction = conn._get_required_transaction() + if join_transaction_mode == "rollback_only": + should_commit = False + elif join_transaction_mode == "create_savepoint": + transaction = conn.begin_nested() + else: + assert False, join_transaction_mode else: - assert False, join_transaction_mode + transaction = conn.begin() + except: + # connection will not not be associated with this Session; + # close it immediately so that it isn't closed under GC + if local_connect: + conn.close() + raise else: - transaction = conn.begin() - except: - # connection will not not be associated with this Session; - # close it immediately so that it isn't closed under GC - if local_connect: - conn.close() - raise - else: - bind_is_connection = isinstance(bind, engine.Connection) + bind_is_connection = isinstance(bind, engine.Connection) - self._connections[conn] = self._connections[conn.engine] = ( - conn, - transaction, - should_commit, - not bind_is_connection, - ) - self.session.dispatch.after_begin(self.session, self, conn) - return conn + self._connections[conn] = self._connections[conn.engine] = ( + conn, + transaction, + should_commit, + not bind_is_connection, + ) + self.session.dispatch.after_begin(self.session, self, conn) + return conn + finally: + self._state = SessionTransactionState.ACTIVE def prepare(self) -> None: if self._parent is not None or not self.session.twophase: @@ -1354,6 +1372,9 @@ class SessionTransaction(_StateChange, TransactionalContext): class Session(_SessionClassMethods, EventTarget): """Manages persistence operations for ORM-mapped objects. + The :class:`_orm.Session` is **not safe for use in concurrent threads.**. + See :ref:`session_faq_threadsafe` for background. + The Session's usage paradigm is described at :doc:`/orm/session`. @@ -1409,7 +1430,7 @@ class Session(_SessionClassMethods, EventTarget): autocommit: Literal[False] = False, join_transaction_mode: JoinTransactionMode = "conditional_savepoint", ): - r"""Construct a new Session. + r"""Construct a new :class:`_orm.Session`. See also the :class:`.sessionmaker` function which is used to generate a :class:`.Session`-producing callable with a given diff --git a/lib/sqlalchemy/orm/state_changes.py b/lib/sqlalchemy/orm/state_changes.py index 61a94d3f9d..3d74ff2de2 100644 --- a/lib/sqlalchemy/orm/state_changes.py +++ b/lib/sqlalchemy/orm/state_changes.py @@ -59,7 +59,8 @@ class _StateChange: ) -> NoReturn: raise sa_exc.IllegalStateChangeError( f"Can't run operation '{operation_name}()' when Session " - f"is in state {state!r}" + f"is in state {state!r}", + code="isce", ) @classmethod @@ -121,13 +122,15 @@ class _StateChange: f"Method '{fn.__name__}()' can't be called here; " f"method '{existing_fn.__name__}()' is already " f"in progress and this would cause an unexpected " - f"state change to {moves_to!r}" + f"state change to {moves_to!r}", + code="isce", ) else: raise sa_exc.IllegalStateChangeError( f"Cant run operation '{fn.__name__}()' here; " f"will move to state {moves_to!r} where we are " - f"expecting {next_state!r}" + f"expecting {next_state!r}", + code="isce", ) self._current_fn = fn @@ -144,7 +147,8 @@ class _StateChange: raise sa_exc.IllegalStateChangeError( f"Method '{fn.__name__}()' failed to " "change state " - f"to {moves_to!r} as expected" + f"to {moves_to!r} as expected", + code="isce", ) elif existing_fn: raise sa_exc.IllegalStateChangeError( @@ -152,12 +156,14 @@ class _StateChange: "running, " f"method '{fn.__name__}()' caused an " "unexpected " - f"state change to {self._state!r}" + f"state change to {self._state!r}", + code="isce", ) else: raise sa_exc.IllegalStateChangeError( f"Method '{fn.__name__}()' caused an unexpected " - f"state change to {self._state!r}" + f"state change to {self._state!r}", + code="isce", ) finally: @@ -186,7 +192,7 @@ class _StateChange: else: if self._state is not expected: raise sa_exc.IllegalStateChangeError( - f"Unexpected state change to {self._state!r}" + f"Unexpected state change to {self._state!r}", code="isce" ) finally: self._next_state = _StateChangeStates.CHANGE_IN_PROGRESS