]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
track state change within _connection_for_bind()
authorMike Bayer <mike_mp@zzzcomputing.com>
Mon, 19 Jun 2023 15:54:35 +0000 (11:54 -0400)
committermike bayer <mike_mp@zzzcomputing.com>
Wed, 21 Jun 2023 17:06:44 +0000 (17:06 +0000)
Additional hardening and documentation for the ORM :class:`_orm.Session`
"state change" system, which detects concurrent use of
:class:`_orm.Session` and :class:`_asyncio.AsyncSession` objects; an
additional check is added within the process to acquire connections from
the underlying engine, which is a critical section with regards to internal
connection management.

Change-Id: I6893c53e016218ff6cfb39709179ca4bf73a2414
Fixes: #9973
doc/build/changelog/unreleased_20/9973.rst [new file with mode: 0644]
doc/build/errors.rst
doc/build/orm/extensions/asyncio.rst
doc/build/orm/session_basics.rst
lib/sqlalchemy/ext/asyncio/session.py
lib/sqlalchemy/orm/session.py
lib/sqlalchemy/orm/state_changes.py

diff --git a/doc/build/changelog/unreleased_20/9973.rst b/doc/build/changelog/unreleased_20/9973.rst
new file mode 100644 (file)
index 0000000..c9d74d2
--- /dev/null
@@ -0,0 +1,10 @@
+.. change::
+    :tags: bug, orm
+    :tickets: 9973
+
+    Additional hardening and documentation for the ORM :class:`_orm.Session`
+    "state change" system, which detects concurrent use of
+    :class:`_orm.Session` and :class:`_asyncio.AsyncSession` objects; an
+    additional check is added within the process to acquire connections from
+    the underlying engine, which is a critical section with regards to internal
+    connection management.
index e02dfcb3faf6d1252b3686b7b64450b56329f1b1..bcfb43efb75860a622abe06cbe73552146c94f4b 100644 (file)
@@ -835,6 +835,42 @@ and instead keep the SQL construction as explicit as possible.
 Object Relational Mapping
 -------------------------
 
+.. _error_isce:
+
+IllegalStateChangeError and concurrency exceptions
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+SQLAlchemy 2.0 introduced a new system described at :ref:`change_7433`, which
+proactively detects concurrent methods being invoked on an individual instance of
+the :class:`_orm.Session`
+object and by extension the :class:`_asyncio.AsyncSession` proxy object.
+These concurrent access calls typically, though not exclusively, would occur
+when a single instance of :class:`_orm.Session` is shared among multiple
+concurrent threads without such access being synchronized, or similarly
+when a single instance of :class:`_asyncio.AsyncSession` is shared among
+multiple concurrent tasks (such as when using a function like ``asyncio.gather()``).
+These use patterns are not the appropriate use of these objects, where without
+the proactive warning system SQLAlchemy implements would still otherwise produce
+invalid state within the objects, producing hard-to-debug errors including
+driver-level errors on the database connections themselves.
+
+Instances of :class:`_orm.Session` and :class:`_asyncio.AsyncSession` are
+**mutable, stateful objects with no built-in synchronization** of method calls,
+and represent a **single, ongoing database transaction** upon a single database
+connection at a time for a particular :class:`.Engine` or :class:`.AsyncEngine`
+to which the object is bound (note that these objects both support being bound
+to multiple engines at once, however in this case there will still be only one
+connection per engine in play within the scope of a transaction).  A single
+database transaction is not an appropriate target for concurrent SQL commands;
+instead, an application that runs concurrent database operations should use
+concurrent transactions. For these objects then it follows that the appropriate
+pattern is :class:`_orm.Session` per thread, or :class:`_asyncio.AsyncSession`
+per task.
+
+For more background on concurrency see the section
+:ref:`session_faq_threadsafe`.
+
+
 .. _error_bhk3:
 
 Parent instance <x> is not bound to a Session; (lazy load/deferred load/refresh/etc.) operation cannot proceed
index 016a3d908e6d6f14669c16c7b913d950961795a9..e4782f2bbb24db88247c70e43c626e11c5bdc89d 100644 (file)
@@ -140,11 +140,21 @@ Synopsis - ORM
 ---------------
 
 Using :term:`2.0 style` querying, the :class:`_asyncio.AsyncSession` class
-provides full ORM functionality. Within the default mode of use, special care
-must be taken to avoid :term:`lazy loading` or other expired-attribute access
-involving ORM relationships and column attributes; the next
-section :ref:`asyncio_orm_avoid_lazyloads` details this.   The example below
-illustrates a complete example including mapper and session configuration::
+provides full ORM functionality.
+
+Within the default mode of use, special care must be taken to avoid :term:`lazy
+loading` or other expired-attribute access involving ORM relationships and
+column attributes; the next section :ref:`asyncio_orm_avoid_lazyloads` details
+this.
+
+.. warning::
+
+    A single instance of :class:`_asyncio.AsyncSession` is **not safe for
+    use in multiple, concurrent tasks**.  See the sections
+    :ref:`asyncio_concurrency` and :ref:`session_faq_threadsafe` for background.
+
+The example below illustrates a complete example including mapper and session
+configuration::
 
     from __future__ import annotations
 
@@ -264,6 +274,21 @@ the end of the block; this is equivalent to calling the
 :meth:`_asyncio.AsyncSession.close` method.
 
 
+.. _asyncio_concurrency:
+
+Using AsyncSession with Concurrent Tasks
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+The :class:`_asyncio.AsyncSession` object is a **mutable, stateful object**
+which represents a **single, stateful database transaction in progress**. Using
+concurrent tasks with asyncio, with APIs such as ``asyncio.gather()`` for
+example, should use a **separate** :class:`_asyncio.AsyncSession` **per individual
+task**.
+
+See the section :ref:`session_faq_threadsafe` for a general description of
+the :class:`_orm.Session` and :class:`_asyncio.AsyncSession` with regards to
+how they should be used with concurrent workloads.
+
 .. _asyncio_orm_avoid_lazyloads:
 
 Preventing Implicit IO when Using AsyncSession
index 33a58e87c7fc0e8a571da4895d37d0321fa6ad67..6cdb58e1a21fbbfd446c27c35f6a8c710c147da2 100644 (file)
@@ -944,48 +944,68 @@ The newer :ref:`core_inspection_toplevel` system can also be used::
 
 .. _session_faq_threadsafe:
 
-Is the session thread-safe?
-~~~~~~~~~~~~~~~~~~~~~~~~~~~
-
-The :class:`.Session` is very much intended to be used in a
-**non-concurrent** fashion, which usually means in only one thread at a
-time.
-
-The :class:`.Session` should be used in such a way that one
-instance exists for a single series of operations within a single
-transaction.   One expedient way to get this effect is by associating
-a :class:`.Session` with the current thread (see :ref:`unitofwork_contextual`
-for background).  Another is to use a pattern
-where the :class:`.Session` is passed between functions and is otherwise
-not shared with other threads.
-
-The bigger point is that you should not *want* to use the session
-with multiple concurrent threads. That would be like having everyone at a
-restaurant all eat from the same plate. The session is a local "workspace"
-that you use for a specific set of tasks; you don't want to, or need to,
-share that session with other threads who are doing some other task.
-
-Making sure the :class:`.Session` is only used in a single concurrent thread at a time
-is called a "share nothing" approach to concurrency.  But actually, not
-sharing the :class:`.Session` implies a more significant pattern; it
-means not just the :class:`.Session` object itself, but
-also **all objects that are associated with that Session**, must be kept within
-the scope of a single concurrent thread.   The set of mapped
-objects associated with a :class:`.Session` are essentially proxies for data
-within database rows accessed over a database connection, and so just like
-the :class:`.Session` itself, the whole
-set of objects is really just a large-scale proxy for a database connection
-(or connections).  Ultimately, it's mostly the DBAPI connection itself that
-we're keeping away from concurrent access; but since the :class:`.Session`
-and all the objects associated with it are all proxies for that DBAPI connection,
-the entire graph is essentially not safe for concurrent access.
-
-If there are in fact multiple threads participating
-in the same task, then you may consider sharing the session and its objects between
-those threads; however, in this extremely unusual scenario the application would
-need to ensure that a proper locking scheme is implemented so that there isn't
-*concurrent* access to the :class:`.Session` or its state.   A more common approach
-to this situation is to maintain a single :class:`.Session` per concurrent thread,
-but to instead *copy* objects from one :class:`.Session` to another, often
-using the :meth:`.Session.merge` method to copy the state of an object into
-a new object local to a different :class:`.Session`.
+Is the Session thread-safe?  Is AsyncSession safe to share in concurrent tasks?
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+The :class:`.Session` is a **mutable, stateful** object that represents a **single
+database transaction**.   An instance of :class:`.Session` therefore **cannot
+be shared among concurrent threads or asyncio tasks without careful
+synchronization**. The :class:`.Session` is intended to be used in a
+**non-concurrent** fashion, that is, a particular instance of :class:`.Session`
+should be used in only one thread or task at a time.
+
+When using the :class:`_asyncio.AsyncSession` object from SQLAlchemy's
+:ref:`asyncio <asyncio_toplevel>` extension, this object is only a thin proxy
+on top of a :class:`_orm.Session`, and the same rules apply; it is an
+**unsynchronized, mutable, stateful object**, so it is **not** safe to use a single
+instance of :class:`_asyncio.AsyncSession` in multiple asyncio tasks at once.
+
+An instance of :class:`.Session` or :class:`_asyncio.AsyncSession` represents a
+single logical database transaction, referencing only a single
+:class:`_engine.Connection` at a time for a particular :class:`.Engine` or
+:class:`.AsyncEngine` to which the object is bound (note that these objects
+both support being bound to multiple engines at once, however in this case
+there will still be only one connection per engine in play within the
+scope of a transaction).
+
+A database connection within a transaction is also a stateful object that is
+intended to be operated upon in a non-concurrent, sequential fashion. Commands
+are issued on the connection in a sequence, which are handled by the database
+server in the exact order in which they are emitted.   As the
+:class:`_orm.Session` emits commands upon this connection and receives results,
+the :class:`_orm.Session` itself is transitioning through internal state
+changes that align with the state of commands and data present on this
+connection; states which include if a transaction were begun, committed, or
+rolled back, what SAVEPOINTs if any are in play, as well as fine-grained
+synchronization of the state of individual database rows with local ORM-mapped
+objects.
+
+When designing database applications for concurrency, the appropriate model is
+that each concurrent task / thread works with its own database transaction.
+This is why when discussing the issue of database concurrency, the standard
+terminology used is **multiple, concurrent transactions**.   Within traditional
+RDMS there is no analogue for a single database transaction that is receiving
+and processing multiple commands concurrently.
+
+The concurrency model for SQLAlchemy's :class:`_orm.Session` and
+:class:`_asyncio.AsyncSession` is therefore **Session per thread, AsyncSession per
+task**.  An application that uses multiple threads, or multiple tasks in
+asyncio such as when using an API like ``asyncio.gather()`` would want to ensure
+that each thread has its own :class:`_orm.Session`, each asyncio task
+has its own :class:`_asyncio.AsyncSession`.
+
+The best way to ensure this use is by using the :ref:`standard context manager
+pattern <session_getting>`  locally within the top level Python function that
+is inside the thread or task, which will ensure the lifespan of the
+:class:`_orm.Session` or :class:`_asyncio.AsyncSession` is maintained within
+a local scope.
+
+For applications that benefit from having a "global" :class:`.Session`
+where it's not an option to pass the :class:`.Session` object to specific
+functions and methods which require it, the :class:`.scoped_session`
+approach can provide for a "thread local" :class:`.Session` object;
+see the section :ref:`unitofwork_contextual` for background.   Within
+the asyncio context, the :class:`.async_scoped_session`
+object is the asyncio analogue for :class:`.scoped_session`, however is more
+challenging to configure as it requires a custom "context" function.
+
index 6cab56baa91f72c6e0048b6c3db507e3da08fccd..ceeaff4d90f31d2055123ac3b872030b1a3d24dc 100644 (file)
@@ -203,6 +203,9 @@ class AsyncSession(ReversibleProxy[Session]):
     The :class:`_asyncio.AsyncSession` is a proxy for a traditional
     :class:`_orm.Session` instance.
 
+    The :class:`_asyncio.AsyncSession` is **not safe for use in concurrent
+    tasks.**.  See :ref:`session_faq_threadsafe` for background.
+
     .. versionadded:: 1.4
 
     To use an :class:`_asyncio.AsyncSession` with custom :class:`_orm.Session`
index dd0cfb79081efbf02996feb83886a7807fcefc12..8d19e2374e1c053ab6c181ab6b69d2c05a5ded19 100644 (file)
@@ -251,10 +251,13 @@ class SessionTransactionState(_StateChangeState):
     COMMITTED = 3
     DEACTIVE = 4
     CLOSED = 5
+    PROVISIONING_CONNECTION = 6
 
 
 # backwards compatibility
-ACTIVE, PREPARED, COMMITTED, DEACTIVE, CLOSED = tuple(SessionTransactionState)
+ACTIVE, PREPARED, COMMITTED, DEACTIVE, CLOSED, PROVISIONING_CONNECTION = tuple(
+    SessionTransactionState
+)
 
 
 class ORMExecuteState(util.MemoizedSlots):
@@ -919,6 +922,12 @@ class SessionTransaction(_StateChange, TransactionalContext):
                 )
         elif state is SessionTransactionState.CLOSED:
             raise sa_exc.ResourceClosedError("This transaction is closed")
+        elif state is SessionTransactionState.PROVISIONING_CONNECTION:
+            raise sa_exc.InvalidRequestError(
+                "This session is provisioning a new connection; concurrent "
+                "operations are not permitted",
+                code="isce",
+            )
         else:
             raise sa_exc.InvalidRequestError(
                 f"This session is in '{state.name.lower()}' state; no "
@@ -1090,80 +1099,89 @@ class SessionTransaction(_StateChange, TransactionalContext):
                 )
             return self._connections[bind][0]
 
+        self._state = SessionTransactionState.PROVISIONING_CONNECTION
+
         local_connect = False
         should_commit = True
 
-        if self._parent:
-            conn = self._parent._connection_for_bind(bind, execution_options)
-            if not self.nested:
-                return conn
-        else:
-            if isinstance(bind, engine.Connection):
-                conn = bind
-                if conn.engine in self._connections:
-                    raise sa_exc.InvalidRequestError(
-                        "Session already has a Connection associated for the "
-                        "given Connection's Engine"
-                    )
-            else:
-                conn = bind.connect()
-                local_connect = True
-
         try:
-            if execution_options:
-                conn = conn.execution_options(**execution_options)
-
-            transaction: Transaction
-            if self.session.twophase and self._parent is None:
-                # TODO: shouldn't we only be here if not
-                # conn.in_transaction() ?
-                # if twophase is set and conn.in_transaction(), validate
-                # that it is in fact twophase.
-                transaction = conn.begin_twophase()
-            elif self.nested:
-                transaction = conn.begin_nested()
-            elif conn.in_transaction():
-                join_transaction_mode = self.session.join_transaction_mode
-
-                if join_transaction_mode == "conditional_savepoint":
-                    if conn.in_nested_transaction():
-                        join_transaction_mode = "create_savepoint"
-                    else:
-                        join_transaction_mode = "rollback_only"
+            if self._parent:
+                conn = self._parent._connection_for_bind(
+                    bind, execution_options
+                )
+                if not self.nested:
+                    return conn
+            else:
+                if isinstance(bind, engine.Connection):
+                    conn = bind
+                    if conn.engine in self._connections:
+                        raise sa_exc.InvalidRequestError(
+                            "Session already has a Connection associated "
+                            "for the given Connection's Engine"
+                        )
+                else:
+                    conn = bind.connect()
+                    local_connect = True
 
-                if join_transaction_mode in (
-                    "control_fully",
-                    "rollback_only",
-                ):
-                    if conn.in_nested_transaction():
-                        transaction = conn._get_required_nested_transaction()
-                    else:
-                        transaction = conn._get_required_transaction()
-                    if join_transaction_mode == "rollback_only":
-                        should_commit = False
-                elif join_transaction_mode == "create_savepoint":
+            try:
+                if execution_options:
+                    conn = conn.execution_options(**execution_options)
+
+                transaction: Transaction
+                if self.session.twophase and self._parent is None:
+                    # TODO: shouldn't we only be here if not
+                    # conn.in_transaction() ?
+                    # if twophase is set and conn.in_transaction(), validate
+                    # that it is in fact twophase.
+                    transaction = conn.begin_twophase()
+                elif self.nested:
                     transaction = conn.begin_nested()
+                elif conn.in_transaction():
+                    join_transaction_mode = self.session.join_transaction_mode
+
+                    if join_transaction_mode == "conditional_savepoint":
+                        if conn.in_nested_transaction():
+                            join_transaction_mode = "create_savepoint"
+                        else:
+                            join_transaction_mode = "rollback_only"
+
+                    if join_transaction_mode in (
+                        "control_fully",
+                        "rollback_only",
+                    ):
+                        if conn.in_nested_transaction():
+                            transaction = (
+                                conn._get_required_nested_transaction()
+                            )
+                        else:
+                            transaction = conn._get_required_transaction()
+                        if join_transaction_mode == "rollback_only":
+                            should_commit = False
+                    elif join_transaction_mode == "create_savepoint":
+                        transaction = conn.begin_nested()
+                    else:
+                        assert False, join_transaction_mode
                 else:
-                    assert False, join_transaction_mode
+                    transaction = conn.begin()
+            except:
+                # connection will not not be associated with this Session;
+                # close it immediately so that it isn't closed under GC
+                if local_connect:
+                    conn.close()
+                raise
             else:
-                transaction = conn.begin()
-        except:
-            # connection will not not be associated with this Session;
-            # close it immediately so that it isn't closed under GC
-            if local_connect:
-                conn.close()
-            raise
-        else:
-            bind_is_connection = isinstance(bind, engine.Connection)
+                bind_is_connection = isinstance(bind, engine.Connection)
 
-            self._connections[conn] = self._connections[conn.engine] = (
-                conn,
-                transaction,
-                should_commit,
-                not bind_is_connection,
-            )
-            self.session.dispatch.after_begin(self.session, self, conn)
-            return conn
+                self._connections[conn] = self._connections[conn.engine] = (
+                    conn,
+                    transaction,
+                    should_commit,
+                    not bind_is_connection,
+                )
+                self.session.dispatch.after_begin(self.session, self, conn)
+                return conn
+        finally:
+            self._state = SessionTransactionState.ACTIVE
 
     def prepare(self) -> None:
         if self._parent is not None or not self.session.twophase:
@@ -1354,6 +1372,9 @@ class SessionTransaction(_StateChange, TransactionalContext):
 class Session(_SessionClassMethods, EventTarget):
     """Manages persistence operations for ORM-mapped objects.
 
+    The :class:`_orm.Session` is **not safe for use in concurrent threads.**.
+    See :ref:`session_faq_threadsafe` for background.
+
     The Session's usage paradigm is described at :doc:`/orm/session`.
 
 
@@ -1409,7 +1430,7 @@ class Session(_SessionClassMethods, EventTarget):
         autocommit: Literal[False] = False,
         join_transaction_mode: JoinTransactionMode = "conditional_savepoint",
     ):
-        r"""Construct a new Session.
+        r"""Construct a new :class:`_orm.Session`.
 
         See also the :class:`.sessionmaker` function which is used to
         generate a :class:`.Session`-producing callable with a given
index 61a94d3f9dc2f86ef8456ddcd4a49cbda8921a02..3d74ff2de224d41d930419e9cf712f9123053942 100644 (file)
@@ -59,7 +59,8 @@ class _StateChange:
     ) -> NoReturn:
         raise sa_exc.IllegalStateChangeError(
             f"Can't run operation '{operation_name}()' when Session "
-            f"is in state {state!r}"
+            f"is in state {state!r}",
+            code="isce",
         )
 
     @classmethod
@@ -121,13 +122,15 @@ class _StateChange:
                         f"Method '{fn.__name__}()' can't be called here; "
                         f"method '{existing_fn.__name__}()' is already "
                         f"in progress and this would cause an unexpected "
-                        f"state change to {moves_to!r}"
+                        f"state change to {moves_to!r}",
+                        code="isce",
                     )
                 else:
                     raise sa_exc.IllegalStateChangeError(
                         f"Cant run operation '{fn.__name__}()' here; "
                         f"will move to state {moves_to!r} where we are "
-                        f"expecting {next_state!r}"
+                        f"expecting {next_state!r}",
+                        code="isce",
                     )
 
             self._current_fn = fn
@@ -144,7 +147,8 @@ class _StateChange:
                     raise sa_exc.IllegalStateChangeError(
                         f"Method '{fn.__name__}()' failed to "
                         "change state "
-                        f"to {moves_to!r} as expected"
+                        f"to {moves_to!r} as expected",
+                        code="isce",
                     )
                 elif existing_fn:
                     raise sa_exc.IllegalStateChangeError(
@@ -152,12 +156,14 @@ class _StateChange:
                         "running, "
                         f"method '{fn.__name__}()' caused an "
                         "unexpected "
-                        f"state change to {self._state!r}"
+                        f"state change to {self._state!r}",
+                        code="isce",
                     )
                 else:
                     raise sa_exc.IllegalStateChangeError(
                         f"Method '{fn.__name__}()' caused an unexpected "
-                        f"state change to {self._state!r}"
+                        f"state change to {self._state!r}",
+                        code="isce",
                     )
 
             finally:
@@ -186,7 +192,7 @@ class _StateChange:
         else:
             if self._state is not expected:
                 raise sa_exc.IllegalStateChangeError(
-                    f"Unexpected state change to {self._state!r}"
+                    f"Unexpected state change to {self._state!r}", code="isce"
                 )
         finally:
             self._next_state = _StateChangeStates.CHANGE_IN_PROGRESS