]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
add close=False parameter to engine.dispose()
authorMike Bayer <mike_mp@zzzcomputing.com>
Thu, 31 Mar 2022 13:08:11 +0000 (09:08 -0400)
committerMike Bayer <mike_mp@zzzcomputing.com>
Thu, 31 Mar 2022 13:19:46 +0000 (09:19 -0400)
Added new parameter :paramref:`.Engine.dispose.close`, defaulting to True.
When False, the engine disposal does not touch the connections in the old
pool at all, simply dropping the pool and replacing it. This use case is so
that when the original pool is transferred from a parent process, the
parent process may continue to use those connections.

Fixes: #7877
Change-Id: I88b0808442381ba5e50674787cdb64f0e77d8b54

doc/build/changelog/unreleased_14/7877.rst [new file with mode: 0644]
doc/build/core/connections.rst
doc/build/core/pooling.rst
lib/sqlalchemy/engine/base.py
test/engine/test_execute.py

diff --git a/doc/build/changelog/unreleased_14/7877.rst b/doc/build/changelog/unreleased_14/7877.rst
new file mode 100644 (file)
index 0000000..d6ad6fa
--- /dev/null
@@ -0,0 +1,13 @@
+.. change::
+    :tags: usecase, engine
+    :tickets: 7877, 7815
+
+    Added new parameter :paramref:`.Engine.dispose.close`, defaulting to True.
+    When False, the engine disposal does not touch the connections in the old
+    pool at all, simply dropping the pool and replacing it. This use case is so
+    that when the original pool is transferred from a parent process, the
+    parent process may continue to use those connections.
+
+    .. seealso::
+
+        :ref:`pooling_multiprocessing` - revised documentation
index 2f19367d48f07d8988a79b598240822de73e8220..11d35237a71954773a268950b5470172cdabe245 100644 (file)
@@ -1720,7 +1720,10 @@ Valid use cases for calling :meth:`_engine.Engine.dispose` include:
   :class:`_engine.Engine` object is copied to the child process,
   :meth:`_engine.Engine.dispose` should be called so that the engine creates
   brand new database connections local to that fork.   Database connections
-  generally do **not** travel across process boundaries.
+  generally do **not** travel across process boundaries.  Use the
+  :paramref:`.Engine.dispose.close` parameter set to False in this case.
+  See the section :ref:`pooling_multiprocessing` for more background on this
+  use case.
 
 * Within test suites or multitenancy scenarios where many
   ad-hoc, short-lived :class:`_engine.Engine` objects may be created and disposed.
@@ -1745,6 +1748,12 @@ use of new connections, and means that when a connection is checked in,
 it is entirely closed out and is not held in memory.  See :ref:`pool_switching`
 for guidelines on how to disable pooling.
 
+.. seealso::
+
+    :ref:`pooling_toplevel`
+
+    :ref:`pooling_multiprocessing`
+
 .. _dbapi_connections:
 
 Working with Driver SQL and Raw DBAPI Connections
index f685b240d24964634a8f8416cdec1d94f3b12a0d..58a3a4350f02baa5785716260083068f6ababb25 100644 (file)
@@ -461,45 +461,50 @@ are three general approaches to this:
     engine = create_engine("mysql+mysqldb://user:pass@host/dbname", poolclass=NullPool)
 
 
-2. Call :meth:`_engine.Engine.dispose` on any given :class:`_engine.Engine`
-   **directly before** the new process is started, so that the new process
-   will create new connections, as well as not attempt to close connections that
-   were shared from the parent which can impact the parent's subsequent
-   use of those connections.  **This is the recommended approach**::
-
-    engine = create_engine("mysql+mysqldb://user:pass@host/dbname")
-
-    def run_in_process():
-        with engine.connect() as conn:
-            conn.execute(text("..."))
-
-    # before process starts, ensure engine.dispose() is called
-    engine.dispose()
-    p = Process(target=run_in_process)
-    p.start()
-
-3. Alternatively, if the :class:`_engine.Engine` is only to be used in
-   child processes, and will not be used from the parent process subsequent
-   to the creation of child forks, the dispose may be within the child process
-   right as it begins::
-
-    engine = create_engine("mysql+mysqldb://user:pass@host/dbname")
-
-    def run_in_process():
-        # process starts. ensure engine.dispose() is called just once
-        # at the beginning.  note this cause parent process connections
-        # to be closed for most drivers
-        engine.dispose()
+2. Call :meth:`_engine.Engine.dispose` on any given :class:`_engine.Engine`,
+   passing the :paramref:`.Engine.dispose.close` parameter with a value of
+   ``False``, within the initialize phase of the child process.  This is
+   so that the new process will not touch any of the parent process' connections
+   and will instead start with new connections.
+   **This is the recommended approach**::
+
+        from multiprocessing import Pool
+
+        engine = create_engine("mysql+mysqldb://user:pass@host/dbname")
+
+        def run_in_process(some_data_record):
+            with engine.connect() as conn:
+                conn.execute(text("..."))
+
+        def initializer():
+            """ensure the parent proc's database connections are not touched
+               in the new connection pool"""
+            engine.dispose(close=False)
 
-        with engine.connect() as conn:
-            conn.execute(text("..."))
+        with Pool(10, initializer=initializer) as p:
+            p.map(run_in_process, data)
 
-    p = Process(target=run_in_process)
-    p.start()
 
-    # after child process starts, "engine" above should not be used within
-    # the parent process for connectivity, without calling
-    # engine.dispose() first
+   .. versionadded:: 1.4.33  Added the :paramref:`.Engine.dispose.close`
+      parameter to allow the replacement of a connection pool in a child
+      process without interfering with the connections used by the parent
+      process.
+
+3. Call :meth:`.Engine.dispose` **directly before** the child process is
+   created.  This will also cause the child process to start with a new
+   connection pool, while ensuring the parent connections are not transferred
+   to the child process::
+
+        engine = create_engine("mysql://user:pass@host/dbname")
+
+        def run_in_process():
+            with engine.connect() as conn:
+                conn.execute(text("..."))
+
+        # before process starts, ensure engine.dispose() is called
+        engine.dispose()
+        p = Process(target=run_in_process)
+        p.start()
 
 4. An event handler can be applied to the connection pool that tests for
    connections being shared across process boundaries, and invalidates them::
index db6d1ef3f2dd21dadd64ab711344bddfa095e981..ad31585fb20ec876878ef4ec2db5043c68acd399 100644 (file)
@@ -2688,32 +2688,45 @@ class Engine(
     def __repr__(self) -> str:
         return "Engine(%r)" % (self.url,)
 
-    def dispose(self) -> None:
+    def dispose(self, close: bool = True) -> None:
         """Dispose of the connection pool used by this
         :class:`_engine.Engine`.
 
-        This has the effect of fully closing all **currently checked in**
-        database connections.  Connections that are still checked out
-        will **not** be closed, however they will no longer be associated
-        with this :class:`_engine.Engine`,
-        so when they are closed individually,
-        eventually the :class:`_pool.Pool` which they are associated with will
-        be garbage collected and they will be closed out fully, if
-        not already closed on checkin.
-
-        A new connection pool is created immediately after the old one has
-        been disposed.   This new pool, like all SQLAlchemy connection pools,
-        does not make any actual connections to the database until one is
-        first requested, so as long as the :class:`_engine.Engine`
-        isn't used again,
-        no new connections will be made.
+        A new connection pool is created immediately after the old one has been
+        disposed. The previous connection pool is disposed either actively, by
+        closing out all currently checked-in connections in that pool, or
+        passively, by losing references to it but otherwise not closing any
+        connections. The latter strategy is more appropriate for an initializer
+        in a forked Python process.
+
+        :param close: if left at its default of ``True``, has the
+         effect of fully closing all **currently checked in**
+         database connections.  Connections that are still checked out
+         will **not** be closed, however they will no longer be associated
+         with this :class:`_engine.Engine`,
+         so when they are closed individually, eventually the
+         :class:`_pool.Pool` which they are associated with will
+         be garbage collected and they will be closed out fully, if
+         not already closed on checkin.
+
+         If set to ``False``, the previous connection pool is de-referenced,
+         and otherwise not touched in any way.
+
+        .. versionadded:: 1.4.33  Added the :paramref:`.Engine.dispose.close`
+            parameter to allow the replacement of a connection pool in a child
+            process without interfering with the connections used by the parent
+            process.
+
 
         .. seealso::
 
             :ref:`engine_disposal`
 
+            :ref:`pooling_multiprocessing`
+
         """
-        self.pool.dispose()
+        if close:
+            self.pool.dispose()
         self.pool = self.pool.recreate()
         self.dispatch.engine_disposed(self)
 
index 2c8a07220ccaa621a19ae58beab252d31790d335..b561db99e0ab55f3bde289dddbec33a4f38e0ebd 100644 (file)
@@ -3,6 +3,7 @@
 import collections.abc as collections_abc
 from contextlib import contextmanager
 from contextlib import nullcontext
+import copy
 from io import StringIO
 import re
 import threading
@@ -2277,6 +2278,30 @@ class EngineEventsTest(fixtures.TestBase):
 
         eq_(canary.mock_calls, [call(eng), call(eng)])
 
+    @testing.requires.ad_hoc_engines
+    @testing.combinations(True, False, argnames="close")
+    def test_close_parameter(self, testing_engine, close):
+        eng = testing_engine(
+            options=dict(pool_size=1, max_overflow=0, poolclass=QueuePool)
+        )
+
+        conn = eng.connect()
+        dbapi_conn_one = conn.connection.dbapi_connection
+        conn.close()
+
+        eng_copy = copy.copy(eng)
+        eng_copy.dispose(close=close)
+        copy_conn = eng_copy.connect()
+        dbapi_conn_two = copy_conn.connection.dbapi_connection
+
+        is_not(dbapi_conn_one, dbapi_conn_two)
+
+        conn = eng.connect()
+        if close:
+            is_not(dbapi_conn_one, conn.connection.dbapi_connection)
+        else:
+            is_(dbapi_conn_one, conn.connection.dbapi_connection)
+
     def test_retval_flag(self):
         canary = []