Added new parameter :paramref:`.Engine.dispose.close`, defaulting to True.
When False, the engine disposal does not touch the connections in the old
pool at all, simply dropping the pool and replacing it. This supports the
use case where the pool has been transferred from a parent process: by
leaving those connections untouched, the parent process may continue to
use them.
Fixes: #7877
Change-Id: I88b0808442381ba5e50674787cdb64f0e77d8b54
--- /dev/null
+.. change::
+ :tags: usecase, engine
+ :tickets: 7877, 7815
+
+ Added new parameter :paramref:`.Engine.dispose.close`, defaulting to True.
+ When False, the engine disposal does not touch the connections in the old
+ pool at all, simply dropping the pool and replacing it. This supports the
+ use case where the pool has been transferred from a parent process: by
+ leaving those connections untouched, the parent process may continue to
+ use them.
+
+ .. seealso::
+
+ :ref:`pooling_multiprocessing` - revised documentation
:class:`_engine.Engine` object is copied to the child process,
:meth:`_engine.Engine.dispose` should be called so that the engine creates
brand new database connections local to that fork. Database connections
- generally do **not** travel across process boundaries.
+ generally do **not** travel across process boundaries. In this case, set
+ the :paramref:`.Engine.dispose.close` parameter to ``False``.
+ See the section :ref:`pooling_multiprocessing` for more background on this
+ use case.
* Within test suites or multitenancy scenarios where many
ad-hoc, short-lived :class:`_engine.Engine` objects may be created and disposed.
it is entirely closed out and is not held in memory. See :ref:`pool_switching`
for guidelines on how to disable pooling.
+.. seealso::
+
+ :ref:`pooling_toplevel`
+
+ :ref:`pooling_multiprocessing`
+
.. _dbapi_connections:
Working with Driver SQL and Raw DBAPI Connections
engine = create_engine("mysql+mysqldb://user:pass@host/dbname", poolclass=NullPool)
-2. Call :meth:`_engine.Engine.dispose` on any given :class:`_engine.Engine`
- **directly before** the new process is started, so that the new process
- will create new connections, as well as not attempt to close connections that
- were shared from the parent which can impact the parent's subsequent
- use of those connections. **This is the recommended approach**::
-
- engine = create_engine("mysql+mysqldb://user:pass@host/dbname")
-
- def run_in_process():
- with engine.connect() as conn:
- conn.execute(text("..."))
-
- # before process starts, ensure engine.dispose() is called
- engine.dispose()
- p = Process(target=run_in_process)
- p.start()
-
-3. Alternatively, if the :class:`_engine.Engine` is only to be used in
- child processes, and will not be used from the parent process subsequent
- to the creation of child forks, the dispose may be within the child process
- right as it begins::
-
- engine = create_engine("mysql+mysqldb://user:pass@host/dbname")
-
- def run_in_process():
- # process starts. ensure engine.dispose() is called just once
- # at the beginning. note this cause parent process connections
- # to be closed for most drivers
- engine.dispose()
+2. Call :meth:`_engine.Engine.dispose` on any given :class:`_engine.Engine`,
+ passing the :paramref:`.Engine.dispose.close` parameter with a value of
+ ``False``, within the initialization phase of the child process. This is
+ so that the new process will not touch any of the parent process' connections
+ and will instead start with new connections.
+ **This is the recommended approach**::
+
+ from multiprocessing import Pool
+
+ engine = create_engine("mysql+mysqldb://user:pass@host/dbname")
+
+ def run_in_process(some_data_record):
+ with engine.connect() as conn:
+ conn.execute(text("..."))
+
+ def initializer():
+ """ensure the parent proc's database connections are not touched
+ in the new connection pool"""
+ engine.dispose(close=False)
- with engine.connect() as conn:
- conn.execute(text("..."))
+ with Pool(10, initializer=initializer) as p:
+ p.map(run_in_process, data)
- p = Process(target=run_in_process)
- p.start()
- # after child process starts, "engine" above should not be used within
- # the parent process for connectivity, without calling
- # engine.dispose() first
+ .. versionadded:: 1.4.33 Added the :paramref:`.Engine.dispose.close`
+ parameter to allow the replacement of a connection pool in a child
+ process without interfering with the connections used by the parent
+ process.
+
+3. Call :meth:`.Engine.dispose` **directly before** the child process is
+ created. This will also cause the child process to start with a new
+ connection pool, while ensuring the parent connections are not transferred
+ to the child process::
+
+ engine = create_engine("mysql+mysqldb://user:pass@host/dbname")
+
+ def run_in_process():
+ with engine.connect() as conn:
+ conn.execute(text("..."))
+
+ # before process starts, ensure engine.dispose() is called
+ engine.dispose()
+ p = Process(target=run_in_process)
+ p.start()
4. An event handler can be applied to the connection pool that tests for
connections being shared across process boundaries, and invalidates them::
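The handler body itself is outside this hunk. The underlying idea can be sketched with a stdlib-only toy (the ``Connection`` and ``PidCheckingPool`` names here are illustrative, not SQLAlchemy API; the real mechanism attaches a handler to the pool's "checkout" event and invalidates the connection when the pid recorded at connect time differs from ``os.getpid()``):

```python
import os


class Connection:
    """Toy DBAPI connection that remembers the pid that created it."""

    def __init__(self):
        self.pid = os.getpid()


class PidCheckingPool:
    """Sketch of pid-based invalidation on checkout: a connection that
    was created in a different process is discarded rather than reused."""

    def __init__(self):
        self._idle = []

    def checkout(self):
        while self._idle:
            conn = self._idle.pop()
            if conn.pid == os.getpid():
                return conn
            # connection crossed a process boundary: drop the reference
            # without closing, since the originating process may still
            # be using the underlying socket
        return Connection()

    def checkin(self, conn):
        self._idle.append(conn)
```

In the real event-handler version, the mismatched connection is invalidated so the pool transparently opens a fresh one on the same checkout.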
def __repr__(self) -> str:
return "Engine(%r)" % (self.url,)
- def dispose(self) -> None:
+ def dispose(self, close: bool = True) -> None:
"""Dispose of the connection pool used by this
:class:`_engine.Engine`.
- This has the effect of fully closing all **currently checked in**
- database connections. Connections that are still checked out
- will **not** be closed, however they will no longer be associated
- with this :class:`_engine.Engine`,
- so when they are closed individually,
- eventually the :class:`_pool.Pool` which they are associated with will
- be garbage collected and they will be closed out fully, if
- not already closed on checkin.
-
- A new connection pool is created immediately after the old one has
- been disposed. This new pool, like all SQLAlchemy connection pools,
- does not make any actual connections to the database until one is
- first requested, so as long as the :class:`_engine.Engine`
- isn't used again,
- no new connections will be made.
+ A new connection pool is created immediately after the old one has been
+ disposed. The previous connection pool is disposed either actively, by
+ closing out all currently checked-in connections in that pool, or
+ passively, by losing references to it but otherwise not closing any
+ connections. The latter strategy is more appropriate for an initializer
+ in a forked Python process.
+
+ :param close: if left at its default of ``True``, has the
+ effect of fully closing all **currently checked in**
+ database connections. Connections that are still checked out
+ will **not** be closed, however they will no longer be associated
+ with this :class:`_engine.Engine`,
+ so when they are closed individually, eventually the
+ :class:`_pool.Pool` which they are associated with will
+ be garbage collected and they will be closed out fully, if
+ not already closed on checkin.
+
+ If set to ``False``, the previous connection pool is de-referenced,
+ and otherwise not touched in any way.
+
+ .. versionadded:: 1.4.33 Added the :paramref:`.Engine.dispose.close`
+ parameter to allow the replacement of a connection pool in a child
+ process without interfering with the connections used by the parent
+ process.
+
.. seealso::
:ref:`engine_disposal`
+ :ref:`pooling_multiprocessing`
+
"""
- self.pool.dispose()
+ if close:
+ self.pool.dispose()
self.pool = self.pool.recreate()
self.dispatch.engine_disposed(self)
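The active/passive disposal contract implemented above can be modeled as a stdlib-only sketch (toy ``Pool`` and ``Engine`` classes, not the real ones) showing what ``close=True`` versus ``close=False`` does to the previous pool:

```python
class Pool:
    """Toy connection pool mirroring the dispose()/recreate() contract."""

    def __init__(self):
        self.connections = [object(), object()]  # stand-ins for checked-in connections
        self.disposed = False

    def dispose(self):
        # active disposal: close out all currently checked-in connections
        self.connections.clear()
        self.disposed = True

    def recreate(self):
        # return a fresh, empty pool with the same configuration
        return Pool()


class Engine:
    def __init__(self):
        self.pool = Pool()

    def dispose(self, close=True):
        # mirrors the patched Engine.dispose: close the old pool's
        # checked-in connections only when requested; with close=False
        # the old pool is merely de-referenced, which is appropriate in
        # a forked child process whose parent still owns the connections
        if close:
            self.pool.dispose()
        self.pool = self.pool.recreate()
```

In both modes the engine ends up with a brand-new pool; only the fate of the old pool's connections differs.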
import collections.abc as collections_abc
from contextlib import contextmanager
from contextlib import nullcontext
+import copy
from io import StringIO
import re
import threading
eq_(canary.mock_calls, [call(eng), call(eng)])
+ @testing.requires.ad_hoc_engines
+ @testing.combinations(True, False, argnames="close")
+ def test_close_parameter(self, testing_engine, close):
+ eng = testing_engine(
+ options=dict(pool_size=1, max_overflow=0, poolclass=QueuePool)
+ )
+
+ conn = eng.connect()
+ dbapi_conn_one = conn.connection.dbapi_connection
+ conn.close()
+
+ eng_copy = copy.copy(eng)
+ eng_copy.dispose(close=close)
+ copy_conn = eng_copy.connect()
+ dbapi_conn_two = copy_conn.connection.dbapi_connection
+
+ is_not(dbapi_conn_one, dbapi_conn_two)
+
+ conn = eng.connect()
+ if close:
+ is_not(dbapi_conn_one, conn.connection.dbapi_connection)
+ else:
+ is_(dbapi_conn_one, conn.connection.dbapi_connection)
+
def test_retval_flag(self):
canary = []