--- /dev/null
+.. change::
+ :tags: usecase, engine
+ :tickets: 7877, 7815
+
+ Added new parameter :paramref:`.Engine.dispose.close`, defaulting to True.
+ When False, the engine disposal does not touch the connections in the old
+ pool at all, simply dropping the pool and replacing it. This use case is so
+ that when the original pool is transferred from a parent process, the
+ parent process may continue to use those connections.
+
+ .. seealso::
+
+ :ref:`pooling_multiprocessing` - revised documentation
:class:`_engine.Engine` object is copied to the child process,
:meth:`_engine.Engine.dispose` should be called so that the engine creates
brand new database connections local to that fork. Database connections
- generally do **not** travel across process boundaries.
+ generally do **not** travel across process boundaries. Use the
+ :paramref:`.Engine.dispose.close` parameter set to False in this case.
+ See the section :ref:`pooling_multiprocessing` for more background on this
+ use case.
* Within test suites or multitenancy scenarios where many
ad-hoc, short-lived :class:`_engine.Engine` objects may be created and disposed.
it is entirely closed out and is not held in memory. See :ref:`pool_switching`
for guidelines on how to disable pooling.
+.. seealso::
+
+ :ref:`pooling_toplevel`
+
+ :ref:`pooling_multiprocessing`
+
.. _dbapi_connections:
Working with Driver SQL and Raw DBAPI Connections
engine = create_engine("mysql://user:pass@host/dbname", poolclass=NullPool)
-2. Call :meth:`_engine.Engine.dispose` on any given :class:`_engine.Engine`
- **directly before** the new process is started, so that the new process
- will create new connections, as well as not attempt to close connections that
- were shared from the parent which can impact the parent's subsequent
- use of those connections. **This is the recommended approach**::
-
- engine = create_engine("mysql://user:pass@host/dbname")
-
- def run_in_process():
- with engine.connect() as conn:
- conn.execute(text("..."))
-
- # before process starts, ensure engine.dispose() is called
- engine.dispose()
- p = Process(target=run_in_process)
- p.start()
-
-3. Alternatively, if the :class:`_engine.Engine` is only to be used in
- child processes, and will not be used from the parent process subsequent
- to the creation of child forks, the dispose may be within the child process
- right as it begins::
-
- engine = create_engine("mysql+mysqldb://user:pass@host/dbname")
-
- def run_in_process():
- # process starts. ensure engine.dispose() is called just once
- # at the beginning. note this cause parent process connections
- # to be closed for most drivers
- engine.dispose()
+2. Call :meth:`_engine.Engine.dispose` on any given :class:`_engine.Engine`,
+ passing the :paramref:`.Engine.dispose.close` parameter with a value of
+ ``False``, within the initialize phase of the child process. This is
+ so that the new process will not touch any of the parent process' connections
+ and will instead start with new connections.
+ **This is the recommended approach**::
+
+ from multiprocessing import Pool
+
+ engine = create_engine("mysql+mysqldb://user:pass@host/dbname")
+
+ def run_in_process(some_data_record):
+ with engine.connect() as conn:
+ conn.execute(text("..."))
+
+ def initializer():
+ """ensure the parent proc's database connections are not touched
+ in the new connection pool"""
+ engine.dispose(close=False)
+
+ with Pool(10, initializer=initializer) as p:
+ p.map(run_in_process, data)
+
- with engine.connect() as conn:
- conn.execute(text("..."))
+ .. versionadded:: 1.4.33 Added the :paramref:`.Engine.dispose.close`
+ parameter to allow the replacement of a connection pool in a child
+ process without interfering with the connections used by the parent
+ process.
- p = Process(target=run_in_process)
- p.start()
+ To achieve the same "dispose without close" behavior prior to version
+ 1.4.33 (all SQLAlchemy versions), instead of calling
+ :meth:`.Engine.dispose`, replace the :class:`.Pool` directly using
+ :meth:`.Pool.recreate`::
- # after child process starts, "engine" above should not be used within
- # the parent process for connectivity, without calling
- # engine.dispose() first
+ engine.pool = engine.pool.recreate()
+
+ The above code is equivalent to ``engine.dispose(close=False)`` with the
+ exception that the :meth:`.ConnectionEvents.engine_disposed` end-user
+ event hook is not invoked; assuming end-user code is not making use of
+ this hook, this workaround has no other negative effects.
+
+3. Call :meth:`.Engine.dispose` **directly before** the child process is
+ created. This will also cause the child process to start with a new
+ connection pool, while ensuring the parent connections are not transferred
+ to the child process::
+
+ engine = create_engine("mysql://user:pass@host/dbname")
+
+ def run_in_process():
+ with engine.connect() as conn:
+ conn.execute(text("..."))
+
+ # before process starts, ensure engine.dispose() is called
+ engine.dispose()
+ p = Process(target=run_in_process)
+ p.start()
4. An event handler can be applied to the connection pool that tests for
connections being shared across process boundaries, and invalidates them::
def __repr__(self):
return "Engine(%r)" % (self.url,)
- def dispose(self):
+ def dispose(self, close=True):
"""Dispose of the connection pool used by this
:class:`_engine.Engine`.
- This has the effect of fully closing all **currently checked in**
- database connections. Connections that are still checked out
- will **not** be closed, however they will no longer be associated
- with this :class:`_engine.Engine`,
- so when they are closed individually,
- eventually the :class:`_pool.Pool` which they are associated with will
- be garbage collected and they will be closed out fully, if
- not already closed on checkin.
-
- A new connection pool is created immediately after the old one has
- been disposed. This new pool, like all SQLAlchemy connection pools,
- does not make any actual connections to the database until one is
- first requested, so as long as the :class:`_engine.Engine`
- isn't used again,
- no new connections will be made.
+ A new connection pool is created immediately after the old one has been
+ disposed. The previous connection pool is disposed either actively, by
+ closing out all currently checked-in connections in that pool, or
+ passively, by losing references to it but otherwise not closing any
+ connections. The latter strategy is more appropriate for an initializer
+ in a forked Python process.
+
+ :param close: if left at its default of ``True``, has the
+ effect of fully closing all **currently checked in**
+ database connections. Connections that are still checked out
+ will **not** be closed, however they will no longer be associated
+ with this :class:`_engine.Engine`,
+ so when they are closed individually, eventually the
+ :class:`_pool.Pool` which they are associated with will
+ be garbage collected and they will be closed out fully, if
+ not already closed on checkin.
+
+ If set to ``False``, the previous connection pool is de-referenced,
+ and otherwise not touched in any way.
+
+ .. versionadded:: 1.4.33 Added the :paramref:`.Engine.dispose.close`
+ parameter to allow the replacement of a connection pool in a child
+ process without interfering with the connections used by the parent
+ process.
+
.. seealso::
:ref:`engine_disposal`
+ :ref:`pooling_multiprocessing`
+
"""
- self.pool.dispose()
+ if close:
+ self.pool.dispose()
self.pool = self.pool.recreate()
self.dispatch.engine_disposed(self)
# coding: utf-8
from contextlib import contextmanager
+import copy
import re
import threading
import weakref
eq_(canary.mock_calls, [call(eng), call(eng)])
+ @testing.requires.ad_hoc_engines
+ @testing.combinations(True, False, argnames="close")
+ def test_close_parameter(self, testing_engine, close):
+ eng = testing_engine(
+ options=dict(pool_size=1, max_overflow=0, poolclass=QueuePool)
+ )
+
+ conn = eng.connect()
+ dbapi_conn_one = conn.connection.dbapi_connection
+ conn.close()
+
+ eng_copy = copy.copy(eng)
+ eng_copy.dispose(close=close)
+ copy_conn = eng_copy.connect()
+ dbapi_conn_two = copy_conn.connection.dbapi_connection
+
+ is_not(dbapi_conn_one, dbapi_conn_two)
+
+ conn = eng.connect()
+ if close:
+ is_not(dbapi_conn_one, conn.connection.dbapi_connection)
+ else:
+ is_(dbapi_conn_one, conn.connection.dbapi_connection)
+
def test_retval_flag(self):
canary = []