From df3b4522c0f9e199cb05fe1129f53473e7da26fb Mon Sep 17 00:00:00 2001 From: Mike Bayer Date: Thu, 31 Mar 2022 09:08:11 -0400 Subject: [PATCH] add close=False parameter to engine.dispose() Added new parameter :paramref:`.Engine.dispose.close`, defaulting to True. When False, the engine disposal does not touch the connections in the old pool at all, simply dropping the pool and replacing it. This use case is so that when the original pool is transferred from a parent process, the parent process may continue to use those connections. Fixes: #7877 Change-Id: I88b0808442381ba5e50674787cdb64f0e77d8b54 (cherry picked from commit 87a0f7183de4e8454483c7348bf486265bfe1c4d) --- doc/build/changelog/unreleased_14/7877.rst | 13 ++++ doc/build/core/connections.rst | 11 ++- doc/build/core/pooling.rst | 89 +++++++++++++--------- lib/sqlalchemy/engine/base.py | 47 +++++++----- test/engine/test_execute.py | 25 ++++++ 5 files changed, 131 insertions(+), 54 deletions(-) create mode 100644 doc/build/changelog/unreleased_14/7877.rst diff --git a/doc/build/changelog/unreleased_14/7877.rst b/doc/build/changelog/unreleased_14/7877.rst new file mode 100644 index 0000000000..d6ad6facd5 --- /dev/null +++ b/doc/build/changelog/unreleased_14/7877.rst @@ -0,0 +1,13 @@ +.. change:: + :tags: usecase, engine + :tickets: 7877, 7815 + + Added new parameter :paramref:`.Engine.dispose.close`, defaulting to True. + When False, the engine disposal does not touch the connections in the old + pool at all, simply dropping the pool and replacing it. This use case is so + that when the original pool is transferred from a parent process, the + parent process may continue to use those connections. + + .. seealso:: + + :ref:`pooling_multiprocessing` - revised documentation diff --git a/doc/build/core/connections.rst b/doc/build/core/connections.rst index 6395c3c6b9..3c2875bb4c 100644 --- a/doc/build/core/connections.rst +++ b/doc/build/core/connections.rst @@ -1904,7 +1904,10 @@ Valid use cases for calling :meth:`_engine.Engine.dispose` include: :class:`_engine.Engine` object is copied to the child process, :meth:`_engine.Engine.dispose` should be called so that the engine creates brand new database connections local to that fork. Database connections - generally do **not** travel across process boundaries. + generally do **not** travel across process boundaries. Use the + :paramref:`.Engine.dispose.close` parameter set to False in this case. + See the section :ref:`pooling_multiprocessing` for more background on this + use case. * Within test suites or multitenancy scenarios where many ad-hoc, short-lived :class:`_engine.Engine` objects may be created and disposed. @@ -1929,6 +1932,12 @@ use of new connections, and means that when a connection is checked in, it is entirely closed out and is not held in memory. See :ref:`pool_switching` for guidelines on how to disable pooling. +.. seealso:: + + :ref:`pooling_toplevel` + + :ref:`pooling_multiprocessing` + .. _dbapi_connections: Working with Driver SQL and Raw DBAPI Connections diff --git a/doc/build/core/pooling.rst b/doc/build/core/pooling.rst index 6b2735a5d4..c6ef94a0a7 100644 --- a/doc/build/core/pooling.rst +++ b/doc/build/core/pooling.rst @@ -479,45 +479,62 @@ are three general approaches to this: engine = create_engine("mysql://user:pass@host/dbname", poolclass=NullPool) -2. 
Call :meth:`_engine.Engine.dispose` on any given :class:`_engine.Engine` - **directly before** the new process is started, so that the new process - will create new connections, as well as not attempt to close connections that - were shared from the parent which can impact the parent's subsequent - use of those connections. **This is the recommended approach**:: - - engine = create_engine("mysql://user:pass@host/dbname") - - def run_in_process(): - with engine.connect() as conn: - conn.execute(text("...")) - - # before process starts, ensure engine.dispose() is called - engine.dispose() - p = Process(target=run_in_process) - p.start() - -3. Alternatively, if the :class:`_engine.Engine` is only to be used in - child processes, and will not be used from the parent process subsequent - to the creation of child forks, the dispose may be within the child process - right as it begins:: - - engine = create_engine("mysql+mysqldb://user:pass@host/dbname") - - def run_in_process(): - # process starts. ensure engine.dispose() is called just once - # at the beginning. note this cause parent process connections - # to be closed for most drivers - engine.dispose() +2. Call :meth:`_engine.Engine.dispose` on any given :class:`_engine.Engine`, + passing the :paramref:`.Engine.dispose.close` parameter with a value of + ``False``, within the initialize phase of the child process. This is + so that the new process will not touch any of the parent process' connections + and will instead start with new connections. + **This is the recommended approach**:: + + from multiprocessing import Pool + + engine = create_engine("mysql+mysqldb://user:pass@host/dbname") + + def run_in_process(some_data_record): + with engine.connect() as conn: + conn.execute(text("...")) + + def initializer(): + """ensure the parent proc's database connections are not touched + in the new connection pool""" + engine.dispose(close=False) + + with Pool(10, initializer=initializer) as p: + p.map(run_in_process, data) + - with engine.connect() as conn: - conn.execute(text("...")) + .. versionadded:: 1.4.33 Added the :paramref:`.Engine.dispose.close` + parameter to allow the replacement of a connection pool in a child + process without interfering with the connections used by the parent + process. - p = Process(target=run_in_process) - p.start() + To achieve the same "dispose without close" behavior prior to version + 1.4.33 (all SQLAlchemy versions), instead of calling + :meth:`.Engine.dispose`, replace the :class:`.Pool` directly using + :meth:`.Pool.recreate`:: - # after child process starts, "engine" above should not be used within - # the parent process for connectivity, without calling - # engine.dispose() first + engine.pool = engine.pool.recreate() + + The above code is equivalent to ``engine.dispose(close=False)`` with the + exception that the :meth:`.ConnectionEvents.engine_disposed` end-user + event hook is not invoked; assuming end-user code is not making use of + this hook, this workaround has no other negative effects. + +3. Call :meth:`.Engine.dispose` **directly before** the child process is + created. 
This will also cause the child process to start with a new + connection pool, while ensuring the parent connections are not transferred + to the child process:: + + engine = create_engine("mysql://user:pass@host/dbname") + + def run_in_process(): + with engine.connect() as conn: + conn.execute(text("...")) + + # before process starts, ensure engine.dispose() is called + engine.dispose() + p = Process(target=run_in_process) + p.start() 4. An event handler can be applied to the connection pool that tests for connections being shared across process boundaries, and invalidates them:: diff --git a/lib/sqlalchemy/engine/base.py b/lib/sqlalchemy/engine/base.py index b5a3096e5b..eca4a9e10a 100644 --- a/lib/sqlalchemy/engine/base.py +++ b/lib/sqlalchemy/engine/base.py @@ -2948,32 +2948,45 @@ class Engine(Connectable, log.Identified): def __repr__(self): return "Engine(%r)" % (self.url,) - def dispose(self): + def dispose(self, close=True): """Dispose of the connection pool used by this :class:`_engine.Engine`. - This has the effect of fully closing all **currently checked in** - database connections. Connections that are still checked out - will **not** be closed, however they will no longer be associated - with this :class:`_engine.Engine`, - so when they are closed individually, - eventually the :class:`_pool.Pool` which they are associated with will - be garbage collected and they will be closed out fully, if - not already closed on checkin. - - A new connection pool is created immediately after the old one has - been disposed. This new pool, like all SQLAlchemy connection pools, - does not make any actual connections to the database until one is - first requested, so as long as the :class:`_engine.Engine` - isn't used again, - no new connections will be made. + A new connection pool is created immediately after the old one has been + disposed. The previous connection pool is disposed either actively, by + closing out all currently checked-in connections in that pool, or + passively, by losing references to it but otherwise not closing any + connections. The latter strategy is more appropriate for an initializer + in a forked Python process. + + :param close: if left at its default of ``True``, has the + effect of fully closing all **currently checked in** + database connections. Connections that are still checked out + will **not** be closed, however they will no longer be associated + with this :class:`_engine.Engine`, + so when they are closed individually, eventually the + :class:`_pool.Pool` which they are associated with will + be garbage collected and they will be closed out fully, if + not already closed on checkin. + + If set to ``False``, the previous connection pool is de-referenced, + and otherwise not touched in any way. + + .. versionadded:: 1.4.33 Added the :paramref:`.Engine.dispose.close` + parameter to allow the replacement of a connection pool in a child + process without interfering with the connections used by the parent + process. + .. 
seealso:: :ref:`engine_disposal` + :ref:`pooling_multiprocessing` + """ - self.pool.dispose() + if close: + self.pool.dispose() self.pool = self.pool.recreate() self.dispatch.engine_disposed(self) diff --git a/test/engine/test_execute.py b/test/engine/test_execute.py index f462a7035c..2a61bd1f63 100644 --- a/test/engine/test_execute.py +++ b/test/engine/test_execute.py @@ -1,6 +1,7 @@ # coding: utf-8 from contextlib import contextmanager +import copy import re import threading import weakref @@ -2191,6 +2192,30 @@ class EngineEventsTest(fixtures.TestBase): eq_(canary.mock_calls, [call(eng), call(eng)]) + @testing.requires.ad_hoc_engines + @testing.combinations(True, False, argnames="close") + def test_close_parameter(self, testing_engine, close): + eng = testing_engine( + options=dict(pool_size=1, max_overflow=0, poolclass=QueuePool) + ) + + conn = eng.connect() + dbapi_conn_one = conn.connection.dbapi_connection + conn.close() + + eng_copy = copy.copy(eng) + eng_copy.dispose(close=close) + copy_conn = eng_copy.connect() + dbapi_conn_two = copy_conn.connection.dbapi_connection + + is_not(dbapi_conn_one, dbapi_conn_two) + + conn = eng.connect() + if close: + is_not(dbapi_conn_one, conn.connection.dbapi_connection) + else: + is_(dbapi_conn_one, conn.connection.dbapi_connection) + def test_retval_flag(self): canary = [] -- 2.47.2
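
As a minimal sketch of the behavior the new :meth:`.Engine.dispose` docstring describes, assuming SQLAlchemy 1.4.33 or later, a Unix platform, and a placeholder database URL, a child created via ``os.fork()`` can replace the inherited pool with ``dispose(close=False)`` while the parent keeps using its own connections::

    import os

    from sqlalchemy import create_engine, text

    # placeholder URL / driver; any fork-safe DBAPI setup works the same way
    engine = create_engine("postgresql://scott:tiger@localhost/test")

    # parent checks a connection out and back in, so the pool now holds
    # one live checked-in DBAPI connection
    with engine.connect() as conn:
        conn.execute(text("SELECT 1"))

    pid = os.fork()
    if pid == 0:
        # child process: drop the inherited pool without closing the
        # parent's connections; the replacement pool connects lazily
        engine.dispose(close=False)
        with engine.connect() as conn:
            conn.execute(text("SELECT 1"))
        os._exit(0)
    else:
        os.waitpid(pid, 0)
        # parent process: its checked-in connection was never touched by
        # the child and remains usable
        with engine.connect() as conn:
            conn.execute(text("SELECT 1"))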
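
The pooling documentation above notes that the ``engine.pool = engine.pool.recreate()`` workaround skips the :meth:`.ConnectionEvents.engine_disposed` hook, whereas both ``dispose(close=True)`` and ``dispose(close=False)`` invoke it. A sketch of listening for that hook (the logger name and URL are illustrative)::

    import logging

    from sqlalchemy import create_engine, event

    logging.basicConfig(level=logging.INFO)
    log = logging.getLogger("myapp.engine")

    engine = create_engine("postgresql://scott:tiger@localhost/test")

    @event.listens_for(engine, "engine_disposed")
    def receive_engine_disposed(engine):
        # invoked by engine.dispose(close=True) as well as
        # engine.dispose(close=False); the manual
        # engine.pool = engine.pool.recreate() workaround bypasses it
        log.info("connection pool for %s was replaced", engine.url)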
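
Approach 4 above, invalidating connections that are detected crossing a process boundary, refers to the long-standing recipe from the pooling documentation; a sketch with a placeholder URL::

    import os

    from sqlalchemy import create_engine, event, exc

    engine = create_engine("postgresql://scott:tiger@localhost/test")

    @event.listens_for(engine, "connect")
    def connect(dbapi_connection, connection_record):
        # stamp each new DBAPI connection with the pid that created it
        connection_record.info["pid"] = os.getpid()

    @event.listens_for(engine, "checkout")
    def checkout(dbapi_connection, connection_record, connection_proxy):
        pid = os.getpid()
        if connection_record.info["pid"] != pid:
            # the connection was created in a different process; detach it
            # from the pool and force a fresh connection to be opened
            connection_record.connection = connection_proxy.connection = None
            raise exc.DisconnectionError(
                "Connection record belongs to pid %s, "
                "attempting to check out in pid %s"
                % (connection_record.info["pid"], pid)
            )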