.. _pooling_multiprocessing:
-Using Connection Pools with Multiprocessing
--------------------------------------------
+Using Connection Pools with Multiprocessing or os.fork()
+--------------------------------------------------------
It's critical that when using a connection pool, and by extension when
using an :class:`_engine.Engine` created via :func:`_sa.create_engine`, the
pooled connections **are not shared to a forked process**. TCP connections
are represented as file descriptors, which usually work across process
boundaries, meaning this will cause concurrent access to the file descriptor
on behalf of two or more entirely independent Python interpreter states.
-There are two approaches to dealing with this.
+Depending on the specifics of the driver and OS, the issues that arise here
+range from non-working connections to socket connections that are used by
+multiple processes concurrently, leading to broken messaging (the latter case
+is typically the most common).
-The first is, either create a new :class:`_engine.Engine` within the child
-process, or upon an existing :class:`_engine.Engine`, call :meth:`_engine.Engine.dispose`
-before the child process uses any connections. This will remove all existing
-connections from the pool so that it makes all new ones. Below is
-a simple version using ``multiprocessing.Process``, but this idea
-should be adapted to the style of forking in use::
+The SQLAlchemy :class:`_engine.Engine` object refers to a connection pool of existing
+database connections. So when this object is replicated to a child process,
+the goal is to ensure that no database connections are carried over. There
+are three general approaches to this:
- engine = create_engine("...")
+1. Disable pooling using :class:`.NullPool`. This is the simplest, one-shot
+   approach that prevents the :class:`_engine.Engine` from using any connection
+   more than once::
+
+    from sqlalchemy import create_engine
+    from sqlalchemy.pool import NullPool
+
+    engine = create_engine("mysql://user:pass@host/dbname", poolclass=NullPool)
+
+
+2. Call :meth:`_engine.Engine.dispose` on any given :class:`_engine.Engine` as
+   soon as one is within the new process. In Python multiprocessing, constructs
+   such as ``multiprocessing.Pool`` include "initializer" hooks where this can
+   be performed (see the initializer sketch after this list); otherwise, at the
+   point where ``os.fork()`` is called or where the ``Process`` object begins
+   the child fork, a single call to :meth:`_engine.Engine.dispose` will ensure
+   any remaining connections are flushed. **This is the recommended approach**::
+
+   from multiprocessing import Process
+
+   from sqlalchemy import create_engine, text
+
+   engine = create_engine("mysql://user:pass@host/dbname")
+
def run_in_process():
- engine.dispose()
+ # process starts. ensure engine.dispose() is called just once
+ # at the beginning
+ engine.dispose()
- with engine.connect() as conn:
- conn.execute(text("..."))
+ with engine.connect() as conn:
+ conn.execute(text("..."))
p = Process(target=run_in_process)
+ p.start()
-The next approach is to instrument the :class:`_pool.Pool` itself with events
-so that connections are automatically invalidated in the subprocess.
-This is a little more magical but probably more foolproof::
+3. An event handler can be applied to the connection pool that tests for
+ connections being shared across process boundaries, and invalidates them.
+ This approach, **when combined with an explicit call to dispose() as
+ mentioned above**, should cover all cases::
    from sqlalchemy import event
    from sqlalchemy import exc
    import os

    engine = create_engine("mysql://user:pass@host/dbname")

    @event.listens_for(engine, "connect")
    def connect(dbapi_connection, connection_record):
        connection_record.info['pid'] = os.getpid()

    @event.listens_for(engine, "checkout")
    def checkout(dbapi_connection, connection_record, connection_proxy):
        pid = os.getpid()
        if connection_record.info['pid'] != pid:
            connection_record.connection = connection_proxy.connection = None
            raise exc.DisconnectionError(
                "Connection record belongs to pid %s, "
                "attempting to check out in pid %s" %
                (connection_record.info['pid'], pid)
            )
-Above, we use an approach similar to that described in
-:ref:`pool_disconnects_pessimistic` to treat a DBAPI connection that
-originated in a different parent process as an "invalid" connection,
-coercing the pool to recycle the connection record to make a new connection.
+ Above, we use an approach similar to that described in
+ :ref:`pool_disconnects_pessimistic` to treat a DBAPI connection that
+ originated in a different parent process as an "invalid" connection,
+ coercing the pool to recycle the connection record to make a new connection.
+
+   When using the above recipe, **ensure the dispose approach from #2 is also
+   used**; if the connection pool is exhausted in the parent process when the
+   fork occurs, an empty pool will be copied into the child process, which
+   will then hang because it has no connections.
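+
+For example, the dispose step from #2 can be wired into the ``initializer``
+hook of a ``multiprocessing.Pool``. The following is a minimal sketch of this
+pattern; the worker function names and connection URL are illustrative only::
+
+    import multiprocessing
+
+    from sqlalchemy import create_engine, text
+
+    engine = create_engine("mysql://user:pass@host/dbname")
+
+    def init_worker():
+        # runs once in each newly started worker process; discard any
+        # connections that were copied over from the parent's pool
+        engine.dispose()
+
+    def do_work(arg):
+        # each worker checks out brand new connections from its own pool
+        with engine.connect() as conn:
+            conn.execute(text("..."))
+
+    if __name__ == "__main__":
+        with multiprocessing.Pool(processes=4, initializer=init_worker) as pool:
+            pool.map(do_work, range(10))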
+
+The above strategies will accommodate the case of an :class:`_engine.Engine`
+being shared among processes. However, for the case of a transaction-active
+:class:`.Session` or :class:`_engine.Connection` being shared, there's no automatic
+fix for this; an application needs to ensure a new child process only
+initiates new :class:`_engine.Connection` objects and transactions, as well as ORM
+:class:`.Session` objects, as in the sketch below. For a :class:`.Session` object, technically
+this is only needed if the session is currently transaction-bound, however
+the scope of a single :class:`.Session` is in any case intended to be
+kept within a single call stack (e.g. not a global object, not
+shared between processes or threads).
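+
+The following is a minimal sketch of that pattern, assuming SQLAlchemy 1.4 or
+later where :class:`.Session` can be used as a context manager; the connection
+URL and the statement being executed are illustrative only::
+
+    from multiprocessing import Process
+
+    from sqlalchemy import create_engine, text
+    from sqlalchemy.orm import Session
+
+    engine = create_engine("mysql://user:pass@host/dbname")
+
+    def run_in_child():
+        # discard any connections that were copied over from the parent
+        engine.dispose()
+
+        # build a brand new Session local to this process; never reuse a
+        # Session or Connection object that the parent process constructed
+        with Session(engine) as session:
+            session.execute(text("..."))
+            session.commit()
+
+    p = Process(target=run_in_child)
+    p.start()
+    p.join()
+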
How do I use engines / connections / sessions with Python multiprocessing, or os.fork()?
----------------------------------------------------------------------------------------
-The key goal with multiple python processes is to prevent any database connections
-from being shared across processes. Depending on specifics of the driver and OS,
-the issues that arise here range from non-working connections to socket connections that
-are used by multiple processes concurrently, leading to broken messaging (the latter
-case is typically the most common).
-
-The SQLAlchemy :class:`_engine.Engine` object refers to a connection pool of existing
-database connections. So when this object is replicated to a child process,
-the goal is to ensure that no database connections are carried over. There
-are three general approaches to this:
-
-1. Disable pooling using :class:`.NullPool`. This is the most simplistic,
- one shot system that prevents the :class:`_engine.Engine` from using any connection
- more than once.
-
-2. Call :meth:`_engine.Engine.dispose` on any given :class:`_engine.Engine` as soon one is
- within the new process. In Python multiprocessing, constructs such as
- ``multiprocessing.Pool`` include "initializer" hooks which are a place
- that this can be performed; otherwise at the top of where ``os.fork()``
- or where the ``Process`` object begins the child fork, a single call
- to :meth:`_engine.Engine.dispose` will ensure any remaining connections are flushed.
-
-3. An event handler can be applied to the connection pool that tests for connections
- being shared across process boundaries, and invalidates them. This looks like
- the following::
-
- import os
- import warnings
-
- from sqlalchemy import event
- from sqlalchemy import exc
-
- def add_engine_pidguard(engine):
- """Add multiprocessing guards.
-
- Forces a connection to be reconnected if it is detected
- as having been shared to a sub-process.
-
- """
-
- @event.listens_for(engine, "connect")
- def connect(dbapi_connection, connection_record):
- connection_record.info['pid'] = os.getpid()
-
- @event.listens_for(engine, "checkout")
- def checkout(dbapi_connection, connection_record, connection_proxy):
- pid = os.getpid()
- if connection_record.info['pid'] != pid:
- # substitute log.debug() or similar here as desired
- warnings.warn(
- "Parent process %(orig)s forked (%(newproc)s) with an open "
- "database connection, "
- "which is being discarded and recreated." %
- {"newproc": pid, "orig": connection_record.info['pid']})
- connection_record.connection = connection_proxy.connection = None
- raise exc.DisconnectionError(
- "Connection record belongs to pid %s, "
- "attempting to check out in pid %s" %
- (connection_record.info['pid'], pid)
- )
-
- These events are applied to an :class:`_engine.Engine` as soon as its created::
-
- engine = create_engine("...")
-
- add_engine_pidguard(engine)
-
-The above strategies will accommodate the case of an :class:`_engine.Engine`
-being shared among processes. However, for the case of a transaction-active
-:class:`.Session` or :class:`_engine.Connection` being shared, there's no automatic
-fix for this; an application needs to ensure a new child process only
-initiate new :class:`_engine.Connection` objects and transactions, as well as ORM
-:class:`.Session` objects. For a :class:`.Session` object, technically
-this is only needed if the session is currently transaction-bound, however
-the scope of a single :class:`.Session` is in any case intended to be
-kept within a single call stack in any case (e.g. not a global object, not
-shared between processes or threads).
+This is covered in the section :ref:`pooling_multiprocessing`.
+