From: Mike Bayer
Date: Tue, 17 Feb 2015 18:43:48 +0000 (-0500)
Subject: - add a new section regarding multiprocessing
X-Git-Tag: rel_1_0_0b1~60
X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=94d57374c44b49dce8531a0b0ed3116e52530c3b;p=thirdparty%2Fsqlalchemy%2Fsqlalchemy.git

- add a new section regarding multiprocessing
---

diff --git a/doc/build/core/pooling.rst b/doc/build/core/pooling.rst
index 698c103274..0dbf835d92 100644
--- a/doc/build/core/pooling.rst
+++ b/doc/build/core/pooling.rst
@@ -224,6 +224,8 @@ upon next checkout.  Note that the invalidation **only** occurs during checkout
 any connections that are held in a checked out state.  ``pool_recycle`` is a function
 of the :class:`.Pool` itself, independent of whether or not an :class:`.Engine` is in use.
 
+.. _pool_disconnects_pessimistic:
+
 Disconnect Handling - Pessimistic
 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 
@@ -324,6 +326,64 @@ a DBAPI connection might be invalidated include:
 
 All invalidations which occur will invoke the :meth:`.PoolEvents.invalidate`
 event.
 
+Using Connection Pools with Multiprocessing
+-------------------------------------------
+
+It's critical that, when using a connection pool, and by extension when
+using an :class:`.Engine` created via :func:`.create_engine`, the pooled
+connections **are not shared with a forked process**.  TCP connections are
+represented as file descriptors, which usually work across process
+boundaries, meaning two or more entirely independent Python interpreter
+states will end up accessing the same file descriptor concurrently.
+
+There are two approaches to dealing with this.
+
+The first is to either create a new :class:`.Engine` within the child
+process, or, upon an existing :class:`.Engine`, call :meth:`.Engine.dispose`
+before the child process uses any connections.  This removes all existing
+connections from the pool so that only new ones are made.  Below is
+a simple version using ``multiprocessing.Process``, but this idea
+should be adapted to the style of forking in use::
+
+    eng = create_engine("...")
+
+    def run_in_process():
+        eng.dispose()
+
+        with eng.connect() as conn:
+            conn.execute("...")
+
+    p = Process(target=run_in_process)
+
+The second approach is to instrument the :class:`.Pool` itself with events
+so that connections are automatically invalidated in the subprocess.
+This is a little more magical but probably more foolproof::
+
+    from sqlalchemy import event
+    from sqlalchemy import exc
+    import os
+
+    eng = create_engine("...")
+
+    @event.listens_for(eng, "connect")
+    def connect(dbapi_connection, connection_record):
+        connection_record.info['pid'] = os.getpid()
+
+    @event.listens_for(eng, "checkout")
+    def checkout(dbapi_connection, connection_record, connection_proxy):
+        pid = os.getpid()
+        if connection_record.info['pid'] != pid:
+            connection_record.connection = connection_proxy.connection = None
+            raise exc.DisconnectionError(
+                "Connection record belongs to pid %s, "
+                "attempting to check out in pid %s" %
+                (connection_record.info['pid'], pid)
+            )
+
+Above, we use an approach similar to that described in
+:ref:`pool_disconnects_pessimistic` to treat a DBAPI connection that
+originated in a different parent process as an "invalid" connection,
+coercing the pool to recycle the connection record and make a new connection.
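
For illustration, a more complete, runnable variant of the ``multiprocessing.Process``
example in the patch above might look like the following sketch.  The SQLite URL,
the ``SELECT 1`` statement, and the ``__main__`` guard are assumptions added here
to make the script self-contained; they are not part of the patch::

    # A self-contained sketch of the "dispose in the child process" approach.
    # The SQLite URL and the SELECT 1 statement are placeholders only.
    from multiprocessing import Process

    from sqlalchemy import create_engine, text

    eng = create_engine("sqlite:///example.db")

    def run_in_process():
        # Discard any connections inherited from the parent's pool; the
        # child will open fresh connections of its own on first use.
        eng.dispose()

        with eng.connect() as conn:
            conn.execute(text("SELECT 1"))

    if __name__ == "__main__":
        p = Process(target=run_in_process)
        p.start()
        p.join()

Calling :meth:`.Engine.dispose` as the first thing the child does ensures that
connections inherited from the parent's pool are discarded before the child
checks anything out.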
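The event-based guard from the second example can likewise be sketched end to end.
This version assumes ``fork``-style process creation, the same illustrative SQLite
URL, a hypothetical ``worker`` function, and an explicit :class:`.QueuePool` (since
SQLite's default pooling varies by SQLAlchemy version); the parent establishes a
pooled connection, then the forked child's first checkout raises
:class:`~sqlalchemy.exc.DisconnectionError`, causing the pool to open a fresh
connection within the child::

    # A sketch wiring up the pid-guard listeners and exercising them from a
    # forked worker.  URL, worker(), and QueuePool choice are assumptions.
    import os
    from multiprocessing import Process

    from sqlalchemy import create_engine, event, exc, text
    from sqlalchemy.pool import QueuePool

    eng = create_engine("sqlite:///example.db", poolclass=QueuePool)

    @event.listens_for(eng, "connect")
    def connect(dbapi_connection, connection_record):
        # Stamp each new DBAPI connection with the pid that created it.
        connection_record.info['pid'] = os.getpid()

    @event.listens_for(eng, "checkout")
    def checkout(dbapi_connection, connection_record, connection_proxy):
        # Refuse to hand out a connection created in another process; the
        # DisconnectionError makes the pool retry with a new connection.
        pid = os.getpid()
        if connection_record.info['pid'] != pid:
            connection_record.connection = connection_proxy.connection = None
            raise exc.DisconnectionError(
                "Connection record belongs to pid %s, "
                "attempting to check out in pid %s" %
                (connection_record.info['pid'], pid)
            )

    def worker():
        # In the forked child, the first checkout discards the inherited
        # connection record and opens a new connection for this process.
        with eng.connect() as conn:
            conn.execute(text("SELECT 1"))

    if __name__ == "__main__":
        with eng.connect() as conn:
            # establish a pooled connection in the parent before forking
            conn.execute(text("SELECT 1"))
        p = Process(target=worker)
        p.start()
        p.join()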