]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
Integrate "pre-ping" into connection pool.
authorMike Bayer <mike_mp@zzzcomputing.com>
Fri, 24 Feb 2017 15:50:14 +0000 (10:50 -0500)
committerMike Bayer <mike_mp@zzzcomputing.com>
Mon, 20 Mar 2017 22:01:23 +0000 (18:01 -0400)
Added native "pessimistic disconnection" handling to the :class:`.Pool`
object.  The new parameter :paramref:`.Pool.pre_ping`, available from
the engine as :paramref:`.create_engine.pool_pre_ping`, applies an
efficient form of the "pre-ping" recipe featured in the pooling
documentation, which upon each connection check out, emits a simple
statement, typically "SELECT 1", to test the connection for liveness.
If the existing connection is no longer able to respond to commands,
the connection is transparently recycled, and all other connections
made prior to the current timestamp are invalidated.

Change-Id: I89700d0075e60abd2250e54b9bd14daf03c71c00
Fixes: #3919
doc/build/changelog/changelog_12.rst
doc/build/changelog/migration_12.rst
doc/build/core/pooling.rst
lib/sqlalchemy/engine/__init__.py
lib/sqlalchemy/engine/default.py
lib/sqlalchemy/engine/strategies.py
lib/sqlalchemy/exc.py
lib/sqlalchemy/pool.py
lib/sqlalchemy/testing/engines.py
test/engine/test_reconnect.py

index 4612030fdd5035f9e01744cb0d1e2731c0778348..95e7b0a5b5253cd6311537792cec926471cd2cfb 100644 (file)
 
             :ref:`change_1546`
 
+    .. change:: 3919
+        :tags: feature, engine
+        :tickets: 3919
+
+        Added native "pessimistic disconnection" handling to the :class:`.Pool`
+        object.  The new parameter :paramref:`.Pool.pre_ping`, available from
+        the engine as :paramref:`.create_engine.pool_pre_ping`, applies an
+        efficient form of the "pre-ping" recipe featured in the pooling
+        documentation, which upon each connection check out, emits a simple
+        statement, typically "SELECT 1", to test the connection for liveness.
+        If the existing connection is no longer able to respond to commands,
+        the connection is transparently recycled, and all other connections
+        made prior to the current timestamp are invalidated.
+
+        .. seealso::
+
+            :ref:`pool_disconnects_pessimistic`
+
+            :ref:`change_3919`
+
     .. change:: 3366
         :tags: bug, orm
         :tickets: 3366
index 21b6a1c026083254156c5515301c7fe0c6c2d233..0e1270d45b9bed72189356c4c59b357fa12e920c 100644 (file)
@@ -181,6 +181,40 @@ Current backend support includes MySQL, Postgresql, and Oracle.
 
 :ticket:`1546`
 
+.. _change_3919:
+
+Pessimistic disconnection detection added to the connection pool
+----------------------------------------------------------------
+
+The connection pool documentation has long featured a recipe for using
+the :meth:`.ConnectionEvents.engine_connect` engine event to emit a simple
+statement on a checked-out connection to test it for liveness.   The
+functionality of this recipe has now been added into the connection pool
+itself, when used in conjunction with an appropriate dialect.   Using
+the new parameter :paramref:`.create_engine.pool_pre_ping`, each connection
+checked out will be tested for freshness before being returned::
+
+    engine = create_engine("mysql+pymysql://", pool_pre_ping=True)
+
+While the "pre-ping" approach adds a small amount of latency to the connection
+pool checkout, for a typical application that is transactionally-oriented
+(which includes most ORM applications), this overhead is minimal, and
+eliminates the problem of acquiring a stale connection that will raise
+an error, requiring that the application either abandon or retry the operation.
+
+The feature does **not** accommodate for connections dropped within
+an ongoing transaction or SQL operation.  If an application must recover
+from these as well, it would need to employ its own operation retry logic
+to anticipate these errors.
+
+
+.. seealso::
+
+    :ref:`pool_disconnects_pessimistic`
+
+
+:ticket:`3919`
+
 .. _change_2694:
 
 New "autoescape" option for startswith(), endswith()
index 65b5ca9cd084ccafe318fd7ddcf046d94d5a1214..086b544dbb3faa9b897fc59ba3ec843d0e203a30 100644 (file)
@@ -170,75 +170,71 @@ its entire set of connections, setting the previously pooled connections as
 when the database server has been restarted, and all previously established connections
 are no longer functional.   There are two approaches to this.
 
-Disconnect Handling - Optimistic
-^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-
-The most common approach is to let SQLAlchemy handle disconnects as they
-occur, at which point the pool is refreshed.   This assumes the :class:`.Pool`
-is used in conjunction with a :class:`.Engine`.  The :class:`.Engine` has
-logic which can detect disconnection events and refresh the pool automatically.
-
-When the :class:`.Connection` attempts to use a DBAPI connection, and an
-exception is raised that corresponds to a "disconnect" event, the connection
-is invalidated. The :class:`.Connection` then calls the :meth:`.Pool.recreate`
-method, effectively invalidating all connections not currently checked out so
-that they are replaced with new ones upon next checkout::
-
-    from sqlalchemy import create_engine, exc
-    e = create_engine(...)
-    c = e.connect()
-
-    try:
-        # suppose the database has been restarted.
-        c.execute("SELECT * FROM table")
-        c.close()
-    except exc.DBAPIError, e:
-        # an exception is raised, Connection is invalidated.
-        if e.connection_invalidated:
-            print("Connection was invalidated!")
-
-    # after the invalidate event, a new connection
-    # starts with a new Pool
-    c = e.connect()
-    c.execute("SELECT * FROM table")
-
-The above example illustrates that no special intervention is needed, the pool
-continues normally after a disconnection event is detected.   However, an exception is
-raised.   In a typical web application using an ORM Session, the above condition would
-correspond to a single request failing with a 500 error, then the web application
-continuing normally beyond that.   Hence the approach is "optimistic" in that frequent
-database restarts are not anticipated.
-
-.. _pool_setting_recycle:
-
-Setting Pool Recycle
-~~~~~~~~~~~~~~~~~~~~~~~
-
-An additional setting that can augment the "optimistic" approach is to set the
-pool recycle parameter.   This parameter prevents the pool from using a particular
-connection that has passed a certain age, and is appropriate for database backends
-such as MySQL that automatically close connections that have been stale after a particular
-period of time::
-
-    from sqlalchemy import create_engine
-    e = create_engine("mysql://scott:tiger@localhost/test", pool_recycle=3600)
-
-Above, any DBAPI connection that has been open for more than one hour will be invalidated and replaced,
-upon next checkout.   Note that the invalidation **only** occurs during checkout - not on
-any connections that are held in a checked out state.     ``pool_recycle`` is a function
-of the :class:`.Pool` itself, independent of whether or not an :class:`.Engine` is in use.
-
 .. _pool_disconnects_pessimistic:
 
 Disconnect Handling - Pessimistic
-^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 
-At the expense of some extra SQL emitted for each connection checked out from
-the pool, a "ping" operation established by a checkout event handler can
-detect an invalid connection before it is used.  In modern SQLAlchemy, the
-best way to do this is to make use of the
-:meth:`.ConnectionEvents.engine_connect` event, assuming the use of a
-:class:`.Engine` and not just a raw :class:`.Pool` object::
+The pessimistic approach refers to emitting a test statement on the SQL
+connection at the start of each connection pool checkout, to test
+that the database connection is still viable.   Typically, this
+is a simple statement like "SELECT 1", but may also make use of some
+DBAPI-specific method to test the connection for liveness.
+
+The approach adds a small bit of overhead to the connection checkout process,
+however is otherwise the most simple and reliable approach to completely
+eliminating database errors due to stale pooled connections.   The calling
+application does not need to be concerned about organizing operations
+to be able to recover from stale connections checked out from the pool.
+
+It is critical to note that the pre-ping approach **does not accommodate for
+connections dropped in the middle of transactions or other SQL operations**.
+If the database becomes unavailable while a transaction is in progress, the
+transaction will be lost and the database error will be raised.   While
+the :class:`.Connection` object will detect a "disconnect" situation and
+recycle the connection as well as invalidate the rest of the connection pool
+when this condition occurs,
+the individual operation where the exception was raised will be lost, and it's
+up to the application to either abandon
+the operation, or retry the whole transaction again.
+
+Pessimistic testing of connections upon checkout is achievable by
+using the :paramref:`.Pool.pre_ping` argument, available from :func:`.create_engine`
+via the :paramref:`.create_engine.pool_pre_ping` argument::
+
+    engine = create_engine("mysql+pymysql://user:pw@host/db", pool_pre_ping=True)
+
+The "pre ping" feature will normally emit SQL equivalent to "SELECT 1" each time a
+connection is checked out from the pool; if an error is raised that is detected
+as a "disconnect" situation, the connection will be immediately recycled, and
+all other pooled connections older than the current time are invalidated, so
+that the next time they are checked out, they will also be recycled before use.
+
+If the database is still not available when "pre ping" runs, then the initial
+connect will fail and the error for failure to connect will be propagated
+normally.  In the uncommon situation that the database is available for
+connections, but is not able to respond to a "ping", the "pre_ping" will try up
+to three times before giving up, propagating the database error last received.
+
+.. note::
+
+    the "SELECT 1" emitted by "pre-ping" is invoked within the scope
+    of the connection pool / dialect, using a very short codepath for minimal
+    Python latency.   As such, this statement is **not logged in the SQL
+    echo output**, and will not show up in SQLAlchemy's engine logging.
+
+.. versionadded:: 1.2 Added "pre-ping" capability to the :class:`.Pool`
+   class.
+
+Custom / Legacy Pessimistic Ping
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Before :paramref:`.create_engine.pool_pre_ping` was added, the "pre-ping"
+approach historically has been performed manually using
+the :meth:`.ConnectionEvents.engine_connect` engine event.
+The most common recipe for this is below, for reference
+purposes in case an application is already using such a recipe, or special
+behaviors are needed::
 
     from sqlalchemy import exc
     from sqlalchemy import event
@@ -288,32 +284,73 @@ to correctly invalidate the current connection pool when this condition
 occurs and allowing the current :class:`.Connection` to re-validate onto
 a new DBAPI connection.
 
-For the much less common case of where a :class:`.Pool` is being used without
-an :class:`.Engine`, an older approach may be used as below::
 
-    from sqlalchemy import exc
-    from sqlalchemy import event
-    from sqlalchemy.pool import Pool
+Disconnect Handling - Optimistic
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+When pessimistic handling is not employed, as well as when the database is
+shutdown and/or restarted in the middle of a connection's period of use within
+a transaction, the other approach to dealing with stale / closed connections is
+to let SQLAlchemy handle disconnects as  they occur, at which point all
+connections in the pool are invalidated, meaning they are assumed to be
+stale and will be refreshed upon next checkout.  This behavior assumes the
+:class:`.Pool` is used in conjunction with a :class:`.Engine`.
+The :class:`.Engine` has logic which can detect
+disconnection events and refresh the pool automatically.
+
+When the :class:`.Connection` attempts to use a DBAPI connection, and an
+exception is raised that corresponds to a "disconnect" event, the connection
+is invalidated. The :class:`.Connection` then calls the :meth:`.Pool.recreate`
+method, effectively invalidating all connections not currently checked out so
+that they are replaced with new ones upon next checkout.  This flow is
+illustrated by the code example below::
+
+    from sqlalchemy import create_engine, exc
+    e = create_engine(...)
+    c = e.connect()
+
+    try:
+        # suppose the database has been restarted.
+        c.execute("SELECT * FROM table")
+        c.close()
+    except exc.DBAPIError, e:
+        # an exception is raised, Connection is invalidated.
+        if e.connection_invalidated:
+            print("Connection was invalidated!")
+
+    # after the invalidate event, a new connection
+    # starts with a new Pool
+    c = e.connect()
+    c.execute("SELECT * FROM table")
+
+The above example illustrates that no special intervention is needed to
+refresh the pool, which continues normally after a disconnection event is
+detected.   However, one database exception is raised, per each connection
+that is in use while the database unavailability event occurred.
+In a typical web application using an ORM Session, the above condition would
+correspond to a single request failing with a 500 error, then the web application
+continuing normally beyond that.   Hence the approach is "optimistic" in that frequent
+database restarts are not anticipated.
+
+.. _pool_setting_recycle:
+
+Setting Pool Recycle
+~~~~~~~~~~~~~~~~~~~~~~~
+
+An additional setting that can augment the "optimistic" approach is to set the
+pool recycle parameter.   This parameter prevents the pool from using a particular
+connection that has passed a certain age, and is appropriate for database backends
+such as MySQL that automatically close connections that have been stale after a particular
+period of time::
+
+    from sqlalchemy import create_engine
+    e = create_engine("mysql://scott:tiger@localhost/test", pool_recycle=3600)
+
+Above, any DBAPI connection that has been open for more than one hour will be invalidated and replaced,
+upon next checkout.   Note that the invalidation **only** occurs during checkout - not on
+any connections that are held in a checked out state.     ``pool_recycle`` is a function
+of the :class:`.Pool` itself, independent of whether or not an :class:`.Engine` is in use.
 
-    @event.listens_for(Pool, "checkout")
-    def ping_connection(dbapi_connection, connection_record, connection_proxy):
-        cursor = dbapi_connection.cursor()
-        try:
-            cursor.execute("SELECT 1")
-        except:
-            # raise DisconnectionError - pool will try
-            # connecting again up to three times before raising.
-            raise exc.DisconnectionError()
-        cursor.close()
-
-Above, the :class:`.Pool` object specifically catches
-:class:`~sqlalchemy.exc.DisconnectionError` and attempts to create a new DBAPI
-connection, up to three times, before giving up and then raising
-:class:`~sqlalchemy.exc.InvalidRequestError`, failing the connection.  The
-disadvantage of the above approach is that we don't have any easy way of
-determining if the exception raised is in fact a "disconnect" situation, since
-there is no :class:`.Engine` or :class:`.Dialect` in play, and also the above
-error would occur individually for all stale connections still in the pool.
 
 .. _pool_connection_invalidation:
 
index bd8b7e68a3bcfe38f4117ddb7dff94344ea1a02c..32e84c53f56b2637fff9da83981ebbd4a6b90398 100644 (file)
@@ -355,6 +355,16 @@ def create_engine(*args, **kwargs):
        "sqlalchemy.pool" logger. Defaults to a hexstring of the object's
        id.
 
+    :param pool_pre_ping: boolean, if True will enable the connection pool
+        "pre-ping" feature that tests connections for liveness upon
+        each checkout.
+
+        .. versionadded:: 1.2
+
+        .. seealso::
+
+            :ref:`pool_disconnects_pessimistic`
+
     :param pool_size=5: the number of connections to keep open
         inside the connection pool. This used with
         :class:`~sqlalchemy.pool.QueuePool` as
index dc2bd5f9762cf4e7c9565f801ec8be3738d867a7..c7d574a2164679b8c77677356071f3cad4a95cff 100644 (file)
@@ -459,6 +459,26 @@ class DefaultDialect(interfaces.Dialect):
     def do_close(self, dbapi_connection):
         dbapi_connection.close()
 
+    @util.memoized_property
+    def _dialect_specific_select_one(self):
+        return str(expression.select([1]).compile(dialect=self))
+
+    def do_ping(self, dbapi_connection):
+        cursor = None
+        try:
+            cursor = dbapi_connection.cursor()
+            try:
+                cursor.execute(self._dialect_specific_select_one)
+            finally:
+                cursor.close()
+        except self.dbapi.Error as err:
+            if self.is_disconnect(err, dbapi_connection, cursor):
+                return False
+            else:
+                raise
+        else:
+            return True
+
     def create_xid(self):
         """Create a random two-phase transaction ID.
 
index 81bb2c50841019f4fe01e14b1e32552bb95a1ac9..2867576980783e342bc6413462e4233b1680e598 100644 (file)
@@ -121,7 +121,8 @@ class DefaultEngineStrategy(EngineStrategy):
                          'recycle': 'pool_recycle',
                          'events': 'pool_events',
                          'use_threadlocal': 'pool_threadlocal',
-                         'reset_on_return': 'pool_reset_on_return'}
+                         'reset_on_return': 'pool_reset_on_return',
+                         'pre_ping': 'pool_pre_ping'}
             for k in util.get_cls_kwargs(poolclass):
                 tk = translate.get(k, k)
                 if tk in kwargs:
index b2e07ae192787039ddda186bae2f3e6ae55a5b0d..e8ba34ba4e4be599f00d81290065a66aa61fa655 100644 (file)
@@ -118,8 +118,25 @@ class DisconnectionError(SQLAlchemyError):
     regarding the connection attempt.
 
     """
+    invalidate_pool = False
 
 
+class InvalidatePoolError(DisconnectionError):
+    """Raised when the connection pool should invalidate all stale connections.
+
+    A subclass of :class:`.DisconnectionError` that indicates that the
+    disconnect situation encountered on the connection probably means the
+    entire pool should be invalidated, as the database has been restarted.
+
+    This exception will be handled otherwise the same way as
+    :class:`.DisconnectionError`, allowing three attempts to reconnect
+    before giving up.
+
+    .. versionadded:: 1.2
+
+    """
+    invalidate_pool = True
+
 class TimeoutError(SQLAlchemyError):
     """Raised when a connection pool times out on getting a connection."""
 
index b58fdaa6728d3d76c9525a0a067d6cbab02bd941..a889d1bba29a46695ef9b506fea2ada17a811dc8 100644 (file)
@@ -88,6 +88,11 @@ class _ConnDialect(object):
     def do_close(self, dbapi_connection):
         dbapi_connection.close()
 
+    def do_ping(self, dbapi_connection):
+        raise NotImplementedError(
+            "The ping feature requires that a dialect is "
+            "passed to the connection pool.")
+
 
 class Pool(log.Identified):
 
@@ -103,6 +108,7 @@ class Pool(log.Identified):
                  listeners=None,
                  events=None,
                  dialect=None,
+                 pre_ping=False,
                  _dispatch=None):
         """
         Construct a Pool.
@@ -219,6 +225,15 @@ class Pool(log.Identified):
          .. versionadded:: 1.1 - ``dialect`` is now a public parameter
             to the :class:`.Pool`.
 
+        :param pre_ping: if True, the pool will emit a "ping" (typically
+        "SELECT 1", but is dialect-specific) on the connection
+         upon checkout, to test if the connection is alive or not.   If not,
+         the connection is transparently re-connected and upon success, all
+         other pooled connections established prior to that timestamp are
+         invalidated.     Requires that a dialect is passed as well to
+         interpret the disconnection error.
+
+         .. versionadded:: 1.2
         """
         if logging_name:
             self.logging_name = self._orig_logging_name = logging_name
@@ -231,6 +246,7 @@ class Pool(log.Identified):
         self._recycle = recycle
         self._invalidate_time = 0
         self._use_threadlocal = use_threadlocal
+        self._pre_ping = pre_ping
         if reset_on_return in ('rollback', True, reset_rollback):
             self._reset_on_return = reset_rollback
         elif reset_on_return in ('none', None, False, reset_none):
@@ -332,7 +348,7 @@ class Pool(log.Identified):
 
         return _ConnectionRecord(self)
 
-    def _invalidate(self, connection, exception=None):
+    def _invalidate(self, connection, exception=None, _checkin=True):
         """Mark all connections established within the generation
         of the given connection as invalidated.
 
@@ -343,11 +359,10 @@ class Pool(log.Identified):
         Connections with a start time prior to this pool's invalidation
         time will be recycled upon next checkout.
         """
-
         rec = getattr(connection, "_connection_record", None)
         if not rec or self._invalidate_time < rec.starttime:
             self._invalidate_time = time.time()
-        if getattr(connection, 'is_valid', False):
+        if _checkin and getattr(connection, 'is_valid', False):
             connection.invalidate(exception)
 
     def recreate(self):
@@ -775,21 +790,49 @@ class _ConnectionFairy(object):
             raise exc.InvalidRequestError("This connection is closed")
         fairy._counter += 1
 
-        if not pool.dispatch.checkout or fairy._counter != 1:
+        if (not pool.dispatch.checkout and not pool._pre_ping) or \
+                fairy._counter != 1:
             return fairy
 
-        # Pool listeners can trigger a reconnection on checkout
+        # Pool listeners can trigger a reconnection on checkout, as well
+        # as the pre-pinger.
+        # there are three attempts made here, but note that if the database
+        # is not accessible from a connection standpoint, those won't proceed
+        # here.
         attempts = 2
         while attempts > 0:
             try:
+                if pool._pre_ping:
+                    if fairy._echo:
+                        pool.logger.debug(
+                            "Pool pre-ping on connection %s",
+                            fairy.connection)
+
+                    result = pool._dialect.do_ping(fairy.connection)
+                    if not result:
+                        if fairy._echo:
+                            pool.logger.debug(
+                                "Pool pre-ping on connection %s failed, "
+                                "will invalidate pool", fairy.connection)
+                        raise exc.InvalidatePoolError()
+
                 pool.dispatch.checkout(fairy.connection,
                                        fairy._connection_record,
                                        fairy)
                 return fairy
             except exc.DisconnectionError as e:
-                pool.logger.info(
-                    "Disconnection detected on checkout: %s", e)
-                fairy._connection_record.invalidate(e)
+                if e.invalidate_pool:
+                    pool.logger.info(
+                        "Disconnection detected on checkout, "
+                        "invalidating all pooled connections prior to "
+                        "current timestamp: %r", e)
+                    fairy._connection_record.invalidate(e)
+                    pool._invalidate(fairy, e, _checkin=False)
+                else:
+                    pool.logger.info(
+                        "Disconnection detected on checkout, "
+                        "invalidating individual connection: %r", e)
+                    fairy._connection_record.invalidate(e)
                 try:
                     fairy.connection = \
                         fairy._connection_record.get_connection()
index 6bca75fb1f835b9ce785bb73069210bcf2b186b5..4510ba6e5a2dc8c5670dbee7c6af5acdbfa6709d 100644 (file)
@@ -161,14 +161,24 @@ class ReconnectFixture(object):
     def __init__(self, dbapi):
         self.dbapi = dbapi
         self.connections = []
+        self.is_stopped = False
 
     def __getattr__(self, key):
         return getattr(self.dbapi, key)
 
     def connect(self, *args, **kwargs):
+
         conn = self.dbapi.connect(*args, **kwargs)
-        self.connections.append(conn)
-        return conn
+        if self.is_stopped:
+            self._safe(conn.close)
+            curs = conn.cursor()  # should fail on Oracle etc.
+            # should fail for everything that didn't fail
+            # above, connection is closed
+            curs.execute("select 1")
+            assert False, "simulated connect failure didn't work"
+        else:
+            self.connections.append(conn)
+            return conn
 
     def _safe(self, fn):
         try:
@@ -178,16 +188,20 @@ class ReconnectFixture(object):
                 "ReconnectFixture couldn't "
                 "close connection: %s" % e)
 
-    def shutdown(self):
+    def shutdown(self, stop=False):
         # TODO: this doesn't cover all cases
         # as nicely as we'd like, namely MySQLdb.
         # would need to implement R. Brewer's
         # proxy server idea to get better
         # coverage.
+        self.is_stopped = stop
         for c in list(self.connections):
             self._safe(c.close)
         self.connections = []
 
+    def restart(self):
+        self.is_stopped = False
+
 
 def reconnecting_engine(url=None, options=None):
     url = url or config.db.url
@@ -200,9 +214,11 @@ def reconnecting_engine(url=None, options=None):
 
     def dispose():
         engine.dialect.dbapi.shutdown()
+        engine.dialect.dbapi.is_stopped = False
         _dispose()
 
     engine.test_shutdown = engine.dialect.dbapi.shutdown
+    engine.test_restart = engine.dialect.dbapi.restart
     engine.dispose = dispose
     return engine
 
index 0d7cdb4e70a8ecf62c91b1ee07c9b2501c6fe899..be60056a5612435e178e6a228c056d498bfd053c 100644 (file)
@@ -4,6 +4,7 @@ from sqlalchemy import (
     select, MetaData, Integer, String, create_engine, pool, exc, util)
 from sqlalchemy.testing.schema import Table, Column
 import sqlalchemy as tsa
+from sqlalchemy.engine import url
 from sqlalchemy import testing
 from sqlalchemy.testing import mock
 from sqlalchemy.testing import engines
@@ -81,18 +82,27 @@ def mock_connection():
 
 def MockDBAPI():
     connections = []
+    stopped = [False]
 
     def connect():
         while True:
+            if stopped[0]:
+                raise MockDisconnect("database is stopped")
             conn = mock_connection()
             connections.append(conn)
             yield conn
 
-    def shutdown(explode='execute'):
+    def shutdown(explode='execute', stop=False):
+        stopped[0] = stop
         for c in connections:
             c.explode = explode
 
+    def restart():
+        stopped[0] = False
+        connections[:] = []
+
     def dispose():
+        stopped[0] = False
         for c in connections:
             c.explode = None
         connections[:] = []
@@ -101,11 +111,101 @@ def MockDBAPI():
         connect=Mock(side_effect=connect()),
         shutdown=Mock(side_effect=shutdown),
         dispose=Mock(side_effect=dispose),
+        restart=Mock(side_effect=restart),
         paramstyle='named',
         connections=connections,
         Error=MockError)
 
 
+class PrePingMockTest(fixtures.TestBase):
+    def setup(self):
+        self.dbapi = MockDBAPI()
+
+    def _pool_fixture(self, pre_ping):
+        dialect = url.make_url(
+            'postgresql://foo:bar@localhost/test').get_dialect()()
+        dialect.dbapi = self.dbapi
+        _pool = pool.QueuePool(
+            creator=lambda: self.dbapi.connect('foo.db'), pre_ping=pre_ping,
+            dialect=dialect)
+
+        dialect.is_disconnect = \
+            lambda e, conn, cursor: isinstance(e, MockDisconnect)
+        return _pool
+
+    def teardown(self):
+        self.dbapi.dispose()
+
+    def test_connect_across_restart(self):
+        pool = self._pool_fixture(pre_ping=True)
+
+        conn = pool.connect()
+        stale_connection = conn.connection
+        conn.close()
+
+        self.dbapi.shutdown("execute")
+        self.dbapi.restart()
+
+        conn = pool.connect()
+        cursor = conn.cursor()
+        cursor.execute("hi")
+
+        stale_cursor = stale_connection.cursor()
+        assert_raises(
+            MockDisconnect,
+            stale_cursor.execute, "hi"
+        )
+
+    def test_raise_db_is_stopped(self):
+        pool = self._pool_fixture(pre_ping=True)
+
+        conn = pool.connect()
+        conn.close()
+
+        self.dbapi.shutdown("execute", stop=True)
+
+        assert_raises_message(
+            MockDisconnect,
+            "database is stopped",
+            pool.connect
+        )
+
+    def test_waits_til_exec_wo_ping_db_is_stopped(self):
+        pool = self._pool_fixture(pre_ping=False)
+
+        conn = pool.connect()
+        conn.close()
+
+        self.dbapi.shutdown("execute", stop=True)
+
+        conn = pool.connect()
+
+        cursor = conn.cursor()
+        assert_raises_message(
+            MockDisconnect,
+            "Lost the DB connection on execute",
+            cursor.execute, "foo"
+        )
+
+    def test_waits_til_exec_wo_ping_db_is_restarted(self):
+        pool = self._pool_fixture(pre_ping=False)
+
+        conn = pool.connect()
+        conn.close()
+
+        self.dbapi.shutdown("execute", stop=True)
+        self.dbapi.restart()
+
+        conn = pool.connect()
+
+        cursor = conn.cursor()
+        assert_raises_message(
+            MockDisconnect,
+            "Lost the DB connection on execute",
+            cursor.execute, "foo"
+        )
+
+
 class MockReconnectTest(fixtures.TestBase):
     def setup(self):
         self.dbapi = MockDBAPI()
@@ -473,7 +573,6 @@ class MockReconnectTest(fixtures.TestBase):
         conn.execute(select([1]))
         assert not conn.invalidated
 
-
 class CursorErrTest(fixtures.TestBase):
     # this isn't really a "reconnect" test, it's more of
     # a generic "recovery".   maybe this test suite should have been
@@ -818,6 +917,52 @@ class RecycleTest(fixtures.TestBase):
             conn.close()
 
 
+class PrePingRealTest(fixtures.TestBase):
+    __backend__ = True
+
+    def test_pre_ping_db_is_restarted(self):
+        engine = engines.reconnecting_engine(
+            options={"pool_pre_ping": True}
+        )
+
+        conn = engine.connect()
+        eq_(conn.execute(select([1])).scalar(), 1)
+        stale_connection = conn.connection.connection
+        conn.close()
+
+        engine.test_shutdown()
+        engine.test_restart()
+
+        conn = engine.connect()
+        eq_(conn.execute(select([1])).scalar(), 1)
+        conn.close()
+
+        def exercise_stale_connection():
+            curs = stale_connection.cursor()
+            curs.execute("select 1")
+
+        assert_raises(
+            engine.dialect.dbapi.Error,
+            exercise_stale_connection
+        )
+
+    def test_pre_ping_db_stays_shutdown(self):
+        engine = engines.reconnecting_engine(
+            options={"pool_pre_ping": True}
+        )
+
+        conn = engine.connect()
+        eq_(conn.execute(select([1])).scalar(), 1)
+        conn.close()
+
+        engine.test_shutdown(stop=True)
+
+        assert_raises(
+            exc.DBAPIError,
+            engine.connect
+        )
+
+
 class InvalidateDuringResultTest(fixtures.TestBase):
     __backend__ = True