:ref:`change_1546`
+ .. change:: 3919
+ :tags: feature, engine
+ :tickets: 3919
+
+ Added native "pessimistic disconnection" handling to the :class:`.Pool`
+ object. The new parameter :paramref:`.Pool.pre_ping`, available from
+ the engine as :paramref:`.create_engine.pool_pre_ping`, applies an
+ efficient form of the "pre-ping" recipe featured in the pooling
+ documentation, which, upon each connection checkout, emits a simple
+ statement, typically "SELECT 1", to test the connection for liveness.
+ If the existing connection is no longer able to respond to commands,
+ the connection is transparently recycled, and all other connections
+ made prior to the current timestamp are invalidated.
+
+ .. seealso::
+
+ :ref:`pool_disconnects_pessimistic`
+
+ :ref:`change_3919`
+
.. change:: 3366
:tags: bug, orm
:tickets: 3366
:ticket:`1546`
+.. _change_3919:
+
+Pessimistic disconnection detection added to the connection pool
+----------------------------------------------------------------
+
+The connection pool documentation has long featured a recipe for using
+the :meth:`.ConnectionEvents.engine_connect` engine event to emit a simple
+statement on a checked-out connection to test it for liveness. The
+functionality of this recipe has now been added into the connection pool
+itself, when used in conjunction with an appropriate dialect. Using
+the new parameter :paramref:`.create_engine.pool_pre_ping`, each connection
+checked out will be tested for freshness before being returned to the caller::
+
+ engine = create_engine("mysql+pymysql://", pool_pre_ping=True)
+
+While the "pre-ping" approach adds a small amount of latency to the connection
+pool checkout, for a typical application that is transactionally-oriented
+(which includes most ORM applications), this overhead is minimal, and
+eliminates the problem of acquiring a stale connection that will raise
+an error, requiring that the application either abandon or retry the operation.
+
+The feature does **not** accommodate connections dropped within
+an ongoing transaction or SQL operation. If an application must recover
+from these as well, it would need to employ its own operation retry logic
+to anticipate these errors.
+
+
+.. seealso::
+
+ :ref:`pool_disconnects_pessimistic`
+
+
+:ticket:`3919`
+
.. _change_2694:
New "autoescape" option for startswith(), endswith()
when the database server has been restarted, and all previously established connections
are no longer functional. There are two approaches to this.
-Disconnect Handling - Optimistic
-^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-
-The most common approach is to let SQLAlchemy handle disconnects as they
-occur, at which point the pool is refreshed. This assumes the :class:`.Pool`
-is used in conjunction with a :class:`.Engine`. The :class:`.Engine` has
-logic which can detect disconnection events and refresh the pool automatically.
-
-When the :class:`.Connection` attempts to use a DBAPI connection, and an
-exception is raised that corresponds to a "disconnect" event, the connection
-is invalidated. The :class:`.Connection` then calls the :meth:`.Pool.recreate`
-method, effectively invalidating all connections not currently checked out so
-that they are replaced with new ones upon next checkout::
-
- from sqlalchemy import create_engine, exc
- e = create_engine(...)
- c = e.connect()
-
- try:
- # suppose the database has been restarted.
- c.execute("SELECT * FROM table")
- c.close()
- except exc.DBAPIError, e:
- # an exception is raised, Connection is invalidated.
- if e.connection_invalidated:
- print("Connection was invalidated!")
-
- # after the invalidate event, a new connection
- # starts with a new Pool
- c = e.connect()
- c.execute("SELECT * FROM table")
-
-The above example illustrates that no special intervention is needed, the pool
-continues normally after a disconnection event is detected. However, an exception is
-raised. In a typical web application using an ORM Session, the above condition would
-correspond to a single request failing with a 500 error, then the web application
-continuing normally beyond that. Hence the approach is "optimistic" in that frequent
-database restarts are not anticipated.
-
-.. _pool_setting_recycle:
-
-Setting Pool Recycle
-~~~~~~~~~~~~~~~~~~~~~~~
-
-An additional setting that can augment the "optimistic" approach is to set the
-pool recycle parameter. This parameter prevents the pool from using a particular
-connection that has passed a certain age, and is appropriate for database backends
-such as MySQL that automatically close connections that have been stale after a particular
-period of time::
-
- from sqlalchemy import create_engine
- e = create_engine("mysql://scott:tiger@localhost/test", pool_recycle=3600)
-
-Above, any DBAPI connection that has been open for more than one hour will be invalidated and replaced,
-upon next checkout. Note that the invalidation **only** occurs during checkout - not on
-any connections that are held in a checked out state. ``pool_recycle`` is a function
-of the :class:`.Pool` itself, independent of whether or not an :class:`.Engine` is in use.
-
.. _pool_disconnects_pessimistic:
Disconnect Handling - Pessimistic
-^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-At the expense of some extra SQL emitted for each connection checked out from
-the pool, a "ping" operation established by a checkout event handler can
-detect an invalid connection before it is used. In modern SQLAlchemy, the
-best way to do this is to make use of the
-:meth:`.ConnectionEvents.engine_connect` event, assuming the use of a
-:class:`.Engine` and not just a raw :class:`.Pool` object::
+The pessimistic approach refers to emitting a test statement on the SQL
+connection at the start of each connection pool checkout, to test
+that the database connection is still viable. Typically, this
+is a simple statement like "SELECT 1", but may also make use of some
+DBAPI-specific method to test the connection for liveness.
+
+The approach adds a small amount of overhead to the connection checkout
+process; however, it is otherwise the simplest and most reliable approach to
+completely eliminating database errors due to stale pooled connections. The
+calling application does not need to be concerned with organizing operations
+to be able to recover from stale connections checked out from the pool.
+
+It is critical to note that the pre-ping approach **does not accommodate for
+connections dropped in the middle of transactions or other SQL operations**.
+If the database becomes unavailable while a transaction is in progress, the
+transaction will be lost and the database error will be raised. While
+the :class:`.Connection` object will detect a "disconnect" situation and
+recycle the connection as well as invalidate the rest of the connection pool
+when this condition occurs,
+the individual operation where the exception was raised will be lost, and it
+is up to the application to either abandon the operation or retry the whole
+transaction.
+
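+A minimal sketch of such application-level retry logic is illustrated below;
+the ``run_transaction`` helper and the retry count are hypothetical and not
+part of SQLAlchemy itself, and a real application would typically narrow the
+set of errors it considers retriable::
+
+    from sqlalchemy import exc
+
+    def run_transaction(engine, work, retries=3):
+        """Invoke work(connection) in a transaction, retrying on disconnect."""
+        for attempt in range(retries):
+            try:
+                with engine.begin() as conn:
+                    return work(conn)
+            except exc.DBAPIError as err:
+                # only retry when the connection was invalidated due to a
+                # disconnect; any other database error is re-raised
+                if err.connection_invalidated and attempt < retries - 1:
+                    continue
+                raise
+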
+Pessimistic testing of connections upon checkout is achievable by
+using the :paramref:`.Pool.pre_ping` argument, available from
+:func:`.create_engine` as :paramref:`.create_engine.pool_pre_ping`::
+
+ engine = create_engine("mysql+pymysql://user:pw@host/db", pool_pre_ping=True)
+
+The "pre ping" feature will normally emit SQL equivalent to "SELECT 1" each time a
+connection is checked out from the pool; if an error is raised that is detected
+as a "disconnect" situation, the connection will be immediately recycled, and
+all other pooled connections established prior to the current time are invalidated, so
+that the next time they are checked out, they will also be recycled before use.
+
+If the database is still not available when "pre ping" runs, then the initial
+connect will fail and the connection error will be propagated
+normally. In the uncommon situation that the database is available for
+connections, but is not able to respond to a "ping", the "pre_ping" will try up
+to three times before giving up, propagating the database error last received.
+
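+As an illustration of the first case, the failure to connect surfaces on the
+checkout itself as a wrapped DBAPI error; a brief sketch, where the URL and
+error handling are illustrative only::
+
+    from sqlalchemy import create_engine, exc
+
+    engine = create_engine("mysql+pymysql://user:pw@host/db", pool_pre_ping=True)
+
+    try:
+        connection = engine.connect()
+    except exc.DBAPIError as err:
+        # the database could not be reached at checkout time; the original
+        # driver-level exception is available as err.orig
+        print("connect failed: %s" % err.orig)
+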
+.. note::
+
+ the "SELECT 1" emitted by "pre-ping" is invoked within the scope
+ of the connection pool / dialect, using a very short codepath for minimal
+ Python latency. As such, this statement is **not logged in the SQL
+ echo output**, and will not show up in SQLAlchemy's engine logging.
+
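+The pool does, however, log its pre-ping activity at debug level under the
+``sqlalchemy.pool`` logger; a sketch of enabling this output, assuming the
+``echo_pool="debug"`` setting of :func:`.create_engine`::
+
+    # "debug" raises pool logging to debug level, which includes the
+    # pre-ping messages emitted by the pool on each checkout
+    engine = create_engine(
+        "mysql+pymysql://user:pw@host/db",
+        pool_pre_ping=True,
+        echo_pool="debug",
+    )
+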
+.. versionadded:: 1.2 Added "pre-ping" capability to the :class:`.Pool`
+ class.
+
+Custom / Legacy Pessimistic Ping
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Before :paramref:`.create_engine.pool_pre_ping` was added, the "pre-ping"
+approach was historically performed manually using
+the :meth:`.ConnectionEvents.engine_connect` engine event.
+The most common recipe for this is below, for reference
+purposes in case an application is already using such a recipe, or special
+behaviors are needed::
from sqlalchemy import exc
from sqlalchemy import event
occurs and allowing the current :class:`.Connection` to re-validate onto
a new DBAPI connection.
-For the much less common case of where a :class:`.Pool` is being used without
-an :class:`.Engine`, an older approach may be used as below::
- from sqlalchemy import exc
- from sqlalchemy import event
- from sqlalchemy.pool import Pool
+Disconnect Handling - Optimistic
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+When pessimistic handling is not employed, as well as when the database is
+shut down and/or restarted in the middle of a connection's period of use within
+a transaction, the other approach to dealing with stale / closed connections is
+to let SQLAlchemy handle disconnects as they occur, at which point all
+connections in the pool are invalidated, meaning they are assumed to be
+stale and will be refreshed upon next checkout. This behavior assumes the
+:class:`.Pool` is used in conjunction with an :class:`.Engine`.
+The :class:`.Engine` has logic which can detect
+disconnection events and refresh the pool automatically.
+
+When the :class:`.Connection` attempts to use a DBAPI connection, and an
+exception is raised that corresponds to a "disconnect" event, the connection
+is invalidated. The :class:`.Connection` then calls the :meth:`.Pool.recreate`
+method, effectively invalidating all connections not currently checked out so
+that they are replaced with new ones upon next checkout. This flow is
+illustrated by the code example below::
+
+ from sqlalchemy import create_engine, exc
+ e = create_engine(...)
+ c = e.connect()
+
+ try:
+ # suppose the database has been restarted.
+ c.execute("SELECT * FROM table")
+ c.close()
+ except exc.DBAPIError as err:
+ # an exception is raised, Connection is invalidated.
+ if err.connection_invalidated:
+ print("Connection was invalidated!")
+
+ # after the invalidate event, a new connection
+ # starts with a new Pool
+ c = e.connect()
+ c.execute("SELECT * FROM table")
+
+The above example illustrates that no special intervention is needed to
+refresh the pool, which continues normally after a disconnection event is
+detected. However, one database exception is raised for each connection
+that was in use when the database became unavailable.
+In a typical web application using an ORM Session, the above condition would
+correspond to a single request failing with a 500 error, then the web application
+continuing normally beyond that. Hence the approach is "optimistic" in that frequent
+database restarts are not anticipated.
+
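+In such an application the per-request pattern typically provides this
+behavior already; the sketch below is illustrative only, and the
+``handle_request`` function and surrounding web framework are hypothetical::
+
+    from sqlalchemy.orm import sessionmaker
+
+    Session = sessionmaker(bind=engine)
+
+    def handle_request(request):
+        session = Session()
+        try:
+            # ... ORM work for this request ...
+            session.commit()
+        except Exception:
+            # a disconnect surfaces here as one failed request (e.g. a 500
+            # response); the pool has already been refreshed for subsequent
+            # requests
+            session.rollback()
+            raise
+        finally:
+            session.close()
+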
+.. _pool_setting_recycle:
+
+Setting Pool Recycle
+~~~~~~~~~~~~~~~~~~~~~~~
+
+An additional setting that can augment the "optimistic" approach is to set the
+pool recycle parameter. This parameter prevents the pool from using a particular
+connection that has passed a certain age, and is appropriate for database backends
+such as MySQL that automatically close connections that have been idle for a particular
+period of time::
+
+ from sqlalchemy import create_engine
+ e = create_engine("mysql://scott:tiger@localhost/test", pool_recycle=3600)
+
+Above, any DBAPI connection that has been open for more than one hour will be invalidated and replaced,
+upon next checkout. Note that the invalidation **only** occurs during checkout - not on
+any connections that are held in a checked out state. ``pool_recycle`` is a function
+of the :class:`.Pool` itself, independent of whether or not an :class:`.Engine` is in use.
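+
+``pool_recycle`` addresses a different situation than the "pre ping" feature
+described previously (age-based recycling, rather than testing each connection
+for liveness at checkout time), so the two settings can be combined if desired;
+a brief sketch::
+
+    from sqlalchemy import create_engine
+    e = create_engine(
+        "mysql://scott:tiger@localhost/test",
+        pool_recycle=3600,
+        pool_pre_ping=True,
+    )
+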
- @event.listens_for(Pool, "checkout")
- def ping_connection(dbapi_connection, connection_record, connection_proxy):
- cursor = dbapi_connection.cursor()
- try:
- cursor.execute("SELECT 1")
- except:
- # raise DisconnectionError - pool will try
- # connecting again up to three times before raising.
- raise exc.DisconnectionError()
- cursor.close()
-
-Above, the :class:`.Pool` object specifically catches
-:class:`~sqlalchemy.exc.DisconnectionError` and attempts to create a new DBAPI
-connection, up to three times, before giving up and then raising
-:class:`~sqlalchemy.exc.InvalidRequestError`, failing the connection. The
-disadvantage of the above approach is that we don't have any easy way of
-determining if the exception raised is in fact a "disconnect" situation, since
-there is no :class:`.Engine` or :class:`.Dialect` in play, and also the above
-error would occur individually for all stale connections still in the pool.
.. _pool_connection_invalidation:
"sqlalchemy.pool" logger. Defaults to a hexstring of the object's
id.
+ :param pool_pre_ping: boolean, if True will enable the connection pool
+ "pre-ping" feature that tests connections for liveness upon
+ each checkout.
+
+ .. versionadded:: 1.2
+
+ .. seealso::
+
+ :ref:`pool_disconnects_pessimistic`
+
:param pool_size=5: the number of connections to keep open
inside the connection pool. This used with
:class:`~sqlalchemy.pool.QueuePool` as
def do_close(self, dbapi_connection):
dbapi_connection.close()
+ @util.memoized_property
+ def _dialect_specific_select_one(self):
+ return str(expression.select([1]).compile(dialect=self))
+
+ def do_ping(self, dbapi_connection):
+ cursor = None
+ try:
+ cursor = dbapi_connection.cursor()
+ try:
+ cursor.execute(self._dialect_specific_select_one)
+ finally:
+ cursor.close()
+ except self.dbapi.Error as err:
+ if self.is_disconnect(err, dbapi_connection, cursor):
+ return False
+ else:
+ raise
+ else:
+ return True
+
def create_xid(self):
"""Create a random two-phase transaction ID.
'recycle': 'pool_recycle',
'events': 'pool_events',
'use_threadlocal': 'pool_threadlocal',
- 'reset_on_return': 'pool_reset_on_return'}
+ 'reset_on_return': 'pool_reset_on_return',
+ 'pre_ping': 'pool_pre_ping'}
for k in util.get_cls_kwargs(poolclass):
tk = translate.get(k, k)
if tk in kwargs:
regarding the connection attempt.
"""
+ invalidate_pool = False
+class InvalidatePoolError(DisconnectionError):
+ """Raised when the connection pool should invalidate all stale connections.
+
+ A subclass of :class:`.DisconnectionError` that indicates that the
+ disconnect situation encountered on the connection probably means the
+ entire pool should be invalidated, as the database has been restarted.
+
+ This exception is otherwise handled the same way as
+ :class:`.DisconnectionError`, allowing three attempts to reconnect
+ before giving up.
+
+ .. versionadded:: 1.2
+
+ """
+ invalidate_pool = True
+
class TimeoutError(SQLAlchemyError):
"""Raised when a connection pool times out on getting a connection."""
def do_close(self, dbapi_connection):
dbapi_connection.close()
+ def do_ping(self, dbapi_connection):
+ raise NotImplementedError(
+ "The ping feature requires that a dialect is "
+ "passed to the connection pool.")
+
class Pool(log.Identified):
listeners=None,
events=None,
dialect=None,
+ pre_ping=False,
_dispatch=None):
"""
Construct a Pool.
.. versionadded:: 1.1 - ``dialect`` is now a public parameter
to the :class:`.Pool`.
+ :param pre_ping: if True, the pool will emit a "ping" (typically
+ "SELECT 1", though this is dialect-specific) on the connection
+ upon checkout, to test if the connection is alive or not. If not,
+ the connection is transparently re-connected and upon success, all
+ other pooled connections established prior to that timestamp are
+ invalidated. Requires that a dialect is passed as well to
+ interpret the disconnection error.
+
+ .. versionadded:: 1.2
"""
if logging_name:
self.logging_name = self._orig_logging_name = logging_name
self._recycle = recycle
self._invalidate_time = 0
self._use_threadlocal = use_threadlocal
+ self._pre_ping = pre_ping
if reset_on_return in ('rollback', True, reset_rollback):
self._reset_on_return = reset_rollback
elif reset_on_return in ('none', None, False, reset_none):
return _ConnectionRecord(self)
- def _invalidate(self, connection, exception=None):
+ def _invalidate(self, connection, exception=None, _checkin=True):
"""Mark all connections established within the generation
of the given connection as invalidated.
Connections with a start time prior to this pool's invalidation
time will be recycled upon next checkout.
"""
-
rec = getattr(connection, "_connection_record", None)
if not rec or self._invalidate_time < rec.starttime:
self._invalidate_time = time.time()
- if getattr(connection, 'is_valid', False):
+ if _checkin and getattr(connection, 'is_valid', False):
connection.invalidate(exception)
def recreate(self):
raise exc.InvalidRequestError("This connection is closed")
fairy._counter += 1
- if not pool.dispatch.checkout or fairy._counter != 1:
+ if (not pool.dispatch.checkout and not pool._pre_ping) or \
+ fairy._counter != 1:
return fairy
- # Pool listeners can trigger a reconnection on checkout
+ # Pool listeners can trigger a reconnection on checkout, as well
+ # as the pre-pinger.
+ # there are three attempts made here, but note that if the database
+ # is not accessible from a connection standpoint, those attempts won't
+ # proceed here, as the raw re-connect itself will fail first.
attempts = 2
while attempts > 0:
try:
+ if pool._pre_ping:
+ if fairy._echo:
+ pool.logger.debug(
+ "Pool pre-ping on connection %s",
+ fairy.connection)
+
+ result = pool._dialect.do_ping(fairy.connection)
+ if not result:
+ if fairy._echo:
+ pool.logger.debug(
+ "Pool pre-ping on connection %s failed, "
+ "will invalidate pool", fairy.connection)
+ raise exc.InvalidatePoolError()
+
pool.dispatch.checkout(fairy.connection,
fairy._connection_record,
fairy)
return fairy
except exc.DisconnectionError as e:
- pool.logger.info(
- "Disconnection detected on checkout: %s", e)
- fairy._connection_record.invalidate(e)
+ if e.invalidate_pool:
+ pool.logger.info(
+ "Disconnection detected on checkout, "
+ "invalidating all pooled connections prior to "
+ "current timestamp: %r", e)
+ fairy._connection_record.invalidate(e)
+ pool._invalidate(fairy, e, _checkin=False)
+ else:
+ pool.logger.info(
+ "Disconnection detected on checkout, "
+ "invalidating individual connection: %r", e)
+ fairy._connection_record.invalidate(e)
try:
fairy.connection = \
fairy._connection_record.get_connection()
def __init__(self, dbapi):
self.dbapi = dbapi
self.connections = []
+ self.is_stopped = False
def __getattr__(self, key):
return getattr(self.dbapi, key)
def connect(self, *args, **kwargs):
+
conn = self.dbapi.connect(*args, **kwargs)
- self.connections.append(conn)
- return conn
+ if self.is_stopped:
+ self._safe(conn.close)
+ curs = conn.cursor() # should fail on Oracle etc.
+ # should fail for everything that didn't fail
+ # above, connection is closed
+ curs.execute("select 1")
+ assert False, "simulated connect failure didn't work"
+ else:
+ self.connections.append(conn)
+ return conn
def _safe(self, fn):
try:
"ReconnectFixture couldn't "
"close connection: %s" % e)
- def shutdown(self):
+ def shutdown(self, stop=False):
# TODO: this doesn't cover all cases
# as nicely as we'd like, namely MySQLdb.
# would need to implement R. Brewer's
# proxy server idea to get better
# coverage.
+ self.is_stopped = stop
for c in list(self.connections):
self._safe(c.close)
self.connections = []
+ def restart(self):
+ self.is_stopped = False
+
def reconnecting_engine(url=None, options=None):
url = url or config.db.url
def dispose():
engine.dialect.dbapi.shutdown()
+ engine.dialect.dbapi.is_stopped = False
_dispose()
engine.test_shutdown = engine.dialect.dbapi.shutdown
+ engine.test_restart = engine.dialect.dbapi.restart
engine.dispose = dispose
return engine
select, MetaData, Integer, String, create_engine, pool, exc, util)
from sqlalchemy.testing.schema import Table, Column
import sqlalchemy as tsa
+from sqlalchemy.engine import url
from sqlalchemy import testing
from sqlalchemy.testing import mock
from sqlalchemy.testing import engines
def MockDBAPI():
connections = []
+ stopped = [False]
def connect():
while True:
+ if stopped[0]:
+ raise MockDisconnect("database is stopped")
conn = mock_connection()
connections.append(conn)
yield conn
- def shutdown(explode='execute'):
+ def shutdown(explode='execute', stop=False):
+ stopped[0] = stop
for c in connections:
c.explode = explode
+ def restart():
+ stopped[0] = False
+ connections[:] = []
+
def dispose():
+ stopped[0] = False
for c in connections:
c.explode = None
connections[:] = []
connect=Mock(side_effect=connect()),
shutdown=Mock(side_effect=shutdown),
dispose=Mock(side_effect=dispose),
+ restart=Mock(side_effect=restart),
paramstyle='named',
connections=connections,
Error=MockError)
+class PrePingMockTest(fixtures.TestBase):
+ def setup(self):
+ self.dbapi = MockDBAPI()
+
+ def _pool_fixture(self, pre_ping):
+ dialect = url.make_url(
+ 'postgresql://foo:bar@localhost/test').get_dialect()()
+ dialect.dbapi = self.dbapi
+ _pool = pool.QueuePool(
+ creator=lambda: self.dbapi.connect('foo.db'), pre_ping=pre_ping,
+ dialect=dialect)
+
+ dialect.is_disconnect = \
+ lambda e, conn, cursor: isinstance(e, MockDisconnect)
+ return _pool
+
+ def teardown(self):
+ self.dbapi.dispose()
+
+ def test_connect_across_restart(self):
+ pool = self._pool_fixture(pre_ping=True)
+
+ conn = pool.connect()
+ stale_connection = conn.connection
+ conn.close()
+
+ self.dbapi.shutdown("execute")
+ self.dbapi.restart()
+
+ conn = pool.connect()
+ cursor = conn.cursor()
+ cursor.execute("hi")
+
+ stale_cursor = stale_connection.cursor()
+ assert_raises(
+ MockDisconnect,
+ stale_cursor.execute, "hi"
+ )
+
+ def test_raise_db_is_stopped(self):
+ pool = self._pool_fixture(pre_ping=True)
+
+ conn = pool.connect()
+ conn.close()
+
+ self.dbapi.shutdown("execute", stop=True)
+
+ assert_raises_message(
+ MockDisconnect,
+ "database is stopped",
+ pool.connect
+ )
+
+ def test_waits_til_exec_wo_ping_db_is_stopped(self):
+ pool = self._pool_fixture(pre_ping=False)
+
+ conn = pool.connect()
+ conn.close()
+
+ self.dbapi.shutdown("execute", stop=True)
+
+ conn = pool.connect()
+
+ cursor = conn.cursor()
+ assert_raises_message(
+ MockDisconnect,
+ "Lost the DB connection on execute",
+ cursor.execute, "foo"
+ )
+
+ def test_waits_til_exec_wo_ping_db_is_restarted(self):
+ pool = self._pool_fixture(pre_ping=False)
+
+ conn = pool.connect()
+ conn.close()
+
+ self.dbapi.shutdown("execute", stop=True)
+ self.dbapi.restart()
+
+ conn = pool.connect()
+
+ cursor = conn.cursor()
+ assert_raises_message(
+ MockDisconnect,
+ "Lost the DB connection on execute",
+ cursor.execute, "foo"
+ )
+
+
class MockReconnectTest(fixtures.TestBase):
def setup(self):
self.dbapi = MockDBAPI()
conn.execute(select([1]))
assert not conn.invalidated
-
class CursorErrTest(fixtures.TestBase):
# this isn't really a "reconnect" test, it's more of
# a generic "recovery". maybe this test suite should have been
conn.close()
+class PrePingRealTest(fixtures.TestBase):
+ __backend__ = True
+
+ def test_pre_ping_db_is_restarted(self):
+ engine = engines.reconnecting_engine(
+ options={"pool_pre_ping": True}
+ )
+
+ conn = engine.connect()
+ eq_(conn.execute(select([1])).scalar(), 1)
+ stale_connection = conn.connection.connection
+ conn.close()
+
+ engine.test_shutdown()
+ engine.test_restart()
+
+ conn = engine.connect()
+ eq_(conn.execute(select([1])).scalar(), 1)
+ conn.close()
+
+ def exercise_stale_connection():
+ curs = stale_connection.cursor()
+ curs.execute("select 1")
+
+ assert_raises(
+ engine.dialect.dbapi.Error,
+ exercise_stale_connection
+ )
+
+ def test_pre_ping_db_stays_shutdown(self):
+ engine = engines.reconnecting_engine(
+ options={"pool_pre_ping": True}
+ )
+
+ conn = engine.connect()
+ eq_(conn.execute(select([1])).scalar(), 1)
+ conn.close()
+
+ engine.test_shutdown(stop=True)
+
+ assert_raises(
+ exc.DBAPIError,
+ engine.connect
+ )
+
+
class InvalidateDuringResultTest(fixtures.TestBase):
__backend__ = True