illustrating the original failure cause, while still throwing the
immediate error which is the failure of the ROLLBACK.
+.. _faq_execute_retry:
+
+How Do I "Retry" a Statement Execution Automatically?
+-------------------------------------------------------
+
+The documentation section :ref:`pool_disconnects` discusses the strategies
+available for pooled connections that have been disconnected since the last
+time a particular connection was checked out. The most modern feature
+in this regard is the :paramref:`_sa.create_engine.pool_pre_ping` parameter,
+which causes a "ping" to be emitted on a database connection when it's
+retrieved from the pool, reconnecting if the current connection has been
+disconnected.
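+
+For example, pre-ping may be enabled when the :class:`_engine.Engine` is first
+created; the connection URL below is illustrative only::
+
+    from sqlalchemy import create_engine
+
+    # with pool_pre_ping=True, a lightweight "ping" is emitted each time a
+    # connection is checked out from the pool; if the ping fails, the stale
+    # connection is discarded and replaced before it is handed to the caller
+    engine = create_engine(
+        "mysql://scott:tiger@localhost/test",
+        pool_pre_ping=True,
+    )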
+
+It's important to note that this "ping" is only emitted **before** the
+connection is actually used for an operation. Once the connection is
+delivered to the caller, per the Python :term:`DBAPI` specification it is now
+subject to an **autobegin** operation, which means it will automatically BEGIN
+a new transaction when it is first used that remains in effect for subsequent
+statements, until the DBAPI-level ``connection.commit()`` or
+``connection.rollback()`` method is invoked.
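+
+A rough sketch of this behavior at the DBAPI level, assuming a generic driver
+module named ``dbapi`` and an illustrative table, looks like::
+
+    dbapi_connection = dbapi.connect(host="localhost", user="scott")
+
+    cursor = dbapi_connection.cursor()
+
+    # the first statement implicitly begins a new transaction
+    cursor.execute("INSERT INTO some_table (x) VALUES (1)")
+
+    # subsequent statements continue within that same transaction
+    cursor.execute("INSERT INTO some_table (x) VALUES (2)")
+
+    # the transaction remains in effect until commit() or rollback() is called
+    dbapi_connection.commit()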
+
+As discussed at :ref:`autocommit`, there is a library level "autocommit"
+feature which is deprecated in 1.4 that causes :term:`DML` and :term:`DDL`
+executions to commit automatically after individual statements are executed;
+however, outside of this deprecated case, modern use of SQLAlchemy works with
+this transaction in all cases and does not commit any data unless explicitly
+told to commit.
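+
+For example, given an :class:`_engine.Engine` named ``engine``, nothing is
+persisted until the transaction is explicitly committed, such as when an
+:meth:`_engine.Engine.begin` block completes (the statement below is
+illustrative only)::
+
+    from sqlalchemy import text
+
+    # the block begins a transaction, commits it if the block completes
+    # without an exception, and rolls it back otherwise
+    with engine.begin() as conn:
+        conn.execute(text("UPDATE some_table SET x = x + 1"))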
+
+The situation is similar at the ORM level, in that the ORM
+:class:`_orm.Session` object also presents a legacy "autocommit" mode of
+operation; however even if this legacy mode is used, the
+:class:`_orm.Session` still makes use of transactions internally,
+particularly within the :meth:`_orm.Session.flush` process.
+
+The implication that this has for the notion of "retrying" a statement is that
+in the default case, when a connection is lost, **the entire transaction is
+lost**. There is no useful way that the database can "reconnect and retry" and
+continue where it left off, since data is already lost. For this reason,
+SQLAlchemy does not have a transparent "reconnection" feature that works
+mid-transaction, for the case when the database connection has disconnected
+while being used. The canonical approach to dealing with mid-operation
+disconnects is to **retry the entire operation from the start of the
+transaction**, often by using a Python "retry" decorator, or to otherwise
+architect the application in such a way that it is resilient against
+transactions that are dropped.
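+
+A rough sketch of such a retry decorator follows; the decorator itself, its
+retry parameters, and the statement being run are illustrative only, and a
+real application would adapt the error handling to its own needs::
+
+    import time
+
+    from sqlalchemy import exc
+    from sqlalchemy import text
+
+
+    def retry_transaction(num_retries=3, retry_interval=2):
+        """Re-run a whole transactional function if it fails due to a
+        disconnect."""
+
+        def decorate(fn):
+            def go(*arg, **kw):
+                for retry in range(num_retries + 1):
+                    try:
+                        return fn(*arg, **kw)
+                    except exc.DBAPIError as err:
+                        # retry only for disconnection-style errors, and
+                        # only while retries remain
+                        if (
+                            not err.connection_invalidated
+                            or retry == num_retries
+                        ):
+                            raise
+                        time.sleep(retry_interval)
+
+            return go
+
+        return decorate
+
+
+    @retry_transaction()
+    def do_the_thing(engine):
+        # the entire transaction is re-run from the top on each retry
+        with engine.begin() as conn:
+            conn.execute(text("UPDATE some_table SET x = x + 1"))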
+
+There is also the notion of extensions that can keep track of all of the
+statements that have proceeded within a transaction and then replay them all in
+a new transaction in order to approximate a "retry" operation. SQLAlchemy's
+:ref:`event system <core_event_toplevel>` does allow such a system to be
+constructed, however this approach is also not generally useful as there is
+no way to guarantee that those
+:term:`DML` statements will be working against the same state, as once a
+transaction has ended the state of the database in a new transaction may be
+totally different. Architecting "retry" explicitly into the application
+at the points at which transactional operations begin and commit remains
+the better approach since the application-level transactional methods are
+the ones that know best how to re-run their steps.
+
+Otherwise, if SQLAlchemy were to provide a feature that transparently and
+silently "reconnected" a connection mid-transaction, the effect would be that
+data is silently lost. By trying to hide the problem, SQLAlchemy would make
+the situation much worse.
+
+However, if we are **not** using transactions, then there are more options
+available, as the next section describes.
+
+.. _faq_execute_retry_autocommit:
+
+Using DBAPI Autocommit Allows for a Readonly Version of Transparent Reconnect
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+The rationale for not having a transparent reconnection mechanism, as stated
+in the preceding section, rests upon the assumption that the application is in
+fact using DBAPI-level transactions. As most DBAPIs now offer :ref:`native
+"autocommit" settings <dbapi_autocommit>`, we can make use of these features to
+provide a limited form of transparent reconnect for **read only,
+autocommit only operations**. A transparent statement retry may be applied to
+the ``cursor.execute()`` method of the DBAPI; however, it is still not safe to
+apply it to the ``cursor.executemany()`` method, as the statement may have
+already consumed an arbitrary portion of the arguments given.
+
+.. warning:: The following recipe should **not** be used for operations that
+ write data. Users should carefully read and understand how the recipe
+ works and test failure modes very carefully against the specifically
+ targeted DBAPI driver before making production use of this recipe.
+ The retry mechanism does not guarantee prevention of disconnection errors
+ in all cases.
+
+A simple retry mechanism may be applied to the DBAPI level ``cursor.execute()``
+method by making use of the :meth:`_events.DialectEvents.do_execute` and
+:meth:`_events.DialectEvents.do_execute_no_params` hooks, which will be able to
+intercept disconnections during statement executions. It will **not**
+intercept connection failures during result set fetch operations, for those
+DBAPIs that don't fully buffer result sets. The recipe requires that the
+database support DBAPI-level autocommit and is **not guaranteed** to work for
+any particular backend. A single function ``reconnecting_engine()`` is
+presented below which applies the event hooks to a given
+:class:`_engine.Engine` object, returning a version of that engine which
+always runs with DBAPI-level autocommit. A connection will transparently
+reconnect for single-parameter and no-parameter statement executions::
+
+ import time
+
+ from sqlalchemy import event
+
+
+ def reconnecting_engine(engine, num_retries, retry_interval):
+ def _run_with_retries(fn, context, cursor, statement, *arg, **kw):
+ for retry in range(num_retries + 1):
+ try:
+ fn(cursor, statement, context=context, *arg)
+ except engine.dialect.dbapi.Error as raw_dbapi_err:
+ connection = context.root_connection
+ if engine.dialect.is_disconnect(
+ raw_dbapi_err, connection, cursor
+ ):
+                        if retry == num_retries:
+                            raise
+ engine.logger.error(
+ "disconnection error, retrying operation",
+ exc_info=True,
+ )
+ connection.invalidate()
+
+ # use SQLAlchemy 2.0 API if available
+ if hasattr(connection, "rollback"):
+ connection.rollback()
+ else:
+ trans = connection.get_transaction()
+ if trans:
+ trans.rollback()
+
+ time.sleep(retry_interval)
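+                        # the Connection was invalidated above; its next use
+                        # will transparently procure a brand new DBAPI
+                        # connection, from which we obtain a fresh cursor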
+ context.cursor = cursor = connection.connection.cursor()
+ else:
+ raise
+ else:
+ return True
+
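+        # a new Engine that runs all statements with DBAPI-level autocommit;
+        # with no transaction in progress, a dropped connection loses only the
+        # statement currently being retried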
+ e = engine.execution_options(isolation_level="AUTOCOMMIT")
+
+ @event.listens_for(e, "do_execute_no_params")
+ def do_execute_no_params(cursor, statement, context):
+ return _run_with_retries(
+ context.dialect.do_execute_no_params, context, cursor, statement
+ )
+
+ @event.listens_for(e, "do_execute")
+ def do_execute(cursor, statement, parameters, context):
+ return _run_with_retries(
+ context.dialect.do_execute, context, cursor, statement, parameters
+ )
+
+ return e
+
+Given the above recipe, a reconnection mid-transaction may be demonstrated
+using the following proof of concept script. Once run, it will emit a
+``SELECT 1`` statement to the database every five seconds::
+
+ from sqlalchemy import create_engine
+ from sqlalchemy import select
+
+    if __name__ == "__main__":
+
+ def do_a_thing(engine):
+ with engine.begin() as conn:
+ while True:
+ print("ping: %s" % conn.execute(select([1])).scalar())
+ time.sleep(5)
+
+ e = reconnecting_engine(
+ create_engine(
+ "mysql://scott:tiger@localhost/test", echo_pool=True
+ ),
+ num_retries=5,
+ retry_interval=2,
+ )
+
+ do_a_thing(e)
+
+Restart the database while the script runs to demonstrate the transparent
+reconnect operation::
+
+ $ python reconnect_test.py
+ ping: 1
+ ping: 1
+ disconnection error, retrying operation
+ Traceback (most recent call last):
+ ...
+ MySQLdb._exceptions.OperationalError: (2006, 'MySQL server has gone away')
+ 2020-10-19 16:16:22,624 INFO sqlalchemy.pool.impl.QueuePool Invalidate connection <_mysql.connection open to 'localhost' at 0xf59240>
+ ping: 1
+ ping: 1
+ ...
+
+.. versionadded:: 1.4 The above recipe makes use of 1.4-specific behaviors and
+   will not work as given on previous SQLAlchemy versions.
+
+The above recipe is tested for SQLAlchemy 1.4.
+
+
Why does SQLAlchemy issue so many ROLLBACKs?
--------------------------------------------
assert conn.invalidated
finally:
conn.invalidate()
+
+
+class ReconnectRecipeTest(fixtures.TestBase):
+ """Test for the reconnect recipe given at doc/build/faq/connections.rst.
+
+ Make sure the above document is updated if changes are made here.
+
+ """
+
+    # this recipe works on PostgreSQL also, but only if the connection
+    # is cut off from the server side; otherwise the connection.cursor()
+    # method rightly fails because we explicitly closed the connection.
+    # Since we don't have a fixture that can do this, we currently rely
+    # on the MySQL drivers that allow us to call cursor() even after the
+    # connection has been closed.  In order to get a real "cut the server
+    # off" kind of fixture we'd need to do something in provisioning that
+    # seeks out the TCP connection at the OS level and kills it.
+ __only_on__ = ("mysql+mysqldb", "mysql+pymysql")
+
+ future = False
+
+ def make_engine(self, engine):
+ num_retries = 3
+ retry_interval = 0.5
+
+ def _run_with_retries(fn, context, cursor, statement, *arg, **kw):
+ for retry in range(num_retries + 1):
+ try:
+ fn(cursor, statement, context=context, *arg)
+ except engine.dialect.dbapi.Error as raw_dbapi_err:
+ connection = context.root_connection
+ if engine.dialect.is_disconnect(
+ raw_dbapi_err, connection, cursor
+ ):
+                        if retry == num_retries:
+                            raise
+ engine.logger.error(
+ "disconnection error, retrying operation",
+ exc_info=True,
+ )
+ connection.invalidate()
+
+ if self.future:
+ connection.rollback()
+ else:
+ trans = connection.get_transaction()
+ if trans:
+ trans.rollback()
+
+ time.sleep(retry_interval)
+ context.cursor = (
+ cursor
+ ) = connection.connection.cursor()
+ else:
+ raise
+ else:
+ return True
+
+ e = engine.execution_options(isolation_level="AUTOCOMMIT")
+
+ @event.listens_for(e, "do_execute_no_params")
+ def do_execute_no_params(cursor, statement, context):
+ return _run_with_retries(
+ context.dialect.do_execute_no_params,
+ context,
+ cursor,
+ statement,
+ )
+
+ @event.listens_for(e, "do_execute")
+ def do_execute(cursor, statement, parameters, context):
+ return _run_with_retries(
+ context.dialect.do_execute,
+ context,
+ cursor,
+ statement,
+ parameters,
+ )
+
+ return e
+
+ __backend__ = True
+
+ def setup(self):
+ self.engine = engines.reconnecting_engine(
+ options=dict(future=self.future)
+ )
+ self.meta = MetaData()
+ self.table = Table(
+ "sometable",
+ self.meta,
+ Column("id", Integer, primary_key=True),
+ Column("name", String(50)),
+ )
+ self.meta.create_all(self.engine)
+
+ def teardown(self):
+ self.meta.drop_all(self.engine)
+ self.engine.dispose()
+
+ def test_restart_on_execute_no_txn(self):
+ engine = self.make_engine(self.engine)
+
+ with engine.connect() as conn:
+ eq_(conn.execute(select(1)).scalar(), 1)
+
+ self.engine.test_shutdown()
+ self.engine.test_restart()
+
+ eq_(conn.execute(select(1)).scalar(), 1)
+
+ def test_restart_on_execute_txn(self):
+ engine = self.make_engine(self.engine)
+
+ with engine.begin() as conn:
+ eq_(conn.execute(select(1)).scalar(), 1)
+
+ self.engine.test_shutdown()
+ self.engine.test_restart()
+
+ eq_(conn.execute(select(1)).scalar(), 1)
+
+ def test_autocommits_txn(self):
+ engine = self.make_engine(self.engine)
+
+ with engine.begin() as conn:
+ conn.execute(
+ self.table.insert(),
+ [
+ {"id": 1, "name": "some name 1"},
+ {"id": 2, "name": "some name 2"},
+ {"id": 3, "name": "some name 3"},
+ ],
+ )
+
+ self.engine.test_shutdown()
+ self.engine.test_restart()
+
+ eq_(
+ conn.execute(
+ select(self.table).order_by(self.table.c.id)
+ ).fetchall(),
+ [(1, "some name 1"), (2, "some name 2"), (3, "some name 3")],
+ )
+
+ def test_fail_on_executemany_txn(self):
+ engine = self.make_engine(self.engine)
+
+ with engine.begin() as conn:
+ conn.execute(
+ self.table.insert(),
+ [
+ {"id": 1, "name": "some name 1"},
+ {"id": 2, "name": "some name 2"},
+ {"id": 3, "name": "some name 3"},
+ ],
+ )
+
+ self.engine.test_shutdown()
+ self.engine.test_restart()
+
+ assert_raises(
+ exc.DBAPIError,
+ conn.execute,
+ self.table.insert(),
+ [
+ {"id": 4, "name": "some name 4"},
+ {"id": 5, "name": "some name 5"},
+ {"id": 6, "name": "some name 6"},
+ ],
+ )
+ if self.future:
+ conn.rollback()
+ else:
+ trans = conn.get_transaction()
+ trans.rollback()
+
+
+class FutureReconnectRecipeTest(ReconnectRecipeTest):
+ future = True