From: Mike Bayer Date: Mon, 12 Dec 2022 18:47:27 +0000 (-0500) Subject: catch all BaseException in pool and revert failed checkouts X-Git-Tag: rel_1_4_46~20^2 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=225eff40f28e943c80dbe8cd2fcd7322e1bdb816;p=thirdparty%2Fsqlalchemy%2Fsqlalchemy.git catch all BaseException in pool and revert failed checkouts Fixed a long-standing race condition in the connection pool which could occur under eventlet/gevent monkeypatching schemes in conjunction with the use of eventlet/gevent ``Timeout`` conditions, where a connection pool checkout that's interrupted due to the timeout would fail to clean up the failed state, causing the underlying connection record and sometimes the database connection itself to "leak", leaving the pool in an invalid state with unreachable entries. This issue was first identified and fixed in SQLAlchemy 1.2 for :ticket:`4225`, however the failure modes detected in that fix failed to accommodate for ``BaseException``, rather than ``Exception``, which prevented eventlet/gevent ``Timeout`` from being caught. In addition, a block within initial pool connect has also been identified and hardened with a ``BaseException`` -> "clean failed connect" block to accommodate for the same condition in this location. Big thanks to Github user @niklaus for their tenacious efforts in identifying and describing this intricate issue. Fixes: #8974 Change-Id: I95a0e1f080d0cee6f1a66977432a586fdf87f686 (cherry picked from commit a71917204dcf12a93d957a0fa29c9df97d0411ee) --- diff --git a/doc/build/changelog/unreleased_14/8974.rst b/doc/build/changelog/unreleased_14/8974.rst new file mode 100644 index 0000000000..6400c95b45 --- /dev/null +++ b/doc/build/changelog/unreleased_14/8974.rst @@ -0,0 +1,19 @@ +.. change:: + :tags: bug, pool + :tickets: 8974 + + Fixed a long-standing race condition in the connection pool which could + occur under eventlet/gevent monkeypatching schemes in conjunction with the + use of eventlet/gevent ``Timeout`` conditions, where a connection pool + checkout that's interrupted due to the timeout would fail to clean up the + failed state, causing the underlying connection record and sometimes the + database connection itself to "leak", leaving the pool in an invalid state + with unreachable entries. This issue was first identified and fixed in + SQLAlchemy 1.2 for :ticket:`4225`, however the failure modes detected in + that fix failed to accommodate for ``BaseException``, rather than + ``Exception``, which prevented eventlet/gevent ``Timeout`` from being + caught. In addition, a block within initial pool connect has also been + identified and hardened with a ``BaseException`` -> "clean failed connect" + block to accommodate for the same condition in this location. + Big thanks to Github user @niklaus for their tenacious efforts in + identifying and describing this intricate issue. diff --git a/lib/sqlalchemy/pool/base.py b/lib/sqlalchemy/pool/base.py index a8234c5309..dbffd54b85 100644 --- a/lib/sqlalchemy/pool/base.py +++ b/lib/sqlalchemy/pool/base.py @@ -260,10 +260,12 @@ class Pool(log.Identified): self._dialect.do_terminate(connection) else: self._dialect.do_close(connection) - except Exception: + except BaseException as e: self.logger.error( "Exception closing connection %r", connection, exc_info=True ) + if not isinstance(e, Exception): + raise def _create_connection(self): """Called by subclasses to create a new ConnectionRecord.""" @@ -491,9 +493,13 @@ class _ConnectionRecord(object): rec = pool._do_get() try: dbapi_connection = rec.get_connection() - except Exception as err: + except BaseException as err: with util.safe_reraise(): rec._checkin_failed(err, _fairy_was_created=False) + + # never called, this is for code linters + raise + echo = pool._should_log_debug() fairy = _ConnectionFairy(dbapi_connection, rec, echo) @@ -680,7 +686,7 @@ class _ConnectionRecord(object): self.dbapi_connection = connection = pool._invoke_creator(self) pool.logger.debug("Created new connection %r", connection) self.fresh = True - except Exception as e: + except BaseException as e: with util.safe_reraise(): pool.logger.debug("Error on connect(): %s", e) else: @@ -907,6 +913,7 @@ class _ConnectionFairy(object): # is not accessible from a connection standpoint, those won't proceed # here. attempts = 2 + while attempts > 0: connection_is_fresh = fairy._connection_record.fresh fairy._connection_record.fresh = False @@ -959,7 +966,7 @@ class _ConnectionFairy(object): fairy.dbapi_connection = ( fairy._connection_record.get_connection() ) - except Exception as err: + except BaseException as err: with util.safe_reraise(): fairy._connection_record._checkin_failed( err, @@ -974,6 +981,21 @@ class _ConnectionFairy(object): del fairy attempts -= 1 + except BaseException as be_outer: + with util.safe_reraise(): + rec = fairy._connection_record + if rec is not None: + rec._checkin_failed( + be_outer, + _fairy_was_created=True, + ) + + # prevent _ConnectionFairy from being carried + # in the stack trace, see above + del fairy + + # never called, this is for code linters + raise pool.logger.info("Reconnection attempts exhausted on checkout") fairy.invalidate() diff --git a/test/engine/test_pool.py b/test/engine/test_pool.py index 7a3b8ed58d..2e11002efc 100644 --- a/test/engine/test_pool.py +++ b/test/engine/test_pool.py @@ -858,18 +858,34 @@ class PoolEventsTest(PoolTestBase): p2.connect() eq_(canary, ["listen_one", "listen_two", "listen_one", "listen_three"]) - def test_connect_event_fails_invalidates(self): + @testing.variation("exc_type", ["plain", "base_exception"]) + def test_connect_event_fails_invalidates(self, exc_type): fail = False + if exc_type.plain: + + class RegularThing(Exception): + pass + + exc_cls = RegularThing + elif exc_type.base_exception: + + class TimeoutThing(BaseException): + pass + + exc_cls = TimeoutThing + else: + exc_type.fail() + def listen_one(conn, rec): if fail: - raise Exception("it failed") + raise exc_cls("it failed") def listen_two(conn, rec): rec.info["important_flag"] = True p1 = pool.QueuePool( - creator=MockDBAPI().connect, pool_size=1, max_overflow=0 + creator=MockDBAPI().connect, pool_size=1, max_overflow=0, timeout=5 ) event.listen(p1, "connect", listen_one) event.listen(p1, "connect", listen_two) @@ -880,7 +896,9 @@ class PoolEventsTest(PoolTestBase): conn.close() fail = True - assert_raises(Exception, p1.connect) + + # if the failed checkin is not reverted, the pool is blocked + assert_raises(exc_cls, p1.connect) fail = False @@ -1506,7 +1524,7 @@ class QueuePoolTest(PoolTestBase): return patch.object(pool, "_finalize_fairy", assert_no_wr_callback) - def _assert_cleanup_on_pooled_reconnect(self, dbapi, p): + def _assert_cleanup_on_pooled_reconnect(self, dbapi, p, exc_cls=Exception): # p is QueuePool with size=1, max_overflow=2, # and one connection in the pool that will need to # reconnect when next used (either due to recycle or invalidate) @@ -1515,7 +1533,7 @@ class QueuePoolTest(PoolTestBase): eq_(p.checkedout(), 0) eq_(p._overflow, 0) dbapi.shutdown(True) - assert_raises_context_ok(Exception, p.connect) + assert_raises_context_ok(exc_cls, p.connect) eq_(p._overflow, 0) eq_(p.checkedout(), 0) # and not 1 @@ -1633,18 +1651,38 @@ class QueuePoolTest(PoolTestBase): c = p.connect() c.close() - def test_error_on_pooled_reconnect_cleanup_wcheckout_event(self): + @testing.variation("exc_type", ["plain", "base_exception"]) + def test_error_on_pooled_reconnect_cleanup_wcheckout_event(self, exc_type): dbapi, p = self._queuepool_dbapi_fixture(pool_size=1, max_overflow=2) c1 = p.connect() c1.close() - @event.listens_for(p, "checkout") - def handle_checkout_event(dbapi_con, con_record, con_proxy): - if dbapi.is_shutdown: - raise tsa.exc.DisconnectionError() + if exc_type.plain: - self._assert_cleanup_on_pooled_reconnect(dbapi, p) + @event.listens_for(p, "checkout") + def handle_checkout_event(dbapi_con, con_record, con_proxy): + if dbapi.is_shutdown: + raise tsa.exc.DisconnectionError() + + elif exc_type.base_exception: + + class TimeoutThing(BaseException): + pass + + @event.listens_for(p, "checkout") + def handle_checkout_event(dbapi_con, con_record, con_proxy): + if dbapi.is_shutdown: + raise TimeoutThing() + + else: + exc_type.fail() + + self._assert_cleanup_on_pooled_reconnect( + dbapi, + p, + exc_cls=TimeoutThing if exc_type.base_exception else Exception, + ) @testing.combinations((True, testing.requires.python3), (False,)) def test_userspace_disconnectionerror_weakref_finalizer(self, detach_gced):