From: Mike Bayer Date: Thu, 4 Jul 2013 17:25:40 +0000 (-0400) Subject: - take advantage of 0.9's pool redesign a bit, adding X-Git-Tag: rel_0_8_3~107 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=4a9d88b1cf1ba674f13b8e9c173aabd13b82dc36;p=thirdparty%2Fsqlalchemy%2Fsqlalchemy.git - take advantage of 0.9's pool redesign a bit, adding _ConnectionRecord.checkin() so that this functionality is encapsulated; is now called from two different locations for [ticket:2772]. - Fixed bug where :class:`.QueuePool` would lose the correct checked out count if an existing pooled connection failed to reconnect after an invalidate or recycle event. [ticket:2772] --- diff --git a/doc/build/changelog/changelog_08.rst b/doc/build/changelog/changelog_08.rst index 1d52a994d3..04c7f1f31a 100644 --- a/doc/build/changelog/changelog_08.rst +++ b/doc/build/changelog/changelog_08.rst @@ -3,6 +3,17 @@ 0.8 Changelog ============== +.. changelog:: + :version: 0.8.3 + + .. change:: + :tags: bug, engine, pool + :tickets: 2772 + + Fixed bug where :class:`.QueuePool` would lose the correct + checked out count if an existing pooled connection failed to reconnect + after an invalidate or recycle event. + .. changelog:: :version: 0.8.2 :released: July 3, 2013 diff --git a/lib/sqlalchemy/__init__.py b/lib/sqlalchemy/__init__.py index 21e06f5483..0c02ac2a10 100644 --- a/lib/sqlalchemy/__init__.py +++ b/lib/sqlalchemy/__init__.py @@ -120,7 +120,7 @@ from .engine import create_engine, engine_from_config __all__ = sorted(name for name, obj in locals().items() if not (name.startswith('_') or _inspect.ismodule(obj))) -__version__ = '0.8.2' +__version__ = '0.8.3' del _inspect, sys diff --git a/lib/sqlalchemy/pool.py b/lib/sqlalchemy/pool.py index 8b2ba359a1..1604b03b00 100644 --- a/lib/sqlalchemy/pool.py +++ b/lib/sqlalchemy/pool.py @@ -359,6 +359,17 @@ class _ConnectionRecord(object): self.__pool.dispatch.connect(self.connection, self) return self.connection + def checkin(self): + self.fairy = None + connection = self.connection + pool = self.__pool + if self.finalize_callback: + self.finalize_callback(connection) + del self.finalize_callback + if pool.dispatch.checkin: + pool.dispatch.checkin(connection, self) + pool._return_conn(self) + def __close(self): self.__pool._close_connection(self.connection) @@ -380,6 +391,10 @@ def _finalize_fairy(connection, connection_record, pool, ref, echo): connection_record.fairy is not ref: return + if connection_record and echo: + pool.logger.debug("Connection %r being returned to pool", + connection) + if connection is not None: try: if pool.dispatch.reset: @@ -397,17 +412,8 @@ def _finalize_fairy(connection, connection_record, pool, ref, echo): if isinstance(e, (SystemExit, KeyboardInterrupt)): raise - if connection_record is not None: - connection_record.fairy = None - if echo: - pool.logger.debug("Connection %r being returned to pool", - connection) - if connection_record.finalize_callback: - connection_record.finalize_callback(connection) - del connection_record.finalize_callback - if pool.dispatch.checkin: - pool.dispatch.checkin(connection, connection_record) - pool._return_conn(connection_record) + if connection_record: + connection_record.checkin() _refs = set() @@ -423,7 +429,11 @@ class _ConnectionFairy(object): self._echo = _echo = pool._should_log_debug() try: rec = self._connection_record = pool._do_get() - conn = self.connection = self._connection_record.get_connection() + try: + conn = self.connection = self._connection_record.get_connection() + except: + self._connection_record.checkin() + raise rec.fairy = weakref.ref( self, lambda ref: _finalize_fairy and \ diff --git a/test/engine/test_pool.py b/test/engine/test_pool.py index 7d4be30d06..37c0bfdeb5 100644 --- a/test/engine/test_pool.py +++ b/test/engine/test_pool.py @@ -16,11 +16,20 @@ def MockDBAPI(): def cursor(): while True: yield Mock() + def connect(): while True: yield Mock(cursor=Mock(side_effect=cursor())) - return Mock(connect=Mock(side_effect=connect())) + def shutdown(value): + if value: + db.connect = Mock(side_effect=Exception("connect failed")) + else: + db.connect = Mock(side_effect=connect()) + + db = Mock(connect=Mock(side_effect=connect()), + shutdown=shutdown, _shutdown=False) + return db class PoolTestBase(fixtures.TestBase): def setup(self): @@ -516,23 +525,23 @@ class DeprecatedPoolListenerTest(PoolTestBase): self.assert_((item in innerself.checked_out) == in_cout) self.assert_((item in innerself.checked_in) == in_cin) def inst_connect(self, con, record): - print "connect(%s, %s)" % (con, record) + print("connect(%s, %s)" % (con, record)) assert con is not None assert record is not None self.connected.append(con) def inst_first_connect(self, con, record): - print "first_connect(%s, %s)" % (con, record) + print("first_connect(%s, %s)" % (con, record)) assert con is not None assert record is not None self.first_connected.append(con) def inst_checkout(self, con, record, proxy): - print "checkout(%s, %s, %s)" % (con, record, proxy) + print("checkout(%s, %s, %s)" % (con, record, proxy)) assert con is not None assert record is not None assert proxy is not None self.checked_out.append(con) def inst_checkin(self, con, record): - print "checkin(%s, %s)" % (con, record) + print("checkin(%s, %s)" % (con, record)) # con can be None if invalidated assert record is not None self.checked_in.append(con) @@ -729,8 +738,8 @@ class QueuePoolTest(PoolTestBase): def status(pool): tup = pool.size(), pool.checkedin(), pool.overflow(), \ pool.checkedout() - print 'Pool size: %d Connections in pool: %d Current '\ - 'Overflow: %d Current Checked out connections: %d' % tup + print('Pool size: %d Connections in pool: %d Current '\ + 'Overflow: %d Current Checked out connections: %d' % tup) return tup c1 = p.connect() @@ -804,7 +813,7 @@ class QueuePoolTest(PoolTestBase): max_overflow=1, use_threadlocal=False, timeout=3) timeouts = [] def checkout(): - for x in xrange(1): + for x in range(1): now = time.time() try: c1 = p.connect() @@ -815,7 +824,7 @@ class QueuePoolTest(PoolTestBase): c1.close() threads = [] - for i in xrange(10): + for i in range(10): th = threading.Thread(target=checkout) th.start() threads.append(th) @@ -852,7 +861,7 @@ class QueuePoolTest(PoolTestBase): except tsa.exc.TimeoutError: pass threads = [] - for i in xrange(thread_count): + for i in range(thread_count): th = threading.Thread(target=whammy) th.start() threads.append(th) @@ -881,9 +890,10 @@ class QueuePoolTest(PoolTestBase): pool_size=2, timeout=timeout, max_overflow=max_overflow) def waiter(p): + success_key = (timeout, max_overflow) conn = p.connect() time.sleep(.5) - success.append(True) + success.append(success_key) conn.close() time.sleep(.2) @@ -898,8 +908,8 @@ class QueuePoolTest(PoolTestBase): c1.invalidate() c2.invalidate() p2 = p._replace() - time.sleep(2) - eq_(len(success), 12) + time.sleep(1) + eq_(len(success), 12, "successes: %s" % success) @testing.requires.threading_with_mock @testing.requires.python26 @@ -1074,6 +1084,46 @@ class QueuePoolTest(PoolTestBase): c3 = p.connect() assert id(c3.connection) != c_id + def _assert_cleanup_on_pooled_reconnect(self, dbapi, p): + # p is QueuePool with size=1, max_overflow=2, + # and one connection in the pool that will need to + # reconnect when next used (either due to recycle or invalidate) + eq_(p.checkedout(), 0) + eq_(p._overflow, 0) + dbapi.shutdown(True) + assert_raises( + Exception, + p.connect + ) + eq_(p._overflow, 0) + eq_(p.checkedout(), 0) # and not 1 + + dbapi.shutdown(False) + + c1 = p.connect() + assert p._pool.empty() # poolsize is one, so we're empty OK + c2 = p.connect() + eq_(p._overflow, 1) # and not 2 + + # this hangs if p._overflow is 2 + c3 = p.connect() + + def test_error_on_pooled_reconnect_cleanup_invalidate(self): + dbapi, p = self._queuepool_dbapi_fixture(pool_size=1, max_overflow=2) + c1 = p.connect() + c1.invalidate() + c1.close() + self._assert_cleanup_on_pooled_reconnect(dbapi, p) + + def test_error_on_pooled_reconnect_cleanup_recycle(self): + dbapi, p = self._queuepool_dbapi_fixture(pool_size=1, + max_overflow=2, recycle=1) + c1 = p.connect() + c1.close() + time.sleep(1) + self._assert_cleanup_on_pooled_reconnect(dbapi, p) + + def test_invalidate(self): p = self._queuepool_fixture(pool_size=1, max_overflow=0) c1 = p.connect() @@ -1183,7 +1233,7 @@ class SingletonThreadPoolTest(PoolTestBase): return p.connect() def checkout(): - for x in xrange(10): + for x in range(10): c = _conn() assert c c.cursor() @@ -1191,7 +1241,7 @@ class SingletonThreadPoolTest(PoolTestBase): time.sleep(.1) threads = [] - for i in xrange(10): + for i in range(10): th = threading.Thread(target=checkout) th.start() threads.append(th)