]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
- take advantage of 0.9's pool redesign a bit, adding
authorMike Bayer <mike_mp@zzzcomputing.com>
Thu, 4 Jul 2013 17:25:40 +0000 (13:25 -0400)
committerMike Bayer <mike_mp@zzzcomputing.com>
Thu, 4 Jul 2013 17:25:40 +0000 (13:25 -0400)
_ConnectionRecord.checkin() so that this functionality is
encapsulated; is now called from two different locations
for [ticket:2772].
- Fixed bug where :class:`.QueuePool` would lose the correct
checked out count if an existing pooled connection failed to reconnect
after an invalidate or recycle event. [ticket:2772]

doc/build/changelog/changelog_08.rst
lib/sqlalchemy/__init__.py
lib/sqlalchemy/pool.py
test/engine/test_pool.py

index 1d52a994d3fe779cac2dc3812fe63daaf7cc35f8..04c7f1f31a6f44522ba27e40cd3a2bb8cd98e938 100644 (file)
@@ -3,6 +3,17 @@
 0.8 Changelog
 ==============
 
+.. changelog::
+    :version: 0.8.3
+
+    .. change::
+        :tags: bug, engine, pool
+        :tickets: 2772
+
+        Fixed bug where :class:`.QueuePool` would lose the correct
+        checked out count if an existing pooled connection failed to reconnect
+        after an invalidate or recycle event.
+
 .. changelog::
     :version: 0.8.2
     :released: July 3, 2013
index 21e06f5483d4caed094e814e93a5044fc501eacf..0c02ac2a10fc11c76aa639e74975d580e46d6542 100644 (file)
@@ -120,7 +120,7 @@ from .engine import create_engine, engine_from_config
 __all__ = sorted(name for name, obj in locals().items()
                  if not (name.startswith('_') or _inspect.ismodule(obj)))
 
-__version__ = '0.8.2'
+__version__ = '0.8.3'
 
 del _inspect, sys
 
index 8b2ba359a10b25fb33859001810dd7f0d348b9f3..1604b03b00c43575a6ae5e369aeed10db1a7c883 100644 (file)
@@ -359,6 +359,17 @@ class _ConnectionRecord(object):
                 self.__pool.dispatch.connect(self.connection, self)
         return self.connection
 
+    def checkin(self):
+        self.fairy = None
+        connection = self.connection
+        pool = self.__pool
+        if self.finalize_callback:
+            self.finalize_callback(connection)
+            del self.finalize_callback
+        if pool.dispatch.checkin:
+            pool.dispatch.checkin(connection, self)
+        pool._return_conn(self)
+
     def __close(self):
         self.__pool._close_connection(self.connection)
 
@@ -380,6 +391,10 @@ def _finalize_fairy(connection, connection_record, pool, ref, echo):
                 connection_record.fairy is not ref:
         return
 
+    if connection_record and echo:
+        pool.logger.debug("Connection %r being returned to pool",
+                                connection)
+
     if connection is not None:
         try:
             if pool.dispatch.reset:
@@ -397,17 +412,8 @@ def _finalize_fairy(connection, connection_record, pool, ref, echo):
             if isinstance(e, (SystemExit, KeyboardInterrupt)):
                 raise
 
-    if connection_record is not None:
-        connection_record.fairy = None
-        if echo:
-            pool.logger.debug("Connection %r being returned to pool",
-                                    connection)
-        if connection_record.finalize_callback:
-            connection_record.finalize_callback(connection)
-            del connection_record.finalize_callback
-        if pool.dispatch.checkin:
-            pool.dispatch.checkin(connection, connection_record)
-        pool._return_conn(connection_record)
+    if connection_record:
+        connection_record.checkin()
 
 
 _refs = set()
@@ -423,7 +429,11 @@ class _ConnectionFairy(object):
         self._echo = _echo = pool._should_log_debug()
         try:
             rec = self._connection_record = pool._do_get()
-            conn = self.connection = self._connection_record.get_connection()
+            try:
+                conn = self.connection = self._connection_record.get_connection()
+            except:
+                self._connection_record.checkin()
+                raise
             rec.fairy = weakref.ref(
                             self,
                             lambda ref: _finalize_fairy and \
index 7d4be30d062375bc0906436e8a0d01e8f1cd0725..37c0bfdeb5b089b52d3a9519a1f0b750c6b617ea 100644 (file)
@@ -16,11 +16,20 @@ def MockDBAPI():
     def cursor():
         while True:
             yield Mock()
+
     def connect():
         while True:
             yield Mock(cursor=Mock(side_effect=cursor()))
 
-    return Mock(connect=Mock(side_effect=connect()))
+    def shutdown(value):
+        if value:
+            db.connect = Mock(side_effect=Exception("connect failed"))
+        else:
+            db.connect = Mock(side_effect=connect())
+
+    db = Mock(connect=Mock(side_effect=connect()),
+                    shutdown=shutdown, _shutdown=False)
+    return db
 
 class PoolTestBase(fixtures.TestBase):
     def setup(self):
@@ -516,23 +525,23 @@ class DeprecatedPoolListenerTest(PoolTestBase):
                 self.assert_((item in innerself.checked_out) == in_cout)
                 self.assert_((item in innerself.checked_in) == in_cin)
             def inst_connect(self, con, record):
-                print "connect(%s, %s)" % (con, record)
+                print("connect(%s, %s)" % (con, record))
                 assert con is not None
                 assert record is not None
                 self.connected.append(con)
             def inst_first_connect(self, con, record):
-                print "first_connect(%s, %s)" % (con, record)
+                print("first_connect(%s, %s)" % (con, record))
                 assert con is not None
                 assert record is not None
                 self.first_connected.append(con)
             def inst_checkout(self, con, record, proxy):
-                print "checkout(%s, %s, %s)" % (con, record, proxy)
+                print("checkout(%s, %s, %s)" % (con, record, proxy))
                 assert con is not None
                 assert record is not None
                 assert proxy is not None
                 self.checked_out.append(con)
             def inst_checkin(self, con, record):
-                print "checkin(%s, %s)" % (con, record)
+                print("checkin(%s, %s)" % (con, record))
                 # con can be None if invalidated
                 assert record is not None
                 self.checked_in.append(con)
@@ -729,8 +738,8 @@ class QueuePoolTest(PoolTestBase):
         def status(pool):
             tup = pool.size(), pool.checkedin(), pool.overflow(), \
                 pool.checkedout()
-            print 'Pool size: %d  Connections in pool: %d Current '\
-                'Overflow: %d Current Checked out connections: %d' % tup
+            print('Pool size: %d  Connections in pool: %d Current '\
+                'Overflow: %d Current Checked out connections: %d' % tup)
             return tup
 
         c1 = p.connect()
@@ -804,7 +813,7 @@ class QueuePoolTest(PoolTestBase):
                 max_overflow=1, use_threadlocal=False, timeout=3)
         timeouts = []
         def checkout():
-            for x in xrange(1):
+            for x in range(1):
                 now = time.time()
                 try:
                     c1 = p.connect()
@@ -815,7 +824,7 @@ class QueuePoolTest(PoolTestBase):
                 c1.close()
 
         threads = []
-        for i in xrange(10):
+        for i in range(10):
             th = threading.Thread(target=checkout)
             th.start()
             threads.append(th)
@@ -852,7 +861,7 @@ class QueuePoolTest(PoolTestBase):
                 except tsa.exc.TimeoutError:
                     pass
         threads = []
-        for i in xrange(thread_count):
+        for i in range(thread_count):
             th = threading.Thread(target=whammy)
             th.start()
             threads.append(th)
@@ -881,9 +890,10 @@ class QueuePoolTest(PoolTestBase):
                                    pool_size=2, timeout=timeout,
                                    max_overflow=max_overflow)
                 def waiter(p):
+                    success_key = (timeout, max_overflow)
                     conn = p.connect()
                     time.sleep(.5)
-                    success.append(True)
+                    success.append(success_key)
                     conn.close()
 
                 time.sleep(.2)
@@ -898,8 +908,8 @@ class QueuePoolTest(PoolTestBase):
                 c1.invalidate()
                 c2.invalidate()
                 p2 = p._replace()
-        time.sleep(2)
-        eq_(len(success), 12)
+        time.sleep(1)
+        eq_(len(success), 12, "successes: %s" % success)
 
     @testing.requires.threading_with_mock
     @testing.requires.python26
@@ -1074,6 +1084,46 @@ class QueuePoolTest(PoolTestBase):
         c3 = p.connect()
         assert id(c3.connection) != c_id
 
+    def _assert_cleanup_on_pooled_reconnect(self, dbapi, p):
+        # p is QueuePool with size=1, max_overflow=2,
+        # and one connection in the pool that will need to
+        # reconnect when next used (either due to recycle or invalidate)
+        eq_(p.checkedout(), 0)
+        eq_(p._overflow, 0)
+        dbapi.shutdown(True)
+        assert_raises(
+            Exception,
+            p.connect
+        )
+        eq_(p._overflow, 0)
+        eq_(p.checkedout(), 0)  # and not 1
+
+        dbapi.shutdown(False)
+
+        c1 = p.connect()
+        assert p._pool.empty()  # poolsize is one, so we're empty OK
+        c2 = p.connect()
+        eq_(p._overflow, 1)  # and not 2
+
+        # this hangs if p._overflow is 2
+        c3 = p.connect()
+
+    def test_error_on_pooled_reconnect_cleanup_invalidate(self):
+        dbapi, p = self._queuepool_dbapi_fixture(pool_size=1, max_overflow=2)
+        c1 = p.connect()
+        c1.invalidate()
+        c1.close()
+        self._assert_cleanup_on_pooled_reconnect(dbapi, p)
+
+    def test_error_on_pooled_reconnect_cleanup_recycle(self):
+        dbapi, p = self._queuepool_dbapi_fixture(pool_size=1,
+                                        max_overflow=2, recycle=1)
+        c1 = p.connect()
+        c1.close()
+        time.sleep(1)
+        self._assert_cleanup_on_pooled_reconnect(dbapi, p)
+
+
     def test_invalidate(self):
         p = self._queuepool_fixture(pool_size=1, max_overflow=0)
         c1 = p.connect()
@@ -1183,7 +1233,7 @@ class SingletonThreadPoolTest(PoolTestBase):
                 return p.connect()
 
         def checkout():
-            for x in xrange(10):
+            for x in range(10):
                 c = _conn()
                 assert c
                 c.cursor()
@@ -1191,7 +1241,7 @@ class SingletonThreadPoolTest(PoolTestBase):
                 time.sleep(.1)
 
         threads = []
-        for i in xrange(10):
+        for i in range(10):
             th = threading.Thread(target=checkout)
             th.start()
             threads.append(th)