self.__pool.dispatch.connect(self.connection, self)
return self.connection
+ def checkin(self):
+ self.fairy = None
+ connection = self.connection
+ pool = self.__pool
+ if self.finalize_callback:
+ self.finalize_callback(connection)
+ del self.finalize_callback
+ if pool.dispatch.checkin:
+ pool.dispatch.checkin(connection, self)
+ pool._return_conn(self)
+
def __close(self):
self.__pool._close_connection(self.connection)
connection_record.fairy is not ref:
return
+ if connection_record and echo:
+ pool.logger.debug("Connection %r being returned to pool",
+ connection)
+
if connection is not None:
try:
if pool.dispatch.reset:
if isinstance(e, (SystemExit, KeyboardInterrupt)):
raise
- if connection_record is not None:
- connection_record.fairy = None
- if echo:
- pool.logger.debug("Connection %r being returned to pool",
- connection)
- if connection_record.finalize_callback:
- connection_record.finalize_callback(connection)
- del connection_record.finalize_callback
- if pool.dispatch.checkin:
- pool.dispatch.checkin(connection, connection_record)
- pool._return_conn(connection_record)
+ if connection_record:
+ connection_record.checkin()
_refs = set()
self._echo = _echo = pool._should_log_debug()
try:
rec = self._connection_record = pool._do_get()
- conn = self.connection = self._connection_record.get_connection()
+ try:
+ conn = self.connection = self._connection_record.get_connection()
+ except:
+ self._connection_record.checkin()
+ raise
rec.fairy = weakref.ref(
self,
lambda ref: _finalize_fairy and \
def cursor():
while True:
yield Mock()
+
def connect():
while True:
yield Mock(cursor=Mock(side_effect=cursor()))
- return Mock(connect=Mock(side_effect=connect()))
+ def shutdown(value):
+ if value:
+ db.connect = Mock(side_effect=Exception("connect failed"))
+ else:
+ db.connect = Mock(side_effect=connect())
+
+ db = Mock(connect=Mock(side_effect=connect()),
+ shutdown=shutdown, _shutdown=False)
+ return db
class PoolTestBase(fixtures.TestBase):
def setup(self):
self.assert_((item in innerself.checked_out) == in_cout)
self.assert_((item in innerself.checked_in) == in_cin)
def inst_connect(self, con, record):
- print "connect(%s, %s)" % (con, record)
+ print("connect(%s, %s)" % (con, record))
assert con is not None
assert record is not None
self.connected.append(con)
def inst_first_connect(self, con, record):
- print "first_connect(%s, %s)" % (con, record)
+ print("first_connect(%s, %s)" % (con, record))
assert con is not None
assert record is not None
self.first_connected.append(con)
def inst_checkout(self, con, record, proxy):
- print "checkout(%s, %s, %s)" % (con, record, proxy)
+ print("checkout(%s, %s, %s)" % (con, record, proxy))
assert con is not None
assert record is not None
assert proxy is not None
self.checked_out.append(con)
def inst_checkin(self, con, record):
- print "checkin(%s, %s)" % (con, record)
+ print("checkin(%s, %s)" % (con, record))
# con can be None if invalidated
assert record is not None
self.checked_in.append(con)
def status(pool):
tup = pool.size(), pool.checkedin(), pool.overflow(), \
pool.checkedout()
- print 'Pool size: %d Connections in pool: %d Current '\
- 'Overflow: %d Current Checked out connections: %d' % tup
+ print('Pool size: %d Connections in pool: %d Current '\
+ 'Overflow: %d Current Checked out connections: %d' % tup)
return tup
c1 = p.connect()
max_overflow=1, use_threadlocal=False, timeout=3)
timeouts = []
def checkout():
- for x in xrange(1):
+ for x in range(1):
now = time.time()
try:
c1 = p.connect()
c1.close()
threads = []
- for i in xrange(10):
+ for i in range(10):
th = threading.Thread(target=checkout)
th.start()
threads.append(th)
except tsa.exc.TimeoutError:
pass
threads = []
- for i in xrange(thread_count):
+ for i in range(thread_count):
th = threading.Thread(target=whammy)
th.start()
threads.append(th)
pool_size=2, timeout=timeout,
max_overflow=max_overflow)
def waiter(p):
+ success_key = (timeout, max_overflow)
conn = p.connect()
time.sleep(.5)
- success.append(True)
+ success.append(success_key)
conn.close()
time.sleep(.2)
c1.invalidate()
c2.invalidate()
p2 = p._replace()
- time.sleep(2)
- eq_(len(success), 12)
+ time.sleep(1)
+ eq_(len(success), 12, "successes: %s" % success)
@testing.requires.threading_with_mock
@testing.requires.python26
c3 = p.connect()
assert id(c3.connection) != c_id
+ def _assert_cleanup_on_pooled_reconnect(self, dbapi, p):
+ # p is QueuePool with size=1, max_overflow=2,
+ # and one connection in the pool that will need to
+ # reconnect when next used (either due to recycle or invalidate)
+ eq_(p.checkedout(), 0)
+ eq_(p._overflow, 0)
+ dbapi.shutdown(True)
+ assert_raises(
+ Exception,
+ p.connect
+ )
+ eq_(p._overflow, 0)
+ eq_(p.checkedout(), 0) # and not 1
+
+ dbapi.shutdown(False)
+
+ c1 = p.connect()
+ assert p._pool.empty() # poolsize is one, so we're empty OK
+ c2 = p.connect()
+ eq_(p._overflow, 1) # and not 2
+
+ # this hangs if p._overflow is 2
+ c3 = p.connect()
+
+ def test_error_on_pooled_reconnect_cleanup_invalidate(self):
+ dbapi, p = self._queuepool_dbapi_fixture(pool_size=1, max_overflow=2)
+ c1 = p.connect()
+ c1.invalidate()
+ c1.close()
+ self._assert_cleanup_on_pooled_reconnect(dbapi, p)
+
+ def test_error_on_pooled_reconnect_cleanup_recycle(self):
+ dbapi, p = self._queuepool_dbapi_fixture(pool_size=1,
+ max_overflow=2, recycle=1)
+ c1 = p.connect()
+ c1.close()
+ time.sleep(1)
+ self._assert_cleanup_on_pooled_reconnect(dbapi, p)
+
+
def test_invalidate(self):
p = self._queuepool_fixture(pool_size=1, max_overflow=0)
c1 = p.connect()
return p.connect()
def checkout():
- for x in xrange(10):
+ for x in range(10):
c = _conn()
assert c
c.cursor()
time.sleep(.1)
threads = []
- for i in xrange(10):
+ for i in range(10):
th = threading.Thread(target=checkout)
th.start()
threads.append(th)