join_timeout = 10
-def MockDBAPI():
+def MockDBAPI(): # noqa
def cursor():
return Mock()
def _queuepool_dbapi_fixture(self, **kw):
dbapi = MockDBAPI()
- return dbapi, pool.QueuePool(creator=lambda: dbapi.connect('foo.db'),
- **kw)
+ return dbapi, pool.QueuePool(
+ creator=lambda: dbapi.connect('foo.db'),
+ **kw)
class PoolTest(PoolTestBase):
assert c1 is not c2
assert c1 is c3
- eq_(dbapi.connect.mock_calls,
+ eq_(
+ dbapi.connect.mock_calls,
[
call("foo.db"),
call("foo.db"),
for p in pool.QueuePool(creator=dbapi.connect,
pool_size=3, max_overflow=-1,
use_threadlocal=True), \
- pool.SingletonThreadPool(creator=dbapi.connect,
+ pool.SingletonThreadPool(
+ creator=dbapi.connect,
use_threadlocal=True):
c1 = p.connect()
c2 = p.connect()
def test_first_connect_event(self):
p, canary = self._first_connect_event_fixture()
- c1 = p.connect()
+ p.connect()
eq_(canary, ['first_connect'])
def test_first_connect_event_fires_once(self):
p, canary = self._first_connect_event_fixture()
- c1 = p.connect()
- c2 = p.connect()
+ p.connect()
+ p.connect()
eq_(canary, ['first_connect'])
p, canary = self._first_connect_event_fixture()
p2 = p.recreate()
- c1 = p.connect()
- c2 = p2.connect()
+ p.connect()
+ p2.connect()
eq_(canary, ['first_connect', 'first_connect'])
def test_first_connect_on_subsequently_recreated(self):
p, canary = self._first_connect_event_fixture()
- c1 = p.connect()
+ p.connect()
p2 = p.recreate()
- c2 = p2.connect()
+ p2.connect()
eq_(canary, ['first_connect', 'first_connect'])
def test_connect_event(self):
p, canary = self._connect_event_fixture()
- c1 = p.connect()
+ p.connect()
eq_(canary, ['connect'])
def test_connect_event_fires_subsequent(self):
p, canary = self._connect_event_fixture()
- c1 = p.connect()
- c2 = p.connect()
+ c1 = p.connect() # noqa
+ c2 = p.connect() # noqa
eq_(canary, ['connect', 'connect'])
p2 = p.recreate()
- c1 = p.connect()
- c2 = p2.connect()
+ p.connect()
+ p2.connect()
eq_(canary, ['connect', 'connect'])
def test_connect_on_subsequently_recreated(self):
p, canary = self._connect_event_fixture()
- c1 = p.connect()
+ p.connect()
p2 = p.recreate()
- c2 = p2.connect()
+ p2.connect()
eq_(canary, ['connect', 'connect'])
def test_checkout_event(self):
p, canary = self._checkout_event_fixture()
- c1 = p.connect()
+ p.connect()
eq_(canary, ['checkout'])
def test_checkout_event_fires_subsequent(self):
p, canary = self._checkout_event_fixture()
- c1 = p.connect()
- c2 = p.connect()
+ p.connect()
+ p.connect()
eq_(canary, ['checkout', 'checkout'])
def test_checkout_event_on_subsequently_recreated(self):
p, canary = self._checkout_event_fixture()
- c1 = p.connect()
+ p.connect()
p2 = p.recreate()
- c2 = p2.connect()
+ p2.connect()
eq_(canary, ['checkout', 'checkout'])
for th in threads:
th.join(join_timeout)
- eq_(evt.mock_calls,
- [
- call.first_connect(),
- call.connect(),
- call.connect(),
- call.connect()]
- )
+ eq_(
+ evt.mock_calls,
+ [
+ call.first_connect(),
+ call.connect(),
+ call.connect(),
+ call.connect()]
+ )
class DeprecatedPoolListenerTest(PoolTestBase):
@testing.requires.predictable_gc
@testing.uses_deprecated(r".*Use event.listen")
def test_listeners(self):
+
class InstrumentingListener(object):
def __init__(self):
if hasattr(self, 'connect'):
self.checked_out = []
self.checked_in = []
- def assert_total(innerself, conn, fconn, cout, cin):
- eq_(len(innerself.connected), conn)
- eq_(len(innerself.first_connected), fconn)
- eq_(len(innerself.checked_out), cout)
- eq_(len(innerself.checked_in), cin)
+ def assert_total(self, conn, fconn, cout, cin):
+ eq_(len(self.connected), conn)
+ eq_(len(self.first_connected), fconn)
+ eq_(len(self.checked_out), cout)
+ eq_(len(self.checked_in), cin)
- def assert_in(innerself, item, in_conn, in_fconn,
- in_cout, in_cin):
- self.assert_((item in innerself.connected) == in_conn)
- self.assert_((item in innerself.first_connected) == in_fconn)
- self.assert_((item in innerself.checked_out) == in_cout)
- self.assert_((item in innerself.checked_in) == in_cin)
+ def assert_in(
+ self, item, in_conn, in_fconn,
+ in_cout, in_cin):
+ eq_((item in self.connected), in_conn)
+ eq_((item in self.first_connected), in_fconn)
+ eq_((item in self.checked_out), in_cout)
+ eq_((item in self.checked_in), in_cin)
def inst_connect(self, con, record):
print("connect(%s, %s)" % (con, record))
self._do_testqueuepool(useclose=True)
def _do_testqueuepool(self, useclose=False):
- p = self._queuepool_fixture(pool_size=3,
- max_overflow=-1)
+ p = self._queuepool_fixture(
+ pool_size=3,
+ max_overflow=-1)
def status(pool):
return pool.size(), pool.checkedin(), pool.overflow(), \
@testing.requires.timing_intensive
def test_timeout(self):
- p = self._queuepool_fixture(pool_size=3,
- max_overflow=0,
- timeout=2)
- c1 = p.connect()
- c2 = p.connect()
- c3 = p.connect()
+ p = self._queuepool_fixture(
+ pool_size=3,
+ max_overflow=0,
+ timeout=2)
+ c1 = p.connect() # noqa
+ c2 = p.connect() # noqa
+ c3 = p.connect() # noqa
now = time.time()
- try:
- c4 = p.connect()
- assert False
- except tsa.exc.TimeoutError:
- assert int(time.time() - now) == 2
+
+ assert_raises(
+ tsa.exc.TimeoutError,
+ p.connect
+ )
+ assert int(time.time() - now) == 2
@testing.requires.threading_with_mock
@testing.requires.timing_intensive
# them back to the start of do_get()
dbapi = MockDBAPI()
p = pool.QueuePool(
- creator=lambda: dbapi.connect(delay=.05),
- pool_size=2,
- max_overflow=1, use_threadlocal=False, timeout=3)
+ creator=lambda: dbapi.connect(delay=.05),
+ pool_size=2,
+ max_overflow=1, use_threadlocal=False, timeout=3)
timeouts = []
def checkout():
return creator()
p = pool.QueuePool(creator=create, pool_size=2, max_overflow=3)
- c1 = self._with_teardown(p.connect())
- c2 = self._with_teardown(p.connect())
- c3 = self._with_teardown(p.connect())
+ c1 = self._with_teardown(p.connect()) # noqa
+ c2 = self._with_teardown(p.connect()) # noqa
+ c3 = self._with_teardown(p.connect()) # noqa
eq_(p._overflow, 1)
creator = failing_dbapi
assert_raises(Exception, p.connect)
threads = [
threading.Thread(
- target=run_test, args=("success_one", p, False)),
+ target=run_test, args=("success_one", p, False)),
threading.Thread(
- target=run_test, args=("success_two", p, False)),
+ target=run_test, args=("success_two", p, False)),
threading.Thread(
- target=run_test, args=("overflow_one", p, True)),
+ target=run_test, args=("overflow_one", p, True)),
threading.Thread(
- target=run_test, args=("overflow_two", p, False)),
+ target=run_test, args=("overflow_two", p, False)),
threading.Thread(
- target=run_test, args=("overflow_three", p, False))
+ target=run_test, args=("overflow_three", p, False))
]
for t in threads:
t.start()
time.sleep(.1)
conn.close()
- c1 = p.connect()
+ c1 = p.connect() # noqa
c2 = p.connect()
threads = []
for i in range(2):
- t = threading.Thread(target=waiter,
- args=(p, timeout, max_overflow))
+ t = threading.Thread(
+ target=waiter,
+ args=(p, timeout, max_overflow))
t.daemon = True
t.start()
threads.append(t)
def test_connrec_invalidated_within_checkout_no_race(self):
"""Test that a concurrent ConnectionRecord.invalidate() which
- occurs after the ConnectionFairy has called _ConnectionRecord.checkout()
+ occurs after the ConnectionFairy has called
+ _ConnectionRecord.checkout()
but before the ConnectionFairy tests "fairy.connection is None"
will not result in an InvalidRequestError.
is_(conn._connection_record.connection, None)
conn.close()
-
@testing.requires.threading_with_mock
@testing.requires.timing_intensive
def test_notify_waiters(self):
def creator():
canary.append(1)
return dbapi.connect()
- p1 = pool.QueuePool(creator=creator,
- pool_size=1, timeout=None,
- max_overflow=0)
+ p1 = pool.QueuePool(
+ creator=creator,
+ pool_size=1, timeout=None,
+ max_overflow=0)
def waiter(p):
conn = p.connect()
self._test_overflow_no_gc(False)
def _test_overflow_no_gc(self, threadlocal):
- p = self._queuepool_fixture(pool_size=2,
- max_overflow=2)
+ p = self._queuepool_fixture(
+ pool_size=2,
+ max_overflow=2)
# disable weakref collection of the
# underlying connections
@testing.requires.predictable_gc
def test_weakref_kaboom(self):
- p = self._queuepool_fixture(pool_size=3,
- max_overflow=-1, use_threadlocal=True)
+ p = self._queuepool_fixture(
+ pool_size=3,
+ max_overflow=-1, use_threadlocal=True)
c1 = p.connect()
c2 = p.connect()
c1.close()
ConnectionFairy with an ambiguous counter. i.e. its not true
reference counting."""
- p = self._queuepool_fixture(pool_size=3,
- max_overflow=-1, use_threadlocal=True)
+ p = self._queuepool_fixture(
+ pool_size=3,
+ max_overflow=-1, use_threadlocal=True)
c1 = p.connect()
c2 = p.connect()
assert c1 is c2
@testing.requires.timing_intensive
def test_recycle_on_invalidate(self):
- p = self._queuepool_fixture(pool_size=1,
- max_overflow=0)
+ p = self._queuepool_fixture(
+ pool_size=1,
+ max_overflow=0)
c1 = p.connect()
c_id = id(c1.connection)
c1.close()
@testing.requires.timing_intensive
def test_recycle_on_soft_invalidate(self):
- p = self._queuepool_fixture(pool_size=1,
- max_overflow=0)
+ p = self._queuepool_fixture(
+ pool_size=1,
+ max_overflow=0)
c1 = p.connect()
c_id = id(c1.connection)
c1.close()
dbapi.shutdown(False)
- c1 = self._with_teardown(p.connect())
+ c1 = self._with_teardown(p.connect()) # noqa
assert p._pool.empty() # poolsize is one, so we're empty OK
- c2 = self._with_teardown(p.connect())
+ c2 = self._with_teardown(p.connect()) # noqa
eq_(p._overflow, 1) # and not 2
# this hangs if p._overflow is 2
c3 = self._with_teardown(p.connect())
+ c3.close()
+
def test_error_on_pooled_reconnect_cleanup_invalidate(self):
dbapi, p = self._queuepool_dbapi_fixture(pool_size=1, max_overflow=2)
c1 = p.connect()
@testing.requires.timing_intensive
def test_error_on_pooled_reconnect_cleanup_recycle(self):
- dbapi, p = self._queuepool_dbapi_fixture(pool_size=1,
- max_overflow=2, recycle=1)
+ dbapi, p = self._queuepool_dbapi_fixture(
+ pool_size=1,
+ max_overflow=2, recycle=1)
c1 = p.connect()
c1.close()
time.sleep(1.5)
c = p.connect()
c.close()
-
def test_error_on_pooled_reconnect_cleanup_wcheckout_event(self):
- dbapi, p = self._queuepool_dbapi_fixture(pool_size=1,
- max_overflow=2)
+ dbapi, p = self._queuepool_dbapi_fixture(
+ pool_size=1,
+ max_overflow=2)
c1 = p.connect()
c1.close()
def attempt(conn):
time.sleep(random.random())
try:
- conn._handle_dbapi_exception(Error(), "statement", {},
- Mock(), Mock())
+ conn._handle_dbapi_exception(
+ Error(), "statement", {},
+ Mock(), Mock())
except tsa.exc.DBAPIError:
pass
# run an error + invalidate operation on the remaining 7 open
- #connections
+ # connections
threads = []
for conn in conns:
t = threading.Thread(target=attempt, args=(conn, ))
dbapi, p = self._queuepool_dbapi_fixture(pool_size=1, max_overflow=0)
c1 = p.connect()
c1.detach()
- c2 = p.connect()
+ c2 = p.connect() # noqa
eq_(dbapi.connect.mock_calls, [call("foo.db"), call("foo.db")])
c1_con = c1.connection
class ResetOnReturnTest(PoolTestBase):
def _fixture(self, **kw):
dbapi = Mock()
- return dbapi, pool.QueuePool(creator=lambda: dbapi.connect('foo.db'),
- **kw)
+ return dbapi, pool.QueuePool(
+ creator=lambda: dbapi.connect('foo.db'),
+ **kw)
def test_plain_rollback(self):
dbapi, p = self._fixture(reset_on_return='rollback')
def test_connect_error(self):
dbapi = MockDBAPI()
p = pool.AssertionPool(creator=lambda: dbapi.connect('foo.db'))
- c1 = p.connect()
+ c1 = p.connect() # noqa
assert_raises(AssertionError, p.connect)
def test_connect_multiple(self):
c2 = p.connect()
c2.close()
- c3 = p.connect()
+ c3 = p.connect() # noqa
assert_raises(AssertionError, p.connect)
c1 = None
c1 = p.connect()
- dbapi.connect.assert_has_calls([
- call('foo.db'),
- call('foo.db')],
- any_order=True)
+ dbapi.connect.assert_has_calls(
+ [
+ call('foo.db'),
+ call('foo.db')],
+ any_order=True)
class StaticPoolTest(PoolTestBase):
def test_recreate(self):
dbapi = MockDBAPI()
- creator = lambda: dbapi.connect('foo.db')
+
+ def creator():
+ return dbapi.connect('foo.db')
p = pool.StaticPool(creator)
p2 = p.recreate()
assert p._creator is p2._creator
conn = e.connect()
conn.close()
-
-