def creator():
raise Exception("no creates allowed")
- for cls in (pool.SingletonThreadPool, pool.StaticPool,
+ for cls in (pool.SingletonThreadPool, pool.StaticPool,
pool.QueuePool, pool.NullPool, pool.AssertionPool):
p = cls(creator=creator)
p.dispose()
eq_(len(innerself.first_connected), fconn)
eq_(len(innerself.checked_out), cout)
eq_(len(innerself.checked_in), cin)
- def assert_in(innerself, item, in_conn, in_fconn,
+ def assert_in(innerself, item, in_conn, in_fconn,
in_cout, in_cin):
self.assert_((item in innerself.connected) == in_conn)
self.assert_((item in innerself.first_connected) == in_fconn)
def test_timeout(self):
p = self._queuepool_fixture(pool_size=3,
- max_overflow=0,
+ max_overflow=0,
timeout=2)
c1 = p.connect()
c2 = p.connect()
# them back to the start of do_get()
dbapi = MockDBAPI()
p = pool.QueuePool(
- creator = lambda: dbapi.connect(delay=.05),
- pool_size = 2,
+ creator = lambda: dbapi.connect(delay=.05),
+ pool_size = 2,
max_overflow = 1, use_threadlocal = False, timeout=3)
timeouts = []
def checkout():
th.start()
threads.append(th)
for th in threads:
- th.join()
+ th.join()
assert len(timeouts) > 0
for t in timeouts:
def creator():
time.sleep(.05)
return dbapi.connect()
-
+
p = pool.QueuePool(creator=creator,
pool_size=3, timeout=2,
max_overflow=max_overflow)
threads.append(th)
for th in threads:
th.join()
-
+
self.assert_(max(peaks) <= max_overflow)
lazy_gc()
def test_waiters_handled(self):
"""test that threads waiting for connections are
handled when the pool is replaced.
-
+
"""
dbapi = MockDBAPI()
def creator():
return dbapi.connect()
-
+
success = []
for timeout in (None, 30):
for max_overflow in (0, -1, 3):
max_overflow=max_overflow)
def waiter(p):
conn = p.connect()
- time.sleep(1)
+ time.sleep(.5)
success.append(True)
conn.close()
c1.invalidate()
c2.invalidate()
p2 = p._replace()
- time.sleep(1)
+ time.sleep(2)
eq_(len(success), 12)
@testing.requires.python26
dbapi = MockDBAPI()
def creator():
return dbapi.connect()
-
+
p = pool.QueuePool(creator=creator,
pool_size=2, timeout=None,
max_overflow=0)
def test_no_overflow(self):
self._test_overflow(40, 0)
-
+
def test_max_overflow(self):
self._test_overflow(40, 5)
-
+
def test_mixed_close(self):
p = self._queuepool_fixture(pool_size=3, max_overflow=-1, use_threadlocal=True)
c1 = p.connect()
p = self._queuepool_fixture(pool_size=2,
max_overflow=2)
- # disable weakref collection of the
+ # disable weakref collection of the
# underlying connections
strong_refs = set()
def _conn():
self.assert_(p.checkedout() != 0)
c2.close()
self.assert_(p.checkedout() == 0)
-
+
def test_recycle(self):
p = self._queuepool_fixture(pool_size=1,
- max_overflow=0,
+ max_overflow=0,
recycle=3)
c1 = p.connect()
c_id = id(c1.connection)