.. changelog::
:version: 0.8.4
+ .. change::
+ :tags: bug, engine, pool
+ :versions: 0.9.0b2
+ :tickets: 2522
+
+ Made a slight adjustment to the logic which waits for a pooled
+ connection to be available, such that for a connection pool
+ with no timeout specified, it will every half a second break out of
+ the wait to check for the so-called "abort" flag, which allows the
+ waiter to break out in case the whole connection pool was dumped;
+ normally the waiter should break out due to a notify_all() but it's
+ possible this notify_all() is missed in very slim cases.
+ This is an extension of logic first introduced in 0.8.0, and the
+ issue has only been observed occasionally in stress tests.
+
.. change::
:tags: bug, mssql
:versions: 0.9.0b2
return an item if one is immediately available, else raise the
``Empty`` exception (`timeout` is ignored in that case).
"""
-
self.not_empty.acquire()
try:
if not block:
raise Empty
elif timeout is None:
while self._empty():
- self.not_empty.wait()
+ # wait for only half a second, then
+ # loop around, so that we can see a change in
+ # _sqla_abort_context in case we missed the notify_all()
+ # called by abort()
+ self.not_empty.wait(.5)
if self._sqla_abort_context:
raise SAAbort(self._sqla_abort_context)
else:
if not self.not_full.acquire(False):
return
try:
+ # note that this is now optional
+ # as the waiters in get() both loop around
+ # to check the _sqla_abort_context flag periodically
notify_all(self.not_empty)
finally:
self.not_full.release()
handled when the pool is replaced.
"""
+ mutex = threading.Lock()
dbapi = MockDBAPI()
def creator():
- return dbapi.connect()
+ mutex.acquire()
+ try:
+ return dbapi.connect()
+ finally:
+ mutex.release()
success = []
for timeout in (None, 30):
for i in range(2):
t = threading.Thread(target=waiter,
args=(p, timeout, max_overflow))
+ t.daemon = True
t.start()
threads.add(t)
- c1.invalidate()
- c2.invalidate()
+ # this sleep makes sure that the
+ # two waiter threads hit upon wait()
+ # inside the queue, before we invalidate the other
+ # two conns
+ time.sleep(.2)
p2 = p._replace()
for t in threads:
p1 = pool.QueuePool(creator=creator1,
pool_size=1, timeout=None,
max_overflow=0)
- p2 = pool.QueuePool(creator=creator2,
- pool_size=1, timeout=None,
- max_overflow=-1)
+ p2 = pool.NullPool(creator=creator2)
def waiter(p):
conn = p.connect()
time.sleep(.5)