condition that can bypass max_overflow; merged from 0.4 branch r2736-2738.
also simplified the locking logic; tried to get the test to produce a
failure on OS X (not successful)
- MetaData and all SchemaItems are safe to use with pickle. slow
table reflections can be dumped into a pickled file to be reused later.
Just reconnect the engine to the metadata after unpickling. [ticket:619]
+ - added a mutex to QueuePool's "overflow" calculation to prevent a race
+ condition that can bypass max_overflow
- fixed grouping of compound selects to give correct results. will break
on sqlite in some cases, but those cases were producing incorrect
results anyway; sqlite doesn't support grouped compound selects
from sqlalchemy import queue as Queue
try:
- import thread
+ import thread, threading
except:
import dummy_thread as thread
+ import dummy_threading as threading
proxies = {}
self._overflow = 0 - pool_size
self._max_overflow = max_overflow
self._timeout = timeout
+ self._overflow_lock = threading.Lock()
def recreate(self):
self.log("Pool recreating")
try:
self._pool.put(conn, False)
except Queue.Full:
- self._overflow -= 1
+ self._overflow_lock.acquire()
+ try:
+ self._overflow -= 1
+ finally:
+ self._overflow_lock.release()
def do_get(self):
try:
return self._pool.get(self._max_overflow > -1 and self._overflow >= self._max_overflow, self._timeout)
except Queue.Empty:
- if self._max_overflow > -1 and self._overflow >= self._max_overflow:
- raise exceptions.TimeoutError("QueuePool limit of size %d overflow %d reached, connection timed out" % (self.size(), self.overflow()))
- con = self.create_connection()
- self._overflow += 1
+ self._overflow_lock.acquire()
+ try:
+ if self._max_overflow > -1 and self._overflow >= self._max_overflow:
+ raise exceptions.TimeoutError("QueuePool limit of size %d overflow %d reached, connection timed out" % (self.size(), self.overflow()))
+ con = self.create_connection()
+ self._overflow += 1
+ finally:
+ self._overflow_lock.release()
return con
def dispose(self):
import testbase
from testbase import PersistTest
import unittest, sys, os, time
+import threading, thread
import sqlalchemy.pool as pool
import sqlalchemy.exceptions as exceptions
assert False
except exceptions.TimeoutError, e:
assert int(time.time() - now) == 2
+
+ def _test_overflow(self, thread_count, max_overflow):
+        # I can't really get this to fail on OS X. Linux? Windows?
+ p = pool.QueuePool(creator=lambda: mock_dbapi.connect('foo.db'),
+ pool_size=3, timeout=2,
+ max_overflow=max_overflow)
+ peaks = []
+ def whammy():
+ for i in range(10):
+ try:
+ con = p.connect()
+ peaks.append(p.overflow())
+ time.sleep(.005)
+ con.close()
+ del con
+ except exceptions.TimeoutError:
+ pass
+ threads = []
+ for i in xrange(thread_count):
+ th = threading.Thread(target=whammy)
+ th.start()
+ threads.append(th)
+ for th in threads:
+ th.join()
+
+ self.assert_(max(peaks) <= max_overflow)
+
+ def test_no_overflow(self):
+ self._test_overflow(40, 0)
+
+ def test_max_overflow(self):
+ self._test_overflow(40, 5)
def test_mixed_close(self):
p = pool.QueuePool(creator = lambda: mock_dbapi.connect('foo.db'), pool_size = 3, max_overflow = -1, use_threadlocal = True)