0.3.10
+- general
+ - a new mutex that was added in 0.3.9 caused the pool_timeout
+ feature to fail during a race condition: if many threads pushed
+ the pool into overflow at the same time, the threads beyond the
+ limit would raise TimeoutError immediately, with no delay. this
+ issue has been fixed (sketched below).
- sql
- better quoting of identifiers when manipulating schemas
- got connection-bound metadata to work with implicit execution
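
To make the failure concrete, here is a simplified reconstruction of the pre-fix do_get() (assembled from the removed lines in the diff below, not the verbatim 0.3.9 source); the comments mark where racing threads could fail with no delay:

def do_get(self):
    try:
        # while overflow is under the limit this is a non-blocking get():
        # an empty queue raises Queue.Empty at once rather than waiting
        # out self._timeout
        return self._pool.get(self._max_overflow > -1 and self._overflow >= self._max_overflow, self._timeout)
    except Queue.Empty:
        if self._overflow_lock is not None:
            self._overflow_lock.acquire()   # racing threads pile up here
        try:
            if self._max_overflow > -1 and self._overflow >= self._max_overflow:
                # each thread ahead of us in the mutex consumed an overflow
                # slot; once the limit is hit, the rest raise immediately --
                # the bug described above
                raise exceptions.TimeoutError("QueuePool limit of size %d overflow %d reached, connection timed out" % (self.size(), self.overflow()))
            con = self.create_connection()
            self._overflow += 1
            return con
        finally:
            if self._overflow_lock is not None:
                self._overflow_lock.release()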
try:
return self._pool.get(self._max_overflow > -1 and self._overflow >= self._max_overflow, self._timeout)
except Queue.Empty:
+ if self._max_overflow > -1 and self._overflow >= self._max_overflow:
+ raise exceptions.TimeoutError("QueuePool limit of size %d overflow %d reached, connection timed out, timeout %d" % (self.size(), self.overflow(), self._timeout))
+
if self._overflow_lock is not None:
self._overflow_lock.acquire()
+
+ if self._max_overflow > -1 and self._overflow >= self._max_overflow:
+ if self._overflow_lock is not None:
+ self._overflow_lock.release()
+ return self.do_get()
+
try:
- if self._max_overflow > -1 and self._overflow >= self._max_overflow:
- raise exceptions.TimeoutError("QueuePool limit of size %d overflow %d reached, connection timed out" % (self.size(), self.overflow()))
con = self.create_connection()
self._overflow += 1
finally:
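
For reference, the patched do_get() as a whole reads roughly as follows (a sketch assembled from the hunk above; the body of the finally block, which the excerpt cuts off, is assumed to release the overflow lock):

def do_get(self):
    try:
        return self._pool.get(self._max_overflow > -1 and self._overflow >= self._max_overflow, self._timeout)
    except Queue.Empty:
        # first check, outside the mutex: reaching here with overflow maxed
        # means we really did wait the full timeout in queue.get()
        if self._max_overflow > -1 and self._overflow >= self._max_overflow:
            raise exceptions.TimeoutError("QueuePool limit of size %d overflow %d reached, connection timed out, timeout %d" % (self.size(), self.overflow(), self._timeout))

        if self._overflow_lock is not None:
            self._overflow_lock.acquire()

        # second check, inside the mutex: another thread may have taken the
        # last overflow slot while we blocked on acquire().  unlock and go
        # back to the top of do_get() -- this time queue.get() blocks for
        # the full timeout instead of failing instantly
        if self._max_overflow > -1 and self._overflow >= self._max_overflow:
            if self._overflow_lock is not None:
                self._overflow_lock.release()
            return self.do_get()

        try:
            con = self.create_connection()
            self._overflow += 1
        finally:
            if self._overflow_lock is not None:
                self._overflow_lock.release()
        return con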
class MockDBAPI(object):
def __init__(self):
self.throw_error = False
- def connect(self, argument):
+ def connect(self, argument, delay=0):
if self.throw_error:
raise Exception("couldnt connect !")
+ if delay:
+ time.sleep(delay)
return MockConnection()
class MockConnection(object):
    def __init__(self):
        pass
def test_timeout(self):
p = pool.QueuePool(creator = lambda: mock_dbapi.connect('foo.db'), pool_size = 3, max_overflow = 0, use_threadlocal = False, timeout=2)
- c1 = p.get()
- c2 = p.get()
- c3 = p.get()
+ c1 = p.connect()
+ c2 = p.connect()
+ c3 = p.connect()
now = time.time()
try:
- c4 = p.get()
+ c4 = p.connect()
assert False
except exceptions.TimeoutError, e:
assert int(time.time() - now) == 2
+ def test_timeout_race(self):
+ # test a race condition where the initial connecting threads all race
+ # to queue.Empty, then block on the mutex.  each thread consumes a
+ # connection as it goes in.  when the limit is reached, the remaining
+ # threads go in and get TimeoutError immediately, even though they
+ # never waited out the timeout on queue.get().  the fix checks the
+ # overflow limit again within the mutex and, if it has been reached,
+ # unlocks and throws the thread back to the start of do_get().
+ # (a standalone check of the fixed behavior follows this test.)
+ p = pool.QueuePool(creator = lambda: mock_dbapi.connect('foo.db', delay=.05), pool_size = 2, max_overflow = 1, use_threadlocal = False, timeout=3)
+ timeouts = []
+ def checkout():
+ for x in xrange(1):  # a single checkout attempt per thread
+ now = time.time()
+ try:
+ c1 = p.connect()
+ except exceptions.TimeoutError, e:
+ timeouts.append(time.time() - now)
+ continue
+ time.sleep(4)
+ c1.close()
+
+ threads = []
+ for i in xrange(10):
+ th = threading.Thread(target=checkout)
+ th.start()
+ threads.append(th)
+ for th in threads:
+ th.join()
+
+ print timeouts
+ assert len(timeouts) > 0
+ # each failed checkout should have waited out roughly the full
+ # 3 second pool timeout, rather than raising instantly
+ for t in timeouts:
+     assert abs(t - 3) < 1
+
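
# a quick standalone check of the fixed behavior (a sketch, not part of
# the test suite; sqlite3 stands in for a real DBAPI here).  once the
# pool is saturated, a further connect() should block for the full
# timeout before raising TimeoutError, instead of failing instantly:
import time, sqlite3
import sqlalchemy.pool as pool
from sqlalchemy import exceptions

p = pool.QueuePool(creator=lambda: sqlite3.connect(':memory:'),
                   pool_size=1, max_overflow=0, timeout=2)
held = p.connect()          # saturate the pool
now = time.time()
try:
    p.connect()             # should block ~2 seconds, then fail
except exceptions.TimeoutError:
    print "waited %.1f sec" % (time.time() - now)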
def _test_overflow(self, thread_count, max_overflow):
def creator():
time.sleep(.05)
-# this program should open three connections. then after five seconds, the remaining
-# 45 threads should receive a timeout error. then the program will just stop until
-# ctrl-C is pressed. it should *NOT* open a bunch of new connections.
+# load test of the connection pool: many threads repeatedly checking
+# connections out and back in
from sqlalchemy import *
import sqlalchemy.pool as pool
-import psycopg2 as psycopg
import thread,time
-psycopg = pool.manage(psycopg,pool_size=2,max_overflow=1, timeout=5, echo=True)
-print psycopg
-db = create_engine('postgres://scott:tiger@127.0.0.1/test',pool=psycopg,strategy='threadlocal')
-print db.connection_provider._pool
+db = create_engine('mysql://scott:tiger@127.0.0.1/test', pool_timeout=30, echo_pool=True)
+
metadata = MetaData(db)
users_table = Table('users', metadata,
Column('user_id', Integer, primary_key=True),
Column('user_name', String(40)),
Column('password', String(10)))
+metadata.drop_all()
metadata.create_all()
-class User(object):
- pass
-usermapper = mapper(User, users_table)
+users_table.insert().execute([{'user_name':'user#%d' % i, 'password':'pw#%d' % i} for i in range(1000)])
-#Then i create loads of threads and in run() of each thread:
-def run():
- session = create_session()
- transaction = session.create_transaction()
- query = session.query(User)
- u1=query.select(User.c.user_id==3)
-
-for x in range(0,50):
- thread.start_new_thread(run, ())
+def runfast():
+ while True:
+ c = db.connection_provider._pool.connect()
+ time.sleep(.5)
+ c.close()
+# result = users_table.select(limit=100).execute()
+# d = {}
+# for row in result:
+# for col in row.keys():
+# d[col] = row[col]
+# time.sleep(.005)
+# result.close()
+ print "runfast cycle complete"
+
+#thread.start_new_thread(runslow, ())
+for x in xrange(0,50):
+ thread.start_new_thread(runfast, ())
-while True:
- time.sleep(5)
+time.sleep(100)  # let the load run for 100 seconds, then exit