From: Mike Bayer Date: Fri, 20 Jul 2007 15:10:56 +0000 (+0000) Subject: - a new mutex that was added in 0.3.9 causes the pool_timeout X-Git-Tag: rel_0_3_10~5 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=687d9342e6a4a59f63648bb83d28e338f274a0f6;p=thirdparty%2Fsqlalchemy%2Fsqlalchemy.git - a new mutex that was added in 0.3.9 causes the pool_timeout feature to fail during a race condition; threads would raise TimeoutError immediately with no delay if many threads push the pool into overflow at the same time. this issue has been fixed. --- diff --git a/CHANGES b/CHANGES index f61d453da0..41231af7db 100644 --- a/CHANGES +++ b/CHANGES @@ -1,4 +1,10 @@ 0.3.10 +- general + - a new mutex that was added in 0.3.9 causes the pool_timeout + feature to fail during a race condition; threads would + raise TimeoutError immediately with no delay if many threads + push the pool into overflow at the same time. this issue has been + fixed. - sql - better quoting of identifiers when manipulating schemas - got connection-bound metadata to work with implicit execution diff --git a/lib/sqlalchemy/pool.py b/lib/sqlalchemy/pool.py index c3a317c3f3..cf442c9286 100644 --- a/lib/sqlalchemy/pool.py +++ b/lib/sqlalchemy/pool.py @@ -493,11 +493,18 @@ class QueuePool(Pool): try: return self._pool.get(self._max_overflow > -1 and self._overflow >= self._max_overflow, self._timeout) except Queue.Empty: + if self._max_overflow > -1 and self._overflow >= self._max_overflow: + raise exceptions.TimeoutError("QueuePool limit of size %d overflow %d reached, connection timed out, timeout %d" % (self.size(), self.overflow(), self._timeout)) + if self._overflow_lock is not None: self._overflow_lock.acquire() + + if self._max_overflow > -1 and self._overflow >= self._max_overflow: + if self._overflow_lock is not None: + self._overflow_lock.release() + return self.do_get() + try: - if self._max_overflow > -1 and self._overflow >= self._max_overflow: - raise exceptions.TimeoutError("QueuePool limit of size %d overflow %d reached, connection timed out" % (self.size(), self.overflow())) con = self.create_connection() self._overflow += 1 finally: diff --git a/test/engine/pool.py b/test/engine/pool.py index 924dddeac9..3ae6fa2e13 100644 --- a/test/engine/pool.py +++ b/test/engine/pool.py @@ -10,9 +10,11 @@ mcid = 1 class MockDBAPI(object): def __init__(self): self.throw_error = False - def connect(self, argument): + def connect(self, argument, delay=0): if self.throw_error: raise Exception("couldnt connect !") + if delay: + time.sleep(delay) return MockConnection() class MockConnection(object): def __init__(self): @@ -118,16 +120,48 @@ class PoolTest(PersistTest): def test_timeout(self): p = pool.QueuePool(creator = lambda: mock_dbapi.connect('foo.db'), pool_size = 3, max_overflow = 0, use_threadlocal = False, timeout=2) - c1 = p.get() - c2 = p.get() - c3 = p.get() + c1 = p.connect() + c2 = p.connect() + c3 = p.connect() now = time.time() try: - c4 = p.get() + c4 = p.connect() assert False except exceptions.TimeoutError, e: assert int(time.time() - now) == 2 + def test_timeout_race(self): + # test a race condition where the initial connecting threads all race to queue.Empty, then block on the mutex. + # each thread consumes a connection as they go in. when the limit is reached, the remaining threads + # go in, and get TimeoutError; even though they never got to wait for the timeout on queue.get(). + # the fix involves checking the timeout again within the mutex, and if so, unlocking and throwing them back to the start + # of do_get() + p = pool.QueuePool(creator = lambda: mock_dbapi.connect('foo.db', delay=.05), pool_size = 2, max_overflow = 1, use_threadlocal = False, timeout=3) + timeouts = [] + def checkout(): + for x in xrange(1): + now = time.time() + try: + c1 = p.connect() + except exceptions.TimeoutError, e: + timeouts.append(int(time.time()) - now) + continue + time.sleep(4) + c1.close() + + threads = [] + for i in xrange(10): + th = threading.Thread(target=checkout) + th.start() + threads.append(th) + for th in threads: + th.join() + + print timeouts + assert len(timeouts) > 0 + for t in timeouts: + assert abs(t - 2) < 1 + def _test_overflow(self, thread_count, max_overflow): def creator(): time.sleep(.05) diff --git a/test/perf/poolload.py b/test/perf/poolload.py index 090827709d..d096f1c67f 100644 --- a/test/perf/poolload.py +++ b/test/perf/poolload.py @@ -1,36 +1,37 @@ -# this program should open three connections. then after five seconds, the remaining -# 45 threads should receive a timeout error. then the program will just stop until -# ctrl-C is pressed. it should *NOT* open a bunch of new connections. +# load test of connection pool from sqlalchemy import * import sqlalchemy.pool as pool -import psycopg2 as psycopg import thread,time -psycopg = pool.manage(psycopg,pool_size=2,max_overflow=1, timeout=5, echo=True) -print psycopg -db = create_engine('postgres://scott:tiger@127.0.0.1/test',pool=psycopg,strategy='threadlocal') -print db.connection_provider._pool +db = create_engine('mysql://scott:tiger@127.0.0.1/test', pool_timeout=30, echo_pool=True) + metadata = MetaData(db) users_table = Table('users', metadata, Column('user_id', Integer, primary_key=True), Column('user_name', String(40)), Column('password', String(10))) +metadata.drop_all() metadata.create_all() -class User(object): - pass -usermapper = mapper(User, users_table) +users_table.insert().execute([{'user_name':'user#%d' % i, 'password':'pw#%d' % i} for i in range(1000)]) -#Then i create loads of threads and in run() of each thread: -def run(): - session = create_session() - transaction = session.create_transaction() - query = session.query(User) - u1=query.select(User.c.user_id==3) - -for x in range(0,50): - thread.start_new_thread(run, ()) +def runfast(): + while True: + c = db.connection_provider._pool.connect() + time.sleep(.5) + c.close() +# result = users_table.select(limit=100).execute() +# d = {} +# for row in result: +# for col in row.keys(): +# d[col] = row[col] +# time.sleep(.005) +# result.close() + print "runfast cycle complete" + +#thread.start_new_thread(runslow, ()) +for x in xrange(0,50): + thread.start_new_thread(runfast, ()) -while True: - time.sleep(5) +time.sleep(100)