From: Mike Bayer
Date: Fri, 20 Jul 2007 15:42:46 +0000 (+0000)
Subject: merged pool fix from [changeset:2989] and timeout fix from [changeset:2990]
X-Git-Tag: rel_0_4_6~64
X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=d1a776efeda832f837c38ef264a8dd92673f86b5;p=thirdparty%2Fsqlalchemy%2Fsqlalchemy.git

merged pool fix from [changeset:2989] and timeout fix from [changeset:2990]
---

diff --git a/CHANGES b/CHANGES
index dc93d7d0f1..b9f8b8bf09 100644
--- a/CHANGES
+++ b/CHANGES
@@ -159,6 +159,12 @@
 - Added PGArray datatype for using postgres array datatypes

 0.3.10
+- general
+    - a new mutex that was added in 0.3.9 caused the pool_timeout
+      feature to fail under a race condition: threads would
+      raise TimeoutError immediately, with no delay, if many threads
+      pushed the pool into overflow at the same time.  this issue
+      has been fixed.
 - sql
     - got connection-bound metadata to work with implicit execution
     - foreign key specs can have any chararcter in their identifiers
diff --git a/lib/sqlalchemy/pool.py b/lib/sqlalchemy/pool.py
index e9297f5a46..373ec30ac8 100644
--- a/lib/sqlalchemy/pool.py
+++ b/lib/sqlalchemy/pool.py
@@ -493,11 +493,18 @@ class QueuePool(Pool):
         try:
             return self._pool.get(self._max_overflow > -1 and self._overflow >= self._max_overflow, self._timeout)
         except Queue.Empty:
+            if self._max_overflow > -1 and self._overflow >= self._max_overflow:
+                raise exceptions.TimeoutError("QueuePool limit of size %d overflow %d reached, connection timed out, timeout %d" % (self.size(), self.overflow(), self._timeout))
+
             if self._overflow_lock is not None:
                 self._overflow_lock.acquire()
+
+            if self._max_overflow > -1 and self._overflow >= self._max_overflow:
+                if self._overflow_lock is not None:
+                    self._overflow_lock.release()
+                return self.do_get()
+
             try:
-                if self._max_overflow > -1 and self._overflow >= self._max_overflow:
-                    raise exceptions.TimeoutError("QueuePool limit of size %d overflow %d reached, connection timed out" % (self.size(), self.overflow()))
                 con = self.create_connection()
                 self._overflow += 1
             finally:
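The pool.py hunk above is the heart of the fix: re-check the overflow limit
while holding the overflow mutex, and if another thread consumed the last
slot in the meantime, release the mutex and send the loser back to the top of
do_get() for a real timed wait instead of an immediate TimeoutError.  As a
standalone illustration of that double-check pattern, here is a minimal
Python 2 sketch.  It is not SQLAlchemy's actual QueuePool: SimplePool, its
attributes, and create_connection() are hypothetical stand-ins, and it assumes
max_overflow >= 0 (the "> -1" guards in the real code also allow -1, meaning
unlimited overflow).

    import threading
    import time
    import Queue

    class TimeoutError(Exception):
        pass

    class SimplePool(object):
        """Hypothetical stand-in for QueuePool: a bounded queue of
        connections, an overflow counter, and an overflow mutex."""
        def __init__(self, size, max_overflow, timeout):
            self._pool = Queue.Queue(size)
            self._overflow = 0 - size          # counts connections beyond 'size'
            self._max_overflow = max_overflow  # assumed >= 0 in this sketch
            self._timeout = timeout
            self._overflow_lock = threading.Lock()

        def do_get(self):
            try:
                # block on the queue only when overflow is exhausted; otherwise
                # fall through immediately and try to create a connection
                return self._pool.get(self._overflow >= self._max_overflow,
                                      self._timeout)
            except Queue.Empty:
                # first check: if the limit is reached here, the get() above
                # blocked for the full timeout, so give up
                if self._overflow >= self._max_overflow:
                    raise TimeoutError(
                        "pool of size %d, overflow %d timed out after %d seconds"
                        % (self._pool.maxsize, self._overflow, self._timeout))
                self._overflow_lock.acquire()
                # second check, now under the mutex: another thread may have
                # taken the last overflow slot while we raced here after a
                # non-blocking get().  pre-fix, this spot raised TimeoutError
                # with no delay; the fix releases the mutex and retries from
                # the top, so the thread gets a real timed wait on the queue.
                if self._overflow >= self._max_overflow:
                    self._overflow_lock.release()
                    return self.do_get()
                try:
                    con = self.create_connection()
                    self._overflow += 1
                finally:
                    self._overflow_lock.release()
                return con

        def do_return(self, con):
            try:
                self._pool.put(con, False)
            except Queue.Full:
                self._overflow -= 1

        def create_connection(self):
            time.sleep(.05)   # stand-in for a slow DBAPI connect()
            return object()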
diff --git a/test/engine/pool.py b/test/engine/pool.py
index 924dddeac9..85d44dfd38 100644
--- a/test/engine/pool.py
+++ b/test/engine/pool.py
@@ -10,9 +10,11 @@ mcid = 1
 class MockDBAPI(object):
     def __init__(self):
         self.throw_error = False
-    def connect(self, argument):
+    def connect(self, argument, delay=0):
         if self.throw_error:
             raise Exception("couldnt connect !")
+        if delay:
+            time.sleep(delay)
         return MockConnection()
 class MockConnection(object):
     def __init__(self):
@@ -118,16 +120,48 @@ class PoolTest(PersistTest):
     def test_timeout(self):
         p = pool.QueuePool(creator = lambda: mock_dbapi.connect('foo.db'), pool_size = 3, max_overflow = 0, use_threadlocal = False, timeout=2)
-        c1 = p.get()
-        c2 = p.get()
-        c3 = p.get()
+        c1 = p.connect()
+        c2 = p.connect()
+        c3 = p.connect()
         now = time.time()
         try:
-            c4 = p.get()
+            c4 = p.connect()
             assert False
         except exceptions.TimeoutError, e:
             assert int(time.time() - now) == 2

+    def test_timeout_race(self):
+        # test a race condition where the initial connecting threads all race to queue.Empty, then block on the mutex.
+        # each thread consumes a connection as it goes in.  when the limit is reached, the remaining threads
+        # go in and get TimeoutError immediately, even though they never got to wait out the timeout on queue.get().
+        # the fix involves re-checking the overflow limit within the mutex and, if it has been reached,
+        # unlocking and throwing the thread back to the start of do_get()
+        p = pool.QueuePool(creator = lambda: mock_dbapi.connect('foo.db', delay=.05), pool_size = 2, max_overflow = 1, use_threadlocal = False, timeout=3)
+        timeouts = []
+        def checkout():
+            for x in xrange(1):
+                now = time.time()
+                try:
+                    c1 = p.connect()
+                except exceptions.TimeoutError, e:
+                    timeouts.append(int(time.time() - now))
+                    continue
+                time.sleep(4)
+                c1.close()
+
+        threads = []
+        for i in xrange(10):
+            th = threading.Thread(target=checkout)
+            th.start()
+            threads.append(th)
+        for th in threads:
+            th.join()
+
+        print timeouts
+        assert len(timeouts) > 0
+        for t in timeouts:
+            assert abs(t - 3) < 1, "Not all timeouts were 3 seconds: " + repr(timeouts)
+
     def _test_overflow(self, thread_count, max_overflow):
         def creator():
             time.sleep(.05)
diff --git a/test/perf/poolload.py b/test/perf/poolload.py
index b2a84033e3..d096f1c67f 100644
--- a/test/perf/poolload.py
+++ b/test/perf/poolload.py
@@ -1,37 +1,37 @@
-# this program should open three connections. then after five seconds, the remaining
-# 45 threads should receive a timeout error. then the program will just stop until
-# ctrl-C is pressed. it should *NOT* open a bunch of new connections.
+# load test of the connection pool
 from sqlalchemy import *
 import sqlalchemy.pool as pool
-from testbase import Table, Column
-import psycopg2 as psycopg
 import thread,time

-psycopg = pool.manage(psycopg,pool_size=2,max_overflow=1, timeout=5, echo=True)
-print psycopg
-db = create_engine('postgres://scott:tiger@127.0.0.1/test',pool=psycopg,strategy='threadlocal')
-print db.connection_provider._pool
+db = create_engine('mysql://scott:tiger@127.0.0.1/test', pool_timeout=30, echo_pool=True)
+
 metadata = MetaData(db)
 users_table = Table('users', metadata,
     Column('user_id', Integer, primary_key=True),
     Column('user_name', String(40)),
     Column('password', String(10)))
+metadata.drop_all()
 metadata.create_all()

-class User(object):
-    pass
-usermapper = mapper(User, users_table)
+users_table.insert().execute([{'user_name':'user#%d' % i, 'password':'pw#%d' % i} for i in range(1000)])

-#Then i create loads of threads and in run() of each thread:
-def run():
-    session = create_session()
-    transaction = session.create_transaction()
-    query = session.query(User)
-    u1=query.select(User.c.user_id==3)
-
-for x in range(0,50):
-    thread.start_new_thread(run, ())
+def runfast():
+    while True:
+        c = db.connection_provider._pool.connect()
+        time.sleep(.5)
+        c.close()
+#        result = users_table.select(limit=100).execute()
+#        d = {}
+#        for row in result:
+#            for col in row.keys():
+#                d[col] = row[col]
+#        time.sleep(.005)
+#        result.close()
+        print "runfast cycle complete"
+
+#thread.start_new_thread(runslow, ())
+for x in xrange(0,50):
+    thread.start_new_thread(runfast, ())

-while True:
-    time.sleep(5)
+time.sleep(100)
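To see the timing behavior that test_timeout_race asserts, here is a small
driver in the same shape, run against the hypothetical SimplePool sketched
above rather than the real QueuePool: ten threads compete for
size + max_overflow = 3 connections and hold them past the timeout window.
With the fix, every losing thread should time out near the configured
3 seconds; under the pre-fix logic, some entries would be near 0 because
those threads raised TimeoutError without ever waiting.

    import threading
    import time

    def demo():
        p = SimplePool(size=2, max_overflow=1, timeout=3)
        timeouts = []

        def checkout():
            now = time.time()
            try:
                con = p.do_get()
            except TimeoutError:
                timeouts.append(time.time() - now)
                return
            time.sleep(4)      # hold the connection past the timeout window

        threads = [threading.Thread(target=checkout) for i in xrange(10)]
        for th in threads:
            th.start()
        for th in threads:
            th.join()

        print timeouts
        assert len(timeouts) > 0
        for t in timeouts:
            # each recorded timeout should be close to the configured 3 seconds
            assert abs(t - 3) < 1, repr(timeouts)

    demo()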