author     Mike Bayer <mike_mp@zzzcomputing.com>
           Fri, 20 Jul 2007 15:10:56 +0000 (15:10 +0000)
committer  Mike Bayer <mike_mp@zzzcomputing.com>
           Fri, 20 Jul 2007 15:10:56 +0000 (15:10 +0000)

- a new mutex that was added in 0.3.9 causes the pool_timeout
  feature to fail during a race condition; threads would
  raise TimeoutError immediately with no delay if many threads
  push the pool into overflow at the same time.  this issue has been
  fixed.
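
To make the race concrete before reading the patch: a minimal, self-contained sketch of the pre-fix control flow. SketchPool, its attribute names, and this TimeoutError are illustrative stand-ins, not the shipped code (the real class is QueuePool in lib/sqlalchemy/pool.py, with underscore-prefixed attributes and sqlalchemy.exceptions.TimeoutError):

    import Queue, threading

    class TimeoutError(Exception):   # stand-in for sqlalchemy.exceptions.TimeoutError
        pass

    class SketchPool(object):
        def __init__(self, pool_size=2, max_overflow=1, timeout=3):
            self.queue = Queue.Queue(pool_size)
            self.max_overflow = max_overflow
            self.timeout = timeout
            # overflow counts live connections minus pool_size, so reaching
            # max_overflow means pool_size + max_overflow connections exist
            self.overflow = 0 - pool_size
            self.overflow_lock = threading.Lock()

        def create_connection(self):
            return object()   # a real pool calls the DBAPI connect here

        def do_get(self):
            # block on the queue only when already in full overflow
            block = self.max_overflow > -1 and self.overflow >= self.max_overflow
            try:
                return self.queue.get(block, self.timeout)
            except Queue.Empty:
                self.overflow_lock.acquire()
                try:
                    # THE RACE: threads that hit Queue.Empty together pile up
                    # on acquire() above.  the first few create connections
                    # and push overflow to the limit; each thread behind them
                    # then fails this check the instant it gets the lock,
                    # raising TimeoutError with no delay at all instead of
                    # waiting self.timeout seconds.
                    if self.max_overflow > -1 and self.overflow >= self.max_overflow:
                        raise TimeoutError("pool limit reached")
                    con = self.create_connection()
                    self.overflow += 1
                    return con
                finally:
                    self.overflow_lock.release()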

CHANGES
lib/sqlalchemy/pool.py
test/engine/pool.py
test/perf/poolload.py

diff --git a/CHANGES b/CHANGES
index f61d453da0268d150cef28fd96236ba3ced3c111..41231af7db646a73cc000f01ed2683331fad78bf 100644 (file)
--- a/CHANGES
+++ b/CHANGES
@@ -1,4 +1,10 @@
 0.3.10
+- general
+    - a new mutex that was added in 0.3.9 causes the pool_timeout
+      feature to fail during a race condition; threads would 
+      raise TimeoutError immediately with no delay if many threads 
+      push the pool into overflow at the same time.  this issue has been
+      fixed.
 - sql
     - better quoting of identifiers when manipulating schemas
     - got connection-bound metadata to work with implicit execution
diff --git a/lib/sqlalchemy/pool.py b/lib/sqlalchemy/pool.py
index c3a317c3f3fba845f223602a3ff3158b6c8f6bd6..cf442c92864f6d0050d55facbb3ea6c9bfdb780b 100644 (file)
--- a/lib/sqlalchemy/pool.py
+++ b/lib/sqlalchemy/pool.py
@@ -493,11 +493,18 @@ class QueuePool(Pool):
         try:
             return self._pool.get(self._max_overflow > -1 and self._overflow >= self._max_overflow, self._timeout)
         except Queue.Empty:
+            if self._max_overflow > -1 and self._overflow >= self._max_overflow:
+                raise exceptions.TimeoutError("QueuePool limit of size %d overflow %d reached, connection timed out, timeout %d" % (self.size(), self.overflow(), self._timeout))
+
             if self._overflow_lock is not None:
                 self._overflow_lock.acquire()
+
+            if self._max_overflow > -1 and self._overflow >= self._max_overflow:
+                if self._overflow_lock is not None:
+                    self._overflow_lock.release()
+                return self.do_get()
+
             try:
-                if self._max_overflow > -1 and self._overflow >= self._max_overflow:
-                    raise exceptions.TimeoutError("QueuePool limit of size %d overflow %d reached, connection timed out" % (self.size(), self.overflow()))
                 con = self.create_connection()
                 self._overflow += 1
             finally:
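
The fix, distilled against the same sketch: check the limit once before taking the mutex (a thread failing there genuinely waited out its timeout on queue.get()), then re-check after acquiring it; if other threads consumed the remaining overflow while this one blocked on acquire(), release and restart do_get() so a full timeout applies. Again illustrative; the committed code additionally handles the case where _overflow_lock is None.

    # drop-in replacement for SketchPool.do_get above
    def do_get(self):
        block = self.max_overflow > -1 and self.overflow >= self.max_overflow
        try:
            return self.queue.get(block, self.timeout)
        except Queue.Empty:
            # already at the limit: queue.get() above genuinely waited out
            # self.timeout, so failing immediately here is correct
            if self.max_overflow > -1 and self.overflow >= self.max_overflow:
                raise TimeoutError("pool limit reached, connection timed out")
            self.overflow_lock.acquire()
            # re-check under the lock: other threads may have filled the
            # overflow while this one blocked on acquire().  if so, unlock
            # and retry from the top so the full timeout applies.
            if self.max_overflow > -1 and self.overflow >= self.max_overflow:
                self.overflow_lock.release()
                return self.do_get()
            try:
                con = self.create_connection()
                self.overflow += 1
                return con
            finally:
                self.overflow_lock.release()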
diff --git a/test/engine/pool.py b/test/engine/pool.py
index 924dddeac9c107bf6265fd3b1838b7eab55c9365..3ae6fa2e1373d0f66f8c1e4c5fdf65974a8d183f 100644 (file)
--- a/test/engine/pool.py
+++ b/test/engine/pool.py
@@ -10,9 +10,11 @@ mcid = 1
 class MockDBAPI(object):
     def __init__(self):
         self.throw_error = False
-    def connect(self, argument):
+    def connect(self, argument, delay=0):
         if self.throw_error:
             raise Exception("couldnt connect !")
+        if delay:
+            time.sleep(delay)
         return MockConnection()
 class MockConnection(object):
     def __init__(self):
@@ -118,16 +120,48 @@ class PoolTest(PersistTest):
     
     def test_timeout(self):
         p = pool.QueuePool(creator = lambda: mock_dbapi.connect('foo.db'), pool_size = 3, max_overflow = 0, use_threadlocal = False, timeout=2)
-        c1 = p.get()
-        c2 = p.get()
-        c3 = p.get()
+        c1 = p.connect()
+        c2 = p.connect()
+        c3 = p.connect()
         now = time.time()
         try:
-            c4 = p.get()
+            c4 = p.connect()
             assert False
         except exceptions.TimeoutError, e:
             assert int(time.time() - now) == 2
 
+    def test_timeout_race(self):
+        # test a race condition where the initial connecting threads all race to queue.Empty, then block on the mutex.
+        # each thread consumes a connection as they go in.  when the limit is reached, the remaining threads
+        # go in, and get TimeoutError; even though they never got to wait for the timeout on queue.get().
+        # the fix involves checking the timeout again within the mutex, and if so, unlocking and throwing them back to the start
+        # of do_get()
+        p = pool.QueuePool(creator = lambda: mock_dbapi.connect('foo.db', delay=.05), pool_size = 2, max_overflow = 1, use_threadlocal = False, timeout=3)
+        timeouts = []
+        def checkout():
+            for x in xrange(1):
+                now = time.time()
+                try:
+                    c1 = p.connect()
+                except exceptions.TimeoutError, e:
+                    timeouts.append(int(time.time()) - now)
+                    continue
+                time.sleep(4)
+                c1.close()
+            
+        threads = []
+        for i in xrange(10):
+            th = threading.Thread(target=checkout)
+            th.start()
+            threads.append(th)
+        for th in threads:
+            th.join()
+        
+        print timeouts
+        assert len(timeouts) > 0
+        for t in timeouts:
+            assert abs(t - 2) < 1
+        
     def _test_overflow(self, thread_count, max_overflow):
         def creator():
             time.sleep(.05)
diff --git a/test/perf/poolload.py b/test/perf/poolload.py
index 090827709da5d372ed680db0ce12e98a91e238ba..d096f1c67ffa5e74dda353f275fff6b938b1948f 100644 (file)
--- a/test/perf/poolload.py
+++ b/test/perf/poolload.py
@@ -1,36 +1,37 @@
-# this program should open three connections.  then after five seconds, the remaining
-# 45 threads should receive a timeout error.  then the program will just stop until
-# ctrl-C is pressed.  it should *NOT* open a bunch of new connections.
+# load test of connection pool
 
 from sqlalchemy import *
 import sqlalchemy.pool as pool
-import psycopg2 as psycopg
 import thread,time
-psycopg = pool.manage(psycopg,pool_size=2,max_overflow=1, timeout=5, echo=True)
-print psycopg
-db = create_engine('postgres://scott:tiger@127.0.0.1/test',pool=psycopg,strategy='threadlocal')
-print db.connection_provider._pool
+db = create_engine('mysql://scott:tiger@127.0.0.1/test', pool_timeout=30, echo_pool=True)
+
 metadata = MetaData(db)
 
 users_table = Table('users', metadata,
   Column('user_id', Integer, primary_key=True),
   Column('user_name', String(40)),
   Column('password', String(10)))
+metadata.drop_all()
 metadata.create_all()
 
-class User(object):
-    pass
-usermapper = mapper(User, users_table)
+users_table.insert().execute([{'user_name':'user#%d' % i, 'password':'pw#%d' % i} for i in range(1000)])
 
-#Then i create loads of threads and in run() of each thread:
-def run():
-    session = create_session()
-    transaction = session.create_transaction()
-    query = session.query(User)
-    u1=query.select(User.c.user_id==3)
-    
-for x in range(0,50):
-    thread.start_new_thread(run, ())
+def runfast():
+    while True:
+        c = db.connection_provider._pool.connect()
+        time.sleep(.5)
+        c.close()
+#        result = users_table.select(limit=100).execute()
+#        d = {}
+#        for row in result:
+#            for col in row.keys():
+#                d[col] = row[col]
+#        time.sleep(.005)
+#        result.close()
+        print "runfast cycle complete"
+        
+#thread.start_new_thread(runslow, ())                
+for x in xrange(0,50):
+    thread.start_new_thread(runfast, ())
 
-while True:
-    time.sleep(5)
+time.sleep(100)
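
Since the rewritten script now needs a running MySQL server, the same load pattern can also be pointed at a bare QueuePool with a stub DBAPI connection, exercising the overflow/timeout path with no database at all. A sketch: FakeConnection is a hypothetical stand-in, modeled on MockConnection from test/engine/pool.py above.

    import thread, time
    import sqlalchemy.pool as pool

    class FakeConnection(object):
        # minimal DBAPI-ish surface so pool checkout/checkin succeeds
        def rollback(self): pass
        def cursor(self): pass
        def close(self): pass

    p = pool.QueuePool(creator=lambda: FakeConnection(),
                       pool_size=5, max_overflow=10, timeout=30)

    def runfast():
        while True:
            c = p.connect()
            time.sleep(.5)
            c.close()          # returns the connection to the pool
            print "runfast cycle complete"

    for x in xrange(0, 50):
        thread.start_new_thread(runfast, ())

    time.sleep(100)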