git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
fix the timeout test once and for all
author Mike Bayer <mike_mp@zzzcomputing.com>
Sat, 13 Mar 2010 16:52:17 +0000 (11:52 -0500)
committer Mike Bayer <mike_mp@zzzcomputing.com>
Sat, 13 Mar 2010 16:52:17 +0000 (11:52 -0500)
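
This commit makes the QueuePool timeout test deterministic: elapsed time is now measured entirely in float seconds, and the assertion bounds are widened so a loaded buildbot cannot fail spuriously. The old measurement, int(time.time()) - now, truncates the current time before subtracting the float start time and so can under-report a three-second wait. A minimal sketch of the truncation, with illustrative values only:

    now = 100.7                # float start time, as time.time() returns
    later = now + 3.0          # exactly three seconds later
    print int(later) - now     # ~2.3 -- truncation under-reports the wait
    print later - now          # 3.0 -- the float arithmetic the test now uses

The remaining pool.py hunks are mechanical cleanups made along the way: long lines wrapped, **params renamed to **kw, and the SystemExit/KeyboardInterrupt re-raise moved ahead of the broad exception handler.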
lib/sqlalchemy/pool.py
test/engine/test_pool.py

index 6dbadcb3b1b8d2dfe937500a7b71fbf53eb2b0ff..8d6ae3292a267e01f47ce6331a7f70281dbd550d 100644 (file)
@@ -25,7 +25,8 @@ from sqlalchemy.util import threading, pickle, as_interface, memoized_property
 proxies = {}
 
 def manage(module, **params):
-    """Return a proxy for a DB-API module that automatically pools connections.
+    """Return a proxy for a DB-API module that automatically 
+    pools connections.
 
     Given a DB-API 2.0 module and pool management parameters, returns
     a proxy for the module that will automatically pool connections,
@@ -58,8 +59,10 @@ def clear_managers():
 class Pool(object):
     """Abstract base class for connection pools."""
 
-    def __init__(self, creator, recycle=-1, echo=None, use_threadlocal=False,
-                 reset_on_return=True, listeners=None):
+    def __init__(self, 
+                    creator, recycle=-1, echo=None, 
+                    use_threadlocal=False,
+                    reset_on_return=True, listeners=None):
         """
         Construct a Pool.
 
@@ -232,7 +235,8 @@ class _ConnectionRecord(object):
             if self.__pool._on_connect:
                 for l in self.__pool._on_connect:
                     l.connect(self.connection, self)
-        elif (self.__pool._recycle > -1 and time.time() - self.starttime > self.__pool._recycle):
+        elif self.__pool._recycle > -1 and \
+                time.time() - self.starttime > self.__pool._recycle:
             self.__pool.logger.info("Connection %r exceeded timeout; recycling",
                             self.connection)
             self.__close()
@@ -247,11 +251,11 @@ class _ConnectionRecord(object):
         try:
             self.__pool.logger.debug("Closing connection %r", self.connection)
             self.connection.close()
+        except (SystemExit, KeyboardInterrupt):
+            raise
         except Exception, e:
             self.__pool.logger.debug("Connection %r threw an error on close: %s",
                             self.connection, e)
-            if isinstance(e, (SystemExit, KeyboardInterrupt)):
-                raise
 
     def __connect(self):
         try:
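
Reordering the except clauses here matters on older Pythons (before 2.5, SystemExit and KeyboardInterrupt still derived from Exception), and it is clearer in any case: interpreter-level exits are re-raised before the broad handler can log and swallow them. A minimal sketch of the pattern, using a hypothetical helper name:

    def close_quietly(connection):
        try:
            connection.close()
        except (SystemExit, KeyboardInterrupt):
            raise                           # never swallow interpreter exits
        except Exception, e:
            print "error on close: %s" % e  # log and carry on, as the pool does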
@@ -282,6 +286,7 @@ def _finalize_fairy(connection, connection_record, pool, ref=None):
                 connection_record.invalidate(e=e)
             if isinstance(e, (SystemExit, KeyboardInterrupt)):
                 raise
+                
     if connection_record is not None:
         connection_record.fairy = None
         pool.logger.debug("Connection %r being returned to pool", connection)
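
_finalize_fairy is the weakref callback that returns a raw connection to the pool when its _ConnectionFairy wrapper is dereferenced or explicitly closed. A sketch of the observable behavior, assuming a sqlite3 creator:

    import gc
    import sqlite3
    from sqlalchemy import pool

    p = pool.QueuePool(lambda: sqlite3.connect(':memory:'), pool_size=2)
    c = p.connect()
    print p.checkedout()    # 1
    del c                   # fairy dereferenced; _finalize_fairy runs
    gc.collect()            # make collection deterministic for the demo
    print p.checkedout()    # 0 -- the connection is back in the pool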
@@ -295,7 +300,8 @@ _refs = set()
 class _ConnectionFairy(object):
     """Proxies a DB-API connection and provides return-on-dereference support."""
 
-    __slots__ = '_pool', '__counter', 'connection', '_connection_record', '__weakref__', '_detached_info'
+    __slots__ = '_pool', '__counter', 'connection', \
+                '_connection_record', '__weakref__', '_detached_info'
     
     def __init__(self, pool):
         self._pool = pool
@@ -536,7 +542,7 @@ class QueuePool(Pool):
     """A Pool that imposes a limit on the number of open connections."""
 
     def __init__(self, creator, pool_size=5, max_overflow=10, timeout=30,
-                 **params):
+                 **kw):
         """
         Construct a QueuePool.
 
@@ -598,7 +604,7 @@ class QueuePool(Pool):
           pool.
 
         """
-        Pool.__init__(self, creator, **params)
+        Pool.__init__(self, creator, **kw)
         self._pool = sqla_queue.Queue(pool_size)
         self._overflow = 0 - pool_size
         self._max_overflow = max_overflow
@@ -634,7 +640,10 @@ class QueuePool(Pool):
                 if not wait:
                     return self.do_get()
                 else:
-                    raise exc.TimeoutError("QueuePool limit of size %d overflow %d reached, connection timed out, timeout %d" % (self.size(), self.overflow(), self._timeout))
+                    raise exc.TimeoutError(
+                                    "QueuePool limit of size %d overflow %d reached, "
+                                    "connection timed out, timeout %d" % 
+                                    (self.size(), self.overflow(), self._timeout))
 
             if self._overflow_lock is not None:
                 self._overflow_lock.acquire()
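
The rewrapped TimeoutError message reports the pool's size, its current overflow, and the configured timeout. A sketch of the condition that raises it, assuming a sqlite3 creator:

    import time
    import sqlite3
    from sqlalchemy import pool, exc

    p = pool.QueuePool(lambda: sqlite3.connect(':memory:'),
                       pool_size=1, max_overflow=0, timeout=3)
    c1 = p.connect()                # occupies the only slot
    start = time.time()
    try:
        p.connect()                 # blocks ~3 seconds, then raises
    except exc.TimeoutError, e:
        print "%s (waited %.1fs)" % (e, time.time() - start)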
@@ -664,8 +673,12 @@ class QueuePool(Pool):
         self.logger.info("Pool disposed. %s", self.status())
 
     def status(self):
-        tup = (self.size(), self.checkedin(), self.overflow(), self.checkedout())
-        return "Pool size: %d  Connections in pool: %d Current Overflow: %d Current Checked out connections: %d" % tup
+        return "Pool size: %d  Connections in pool: %d "\
+                "Current Overflow: %d Current Checked out "\
+                "connections: %d" % (self.size(), 
+                                    self.checkedin(), 
+                                    self.overflow(), 
+                                    self.checkedout())
 
     def size(self):
         return self._pool.maxsize
@@ -787,7 +800,6 @@ class StaticPool(Pool):
     def do_get(self):
         return self.connection
 
-
 class AssertionPool(Pool):
     """A Pool that allows at most one checked out connection at any given time.
 
@@ -797,8 +809,6 @@ class AssertionPool(Pool):
 
     """
 
-    ## TODO: modify this to handle an arbitrary connection count.
-
     def __init__(self, *args, **kw):
         self._conn = None
         self._checked_out = False
@@ -823,7 +833,8 @@ class AssertionPool(Pool):
 
     def recreate(self):
         self.logger.info("Pool recreating")
-        return AssertionPool(self._creator, echo=self.echo, listeners=self.listeners)
+        return AssertionPool(self._creator, echo=self.echo, 
+                            listeners=self.listeners)
         
     def do_get(self):
         if self._checked_out:
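
AssertionPool permits exactly one checked-out connection, so a second concurrent checkout fails fast instead of blocking. A sketch, again assuming a sqlite3 creator:

    import sqlite3
    from sqlalchemy import pool

    p = pool.AssertionPool(lambda: sqlite3.connect(':memory:'))
    c1 = p.connect()
    try:
        p.connect()                 # a second checkout is an error by design
    except AssertionError, e:
        print "rejected: %s" % e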
@@ -843,7 +854,7 @@ class _DBProxy(object):
     to the underlying DB-API module.
     """
 
-    def __init__(self, module, poolclass=QueuePool, **params):
+    def __init__(self, module, poolclass=QueuePool, **kw):
         """Initializes a new proxy.
 
         module
@@ -853,10 +864,11 @@ class _DBProxy(object):
           a Pool class, defaulting to QueuePool
 
         Other parameters are sent to the Pool object's constructor.
+        
         """
 
         self.module = module
-        self.params = params
+        self.kw = kw
         self.poolclass = poolclass
         self.pools = {}
         self._create_pool_mutex = threading.Lock()
@@ -871,15 +883,15 @@ class _DBProxy(object):
     def __getattr__(self, key):
         return getattr(self.module, key)
 
-    def get_pool(self, *args, **params):
-        key = self._serialize(*args, **params)
+    def get_pool(self, *args, **kw):
+        key = self._serialize(*args, **kw)
         try:
             return self.pools[key]
         except KeyError:
             self._create_pool_mutex.acquire()
             try:
                 if key not in self.pools:
-                    pool = self.poolclass(lambda: self.module.connect(*args, **params), **self.params)
+                    pool = self.poolclass(lambda: self.module.connect(*args, **kw), **self.kw)
                     self.pools[key] = pool
                     return pool
                 else:
@@ -887,7 +899,7 @@ class _DBProxy(object):
             finally:
                 self._create_pool_mutex.release()
                 
-    def connect(self, *args, **params):
+    def connect(self, *args, **kw):
         """Activate a connection to the database.
 
         Connect to the database using this DBProxy's module and the given
@@ -898,18 +910,19 @@ class _DBProxy(object):
 
         If the pool has no available connections and allows new connections
         to be created, a new database connection will be made.
+        
         """
 
-        return self.get_pool(*args, **params).connect()
+        return self.get_pool(*args, **kw).connect()
 
-    def dispose(self, *args, **params):
-        """Dispose the connection pool referenced by the given connect arguments."""
+    def dispose(self, *args, **kw):
+        """Dispose the pool referenced by the given connect arguments."""
 
-        key = self._serialize(*args, **params)
+        key = self._serialize(*args, **kw)
         try:
             del self.pools[key]
         except KeyError:
             pass
 
-    def _serialize(self, *args, **params):
-        return pickle.dumps([args, params])
+    def _serialize(self, *args, **kw):
+        return pickle.dumps([args, kw])
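
With params renamed to kw throughout, _DBProxy behaves as before: each distinct set of connect arguments is pickled into a key and mapped to its own pool. A usage sketch via the module-level manage() helper, assuming sqlite3 as the DB-API module and a hypothetical filename:

    import sqlite3
    from sqlalchemy import pool

    sqlite = pool.manage(sqlite3, pool_size=5, max_overflow=10)
    c1 = sqlite.connect(':memory:')    # first arg set -> one pool
    c2 = sqlite.connect('other.db')    # different args -> a second pool
    c1.close()                         # returned to its pool, not closed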
index 924f2e8afd1bad833f2b33c11fba01947ca0f0ca..93bcae654b4dcf95d3a31ba47382c55d9dda0047 100644 (file)
@@ -511,7 +511,7 @@ class QueuePoolTest(PoolTestBase):
                 try:
                     c1 = p.connect()
                 except tsa.exc.TimeoutError, e:
-                    timeouts.append(int(time.time()) - now)
+                    timeouts.append(time.time() - now)
                     continue
                 time.sleep(4)
                 c1.close()  
@@ -524,11 +524,12 @@ class QueuePoolTest(PoolTestBase):
         for th in threads:
             th.join() 
 
-        print timeouts
         assert len(timeouts) > 0
         for t in timeouts:
-            assert abs(t - 3) < 1.5, "Not all timeouts were within 50% of 3 seconds: "\
-                                        + repr(timeouts)
+            assert t >= 3, "Not all timeouts were >= 3 seconds %r" % timeouts
+            # normally, the timeout should be under 4 seconds,
+            # but on a loaded-down buildbot it can go up.
+            assert t < 10, "Not all timeouts were < 10 seconds %r" % timeouts
 
     def _test_overflow(self, thread_count, max_overflow):
         gc_collect()
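
The rewritten assertions bound each measured timeout from below by the configured three seconds and from above by a generous ten, rather than requiring it to land within 50% of three. A sketch of the measurement pattern the test now relies on, with time.sleep standing in for the blocking pool checkout:

    import time

    TIMEOUT = 3
    now = time.time()
    time.sleep(TIMEOUT)            # stand-in for waiting on exc.TimeoutError
    elapsed = time.time() - now    # float arithmetic, no truncation
    assert elapsed >= TIMEOUT, "woke too early: %r" % elapsed
    # loose upper bound: a loaded machine may oversleep, within reason
    assert elapsed < 10, "machine too loaded: %r" % elapsed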