From: Mike Bayer Date: Sat, 13 Mar 2010 16:52:17 +0000 (-0500) Subject: fix the timeout test once and for all X-Git-Tag: rel_0_6beta2~54^2~16 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=c4d429fc2a0700d61ba01cecd2157c6774feddf0;p=thirdparty%2Fsqlalchemy%2Fsqlalchemy.git fix the timeout test once and for all --- diff --git a/lib/sqlalchemy/pool.py b/lib/sqlalchemy/pool.py index 6dbadcb3b1..8d6ae3292a 100644 --- a/lib/sqlalchemy/pool.py +++ b/lib/sqlalchemy/pool.py @@ -25,7 +25,8 @@ from sqlalchemy.util import threading, pickle, as_interface, memoized_property proxies = {} def manage(module, **params): - """Return a proxy for a DB-API module that automatically pools connections. + """Return a proxy for a DB-API module that automatically + pools connections. Given a DB-API 2.0 module and pool management parameters, returns a proxy for the module that will automatically pool connections, @@ -58,8 +59,10 @@ def clear_managers(): class Pool(object): """Abstract base class for connection pools.""" - def __init__(self, creator, recycle=-1, echo=None, use_threadlocal=False, - reset_on_return=True, listeners=None): + def __init__(self, + creator, recycle=-1, echo=None, + use_threadlocal=False, + reset_on_return=True, listeners=None): """ Construct a Pool. @@ -232,7 +235,8 @@ class _ConnectionRecord(object): if self.__pool._on_connect: for l in self.__pool._on_connect: l.connect(self.connection, self) - elif (self.__pool._recycle > -1 and time.time() - self.starttime > self.__pool._recycle): + elif self.__pool._recycle > -1 and \ + time.time() - self.starttime > self.__pool._recycle: self.__pool.logger.info("Connection %r exceeded timeout; recycling", self.connection) self.__close() @@ -247,11 +251,11 @@ class _ConnectionRecord(object): try: self.__pool.logger.debug("Closing connection %r", self.connection) self.connection.close() + except (SystemExit, KeyboardInterrupt): + raise except Exception, e: self.__pool.logger.debug("Connection %r threw an error on close: %s", self.connection, e) - if isinstance(e, (SystemExit, KeyboardInterrupt)): - raise def __connect(self): try: @@ -282,6 +286,7 @@ def _finalize_fairy(connection, connection_record, pool, ref=None): connection_record.invalidate(e=e) if isinstance(e, (SystemExit, KeyboardInterrupt)): raise + if connection_record is not None: connection_record.fairy = None pool.logger.debug("Connection %r being returned to pool", connection) @@ -295,7 +300,8 @@ _refs = set() class _ConnectionFairy(object): """Proxies a DB-API connection and provides return-on-dereference support.""" - __slots__ = '_pool', '__counter', 'connection', '_connection_record', '__weakref__', '_detached_info' + __slots__ = '_pool', '__counter', 'connection', \ + '_connection_record', '__weakref__', '_detached_info' def __init__(self, pool): self._pool = pool @@ -536,7 +542,7 @@ class QueuePool(Pool): """A Pool that imposes a limit on the number of open connections.""" def __init__(self, creator, pool_size=5, max_overflow=10, timeout=30, - **params): + **kw): """ Construct a QueuePool. @@ -598,7 +604,7 @@ class QueuePool(Pool): pool. """ - Pool.__init__(self, creator, **params) + Pool.__init__(self, creator, **kw) self._pool = sqla_queue.Queue(pool_size) self._overflow = 0 - pool_size self._max_overflow = max_overflow @@ -634,7 +640,10 @@ class QueuePool(Pool): if not wait: return self.do_get() else: - raise exc.TimeoutError("QueuePool limit of size %d overflow %d reached, connection timed out, timeout %d" % (self.size(), self.overflow(), self._timeout)) + raise exc.TimeoutError( + "QueuePool limit of size %d overflow %d reached, " + "connection timed out, timeout %d" % + (self.size(), self.overflow(), self._timeout)) if self._overflow_lock is not None: self._overflow_lock.acquire() @@ -664,8 +673,12 @@ class QueuePool(Pool): self.logger.info("Pool disposed. %s", self.status()) def status(self): - tup = (self.size(), self.checkedin(), self.overflow(), self.checkedout()) - return "Pool size: %d Connections in pool: %d Current Overflow: %d Current Checked out connections: %d" % tup + return "Pool size: %d Connections in pool: %d "\ + "Current Overflow: %d Current Checked out "\ + "connections: %d" % (self.size(), + self.checkedin(), + self.overflow(), + self.checkedout()) def size(self): return self._pool.maxsize @@ -787,7 +800,6 @@ class StaticPool(Pool): def do_get(self): return self.connection - class AssertionPool(Pool): """A Pool that allows at most one checked out connection at any given time. @@ -797,8 +809,6 @@ class AssertionPool(Pool): """ - ## TODO: modify this to handle an arbitrary connection count. - def __init__(self, *args, **kw): self._conn = None self._checked_out = False @@ -823,7 +833,8 @@ class AssertionPool(Pool): def recreate(self): self.logger.info("Pool recreating") - return AssertionPool(self._creator, echo=self.echo, listeners=self.listeners) + return AssertionPool(self._creator, echo=self.echo, + listeners=self.listeners) def do_get(self): if self._checked_out: @@ -843,7 +854,7 @@ class _DBProxy(object): to the underlying DB-API module. """ - def __init__(self, module, poolclass=QueuePool, **params): + def __init__(self, module, poolclass=QueuePool, **kw): """Initializes a new proxy. module @@ -853,10 +864,11 @@ class _DBProxy(object): a Pool class, defaulting to QueuePool Other parameters are sent to the Pool object's constructor. + """ self.module = module - self.params = params + self.kw = kw self.poolclass = poolclass self.pools = {} self._create_pool_mutex = threading.Lock() @@ -871,15 +883,15 @@ class _DBProxy(object): def __getattr__(self, key): return getattr(self.module, key) - def get_pool(self, *args, **params): - key = self._serialize(*args, **params) + def get_pool(self, *args, **kw): + key = self._serialize(*args, **kw) try: return self.pools[key] except KeyError: self._create_pool_mutex.acquire() try: if key not in self.pools: - pool = self.poolclass(lambda: self.module.connect(*args, **params), **self.params) + pool = self.poolclass(lambda: self.module.connect(*args, **kw), **self.kw) self.pools[key] = pool return pool else: @@ -887,7 +899,7 @@ class _DBProxy(object): finally: self._create_pool_mutex.release() - def connect(self, *args, **params): + def connect(self, *args, **kw): """Activate a connection to the database. Connect to the database using this DBProxy's module and the given @@ -898,18 +910,19 @@ class _DBProxy(object): If the pool has no available connections and allows new connections to be created, a new database connection will be made. + """ - return self.get_pool(*args, **params).connect() + return self.get_pool(*args, **kw).connect() - def dispose(self, *args, **params): - """Dispose the connection pool referenced by the given connect arguments.""" + def dispose(self, *args, **kw): + """Dispose the pool referenced by the given connect arguments.""" - key = self._serialize(*args, **params) + key = self._serialize(*args, **kw) try: del self.pools[key] except KeyError: pass - def _serialize(self, *args, **params): - return pickle.dumps([args, params]) + def _serialize(self, *args, **kw): + return pickle.dumps([args, kw]) diff --git a/test/engine/test_pool.py b/test/engine/test_pool.py index 924f2e8afd..93bcae654b 100644 --- a/test/engine/test_pool.py +++ b/test/engine/test_pool.py @@ -511,7 +511,7 @@ class QueuePoolTest(PoolTestBase): try: c1 = p.connect() except tsa.exc.TimeoutError, e: - timeouts.append(int(time.time()) - now) + timeouts.append(time.time() - now) continue time.sleep(4) c1.close() @@ -524,11 +524,12 @@ class QueuePoolTest(PoolTestBase): for th in threads: th.join() - print timeouts assert len(timeouts) > 0 for t in timeouts: - assert abs(t - 3) < 1.5, "Not all timeouts were within 50% of 3 seconds: "\ - + repr(timeouts) + assert t >= 3, "Not all timeouts were >= 3 seconds %r" % timeouts + # normally, the timeout should under 4 seconds, + # but on a loaded down buildbot it can go up. + assert t < 10, "Not all timeouts were < 10 seconds %r" % timeouts def _test_overflow(self, thread_count, max_overflow): gc_collect()