From: Mike Bayer
Date: Fri, 25 Aug 2006 16:27:10 +0000 (+0000)
Subject: - cleanup on connection methods + documentation. custom DBAPI
X-Git-Tag: rel_0_2_8~32
X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=8260ca2723ab3b08339ec9273fa729f70862fdf3;p=thirdparty%2Fsqlalchemy%2Fsqlalchemy.git

- cleanup on connection methods + documentation. custom DBAPI
arguments specified in query string, 'connect_args' argument
to 'create_engine', or custom creation function via 'creator'
function to 'create_engine'.
- added "recycle" argument to Pool, is "pool_recycle" on create_engine,
defaults to 3600 seconds; connections after this age will be closed and
replaced with a new one, to handle db's that automatically close
stale connections [ticket:274]
---

diff --git a/CHANGES b/CHANGES
index 4b83290122..db4d15054c 100644
--- a/CHANGES
+++ b/CHANGES
@@ -1,4 +1,12 @@
 0.2.8
+- cleanup on connection methods + documentation. custom DBAPI
+arguments specified in query string, 'connect_args' argument
+to 'create_engine', or custom creation function via 'creator'
+function to 'create_engine'.
+- added "recycle" argument to Pool, is "pool_recycle" on create_engine,
+defaults to 3600 seconds; connections after this age will be closed and
+replaced with a new one, to handle db's that automatically close
+stale connections [ticket:274]
 - eesh ! the tutorial doctest was broken for quite some time.
 - add_property() method on mapper does a "compile all mappers"
 step in case the given property references a non-compiled mapper
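
Taken together, the changes below add three ways to hand custom arguments to the DBAPI, plus time-based connection recycling. A minimal sketch of the new surface area (not part of the diff itself; URLs and values are illustrative):

    {python}
    # 1. string-valued arguments in the URL query string
    db = create_engine('postgres://scott:tiger@localhost/test?argument1=foo')

    # 2. a 'connect_args' dictionary, for values that are not strings
    db = create_engine('postgres://scott:tiger@localhost/test',
                       connect_args={'argument1': 17})

    # 3. a 'creator' callable returning a raw DBAPI connection
    def connect():
        return psycopg.connect(user='scott', host='localhost')
    db = create_engine('postgres://', creator=connect)

    # new 'pool_recycle': connections older than an hour are replaced
    db = create_engine('postgres://scott:tiger@localhost/test', pool_recycle=3600)
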
diff --git a/doc/build/content/dbengine.txt b/doc/build/content/dbengine.txt
index e35cc0499f..d8dec80c0f 100644
--- a/doc/build/content/dbengine.txt
+++ b/doc/build/content/dbengine.txt
@@ -61,6 +61,28 @@ Available drivernames are `sqlite`, `mysql`, `postgres`, `oracle`, `mssql`, and
 
 The `Engine` will create its first connection to the database when a SQL statement is executed. As concurrent statements are executed, the underlying connection pool will grow to a default size of five connections, and will allow a default "overflow" of ten. Since the `Engine` is essentially "home base" for the connection pool, it follows that you should keep a single `Engine` per database established within an application, rather than creating a new one for each connection.
 
+#### Custom DBAPI keyword arguments
+
+Custom arguments can be passed to the underlying DBAPI in three ways. String-based arguments can be passed directly from the URL string as query arguments:
+
+    {python}
+    db = create_engine('postgres://scott:tiger@localhost/test?argument1=foo&argument2=bar')
+
+If SQLAlchemy's database connector is aware of a particular query argument, it may convert the value from a string to its proper type.
+
+`create_engine` also takes an argument `connect_args`, an additional dictionary that will be passed to `connect()`. This can be used when arguments of a type other than string are required, and SQLAlchemy's database connector has no type conversion logic present for that parameter:
+
+    {python}
+    db = create_engine('postgres://scott:tiger@localhost/test', connect_args = {'argument1':17, 'argument2':'bar'})
+
+The most customizable connection method of all is to pass a `creator` argument, which specifies a callable that returns a DBAPI connection:
+
+    {python}
+    def connect():
+        return psycopg.connect(user='scott', host='localhost')
+
+    db = create_engine('postgres://', creator=connect)
+
 ### Database Engine Options {@name=options}
 
 Keyword options can also be specified to `create_engine()`, following the string URL as follows:
@@ -71,9 +93,10 @@ Keyword options can also be specified to `create_engine()`, following the string
 Options that can be specified include the following:
 
 * strategy='plain' : the Strategy describes the general configuration used to create this Engine. The two available values are `plain`, which is the default, and `threadlocal`, which applies a "thread-local context" to implicit executions performed by the Engine. This context is further described in [dbengine_connections_context](rel:dbengine_connections_context).
-* pool=None : an instance of `sqlalchemy.pool.Pool` to be used as the underlying source for connections, overriding the engine's connect arguments (pooling is described in [pooling](rel:pooling)). If None, a default `Pool` (usually `QueuePool`, or `SingletonThreadPool` in the case of SQLite) will be created using the engine's connect arguments.
+* poolclass=None : a `sqlalchemy.pool.Pool` subclass (or duck-typed equivalent) that will be instantiated in place of the default connection pool.
+* pool=None : an actual pool instance. Note that an already-constructed pool should already know how to create database connections, so this option supersedes any other connect options specified. Typically, it is an instance of `sqlalchemy.pool.Pool` to be used as the underlying source for connections. For more on connection pooling, see [pooling](rel:pooling).
 
-Example:
+Example of a manual invocation of `pool.QueuePool` (which is the pool class used for all databases except sqlite):
 
     {python}
     from sqlalchemy import *
@@ -88,6 +111,7 @@ Example:
 * pool_size=5 : the number of connections to keep open inside the connection pool. This is used with `QueuePool` as well as `SingletonThreadPool` as of 0.2.7.
 * max_overflow=10 : the number of connections to allow in "overflow", that is connections that can be opened above and beyond the initial five. This is only used with `QueuePool`.
 * pool_timeout=30 : number of seconds to wait before giving up on getting a connection from the pool. This is only used with `QueuePool`.
+* pool_recycle=3600 : this setting causes the pool to recycle connections after the given number of seconds has passed. It defaults to 3600 seconds, or one hour. Note that MySQL in particular will disconnect automatically if no activity is detected on a connection for eight hours.
 * echo=False : if True, the Engine will log all statements as well as a repr() of their parameter lists to the engine's logger, which defaults to sys.stdout. The `echo` attribute of `ComposedSQLEngine` can be modified at any time to turn logging on and off. If set to the string `"debug"`, result rows will be printed to the standard output as well.
 * logger=None : a file-like object where logging output can be sent, if echo is set to True. Newlines will not be sent with log messages. This defaults to an internal logging object which references `sys.stdout`.
 * module=None : used by database implementations which support multiple DBAPI modules, this is a reference to a DBAPI2 module to be used instead of the engine's default module. For Postgres, the default is psycopg2, or psycopg1 if 2 cannot be found. For Oracle, it's cx_Oracle.
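
The one new option above that the doc change leaves without an example is `pool_recycle`; a minimal sketch, assuming a MySQL database (URL and credentials illustrative):

    {python}
    # connections older than one hour are closed and replaced at their
    # next checkout, sidestepping MySQL's eight-hour idle disconnect
    db = create_engine('mysql://scott:tiger@localhost/test', pool_recycle=3600)
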
diff --git a/doc/build/content/pooling.txt b/doc/build/content/pooling.txt
index 186b3e596f..424a64ab47 100644
--- a/doc/build/content/pooling.txt
+++ b/doc/build/content/pooling.txt
@@ -31,6 +31,7 @@ When proxying a DBAPI module through the `pool` module, options exist for how th
 
 * echo=False : if set to True, connections being pulled and retrieved from/to the pool will be logged to the standard output, as well as pool sizing information.
 * use\_threadlocal=True : if set to True, repeated calls to connect() within the same application thread will be guaranteed to return the **same** connection object, if one has already been retrieved from the pool and has not been returned yet. This allows code to retrieve a connection from the pool, and then while still holding on to that connection, to call other functions which also ask the pool for a connection with the same arguments; those functions will act upon the same connection that the calling method is using. Note that once the connection is returned to the pool, it then may be used by another thread. To guarantee a single unique connection per thread that **never** changes, use the option `poolclass=SingletonThreadPool`, in which case the use_threadlocal parameter is not used.
+* recycle=3600 : this setting causes the pool to recycle connections after the given number of seconds has passed. It defaults to 3600 seconds, or one hour. Note that MySQL in particular will disconnect automatically if no activity is detected on a connection for eight hours.
 * poolclass=QueuePool : the Pool class used by the pool module to provide pooling. QueuePool uses the Python `Queue.Queue` class to maintain a list of available connections. A developer can supply his or her own Pool class to supply a different pooling algorithm. Also included is the `SingletonThreadPool`, which provides a single distinct connection per thread and is required with SQLite.
 * pool\_size=5 : used by `QueuePool` as well as `SingletonThreadPool` as of 0.2.7 - the size of the pool to be maintained. This is the largest number of connections that will be kept persistently in the pool. Note that the pool begins with no connections; once this number of connections is requested, that number of connections will remain.
 * max\_overflow=10 : used by `QueuePool` - the maximum overflow size of the pool. When the number of checked-out connections reaches the size set in pool_size, additional connections will be returned up to this limit. When those additional connections are returned to the pool, they are disconnected and discarded. It follows then that the total number of simultaneous connections the pool will allow is `pool_size` + `max_overflow`, and the total number of "sleeping" connections the pool will allow is `pool_size`. `max_overflow` can be set to -1 to indicate no overflow limit; no limit will be placed on the total number of concurrent connections.
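
For the DBAPI-proxying usage that pooling.txt documents, the new `recycle` option rides along with the other pool options; a sketch, assuming psycopg2 as the proxied module:

    {python}
    import psycopg2
    from sqlalchemy import pool

    # proxy the DBAPI module through the pool; a connection older than one
    # hour is closed and replaced the next time it is checked out
    psycopg = pool.manage(psycopg2, recycle=3600)
    conn = psycopg.connect(user='scott', password='tiger', host='localhost')
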
diff --git a/lib/sqlalchemy/engine/default.py b/lib/sqlalchemy/engine/default.py
index 6bef1fabd1..f73ede7565 100644
--- a/lib/sqlalchemy/engine/default.py
+++ b/lib/sqlalchemy/engine/default.py
@@ -16,6 +16,8 @@ import base
 class PoolConnectionProvider(base.ConnectionProvider):
     def __init__(self, dialect, url, poolclass=None, pool=None, **kwargs):
         (cargs, cparams) = dialect.create_connect_args(url)
+        cparams.update(kwargs.pop('connect_args', {}))
+
         if pool is None:
             kwargs.setdefault('echo', False)
             kwargs.setdefault('use_threadlocal',True)
@@ -29,7 +31,8 @@ class PoolConnectionProvider(base.ConnectionProvider):
                     return dbapi.connect(*cargs, **cparams)
                 except Exception, e:
                     raise exceptions.DBAPIError("Connection failed", e)
-            self._pool = poolclass(connect, **kwargs)
+            creator = kwargs.pop('creator', connect)
+            self._pool = poolclass(creator, **kwargs)
         else:
             if isinstance(pool, sqlalchemy.pool.DBProxy):
                 self._pool = pool.get_pool(*cargs, **cparams)
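
The `cparams.update(...)` line above is where `connect_args` merges into the URL-derived parameters: the URL is parsed into keyword arguments first, then `connect_args` entries override or extend them before the DBAPI `connect()` call. A sketch of the effective behavior (parameter names illustrative):

    {python}
    # the URL yields cparams such as {'user':'scott', 'password':'tiger',
    # 'host':'localhost', 'database':'test'}; connect_args entries are
    # layered on top, so the DBAPI also receives connect_timeout=10
    db = create_engine('postgres://scott:tiger@localhost/test',
                       connect_args={'connect_timeout': 10})
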
diff --git a/lib/sqlalchemy/engine/strategies.py b/lib/sqlalchemy/engine/strategies.py
index e2f5c8b7c5..716e5ffb99 100644
--- a/lib/sqlalchemy/engine/strategies.py
+++ b/lib/sqlalchemy/engine/strategies.py
@@ -20,54 +20,53 @@ class EngineStrategy(object):
     def create(self, *args, **kwargs):
         """given arguments, returns a new sql.Engine instance."""
         raise NotImplementedError()
-
-class PlainEngineStrategy(EngineStrategy):
-    def __init__(self):
-        EngineStrategy.__init__(self, 'plain')
-    def create(self, name_or_url, **kwargs):
+class DefaultEngineStrategy(EngineStrategy):
+    def create(self, name_or_url, **kwargs):
         u = url.make_url(name_or_url)
         module = u.get_module()
-        args = u.query.copy()
-        args.update(kwargs)
-        dialect = module.dialect(**args)
+        dialect = module.dialect(**kwargs)
         poolargs = {}
-        for key in (('echo_pool', 'echo'), ('pool_size', 'pool_size'), ('max_overflow', 'max_overflow'), ('poolclass', 'poolclass'), ('pool_timeout','timeout'), ('pool', 'pool')):
-            if kwargs.has_key(key[0]):
-                poolargs[key[1]] = kwargs[key[0]]
+        for key in (('echo_pool', 'echo'), ('pool_size', 'pool_size'), ('max_overflow', 'max_overflow'), ('poolclass', 'poolclass'), ('pool_timeout','timeout'), ('pool', 'pool'), ('pool_recycle','recycle'),('connect_args', 'connect_args'), ('creator', 'creator')):
+            if kwargs.has_key(key[0]):
+                poolargs[key[1]] = kwargs[key[0]]
         poolclass = getattr(module, 'poolclass', None)
         if poolclass is not None:
-            poolargs.setdefault('poolclass', poolclass)
-        poolargs['use_threadlocal'] = False
-        provider = default.PoolConnectionProvider(dialect, u, **poolargs)
+            poolargs.setdefault('poolclass', poolclass)
+        poolargs['use_threadlocal'] = self.pool_threadlocal()
+        provider = self.get_pool_provider(dialect, u, **poolargs)
 
-        return base.ComposedSQLEngine(provider, dialect, **args)
-PlainEngineStrategy()
+        return self.get_engine(provider, dialect, **kwargs)
 
-class ThreadLocalEngineStrategy(EngineStrategy):
+    def pool_threadlocal(self):
+        raise NotImplementedError()
+    def get_pool_provider(self, dialect, url, **kwargs):
+        raise NotImplementedError()
+    def get_engine(self, provider, dialect, **kwargs):
+        raise NotImplementedError()
+
+class PlainEngineStrategy(DefaultEngineStrategy):
     def __init__(self):
-        EngineStrategy.__init__(self, 'threadlocal')
-    def create(self, name_or_url, **kwargs):
-        u = url.make_url(name_or_url)
-        module = u.get_module()
-
-        args = u.query.copy()
-        args.update(kwargs)
-        dialect = module.dialect(**args)
-
-        poolargs = {}
-        for key in (('echo_pool', 'echo'), ('pool_size', 'pool_size'), ('max_overflow', 'max_overflow'), ('poolclass', 'poolclass'), ('pool_timeout','timeout'), ('pool', 'pool')):
-            if kwargs.has_key(key[0]):
-                poolargs[key[1]] = kwargs[key[0]]
-        poolclass = getattr(module, 'poolclass', None)
-        if poolclass is not None:
-            poolargs.setdefault('poolclass', poolclass)
-        poolargs['use_threadlocal'] = True
-        provider = threadlocal.TLocalConnectionProvider(dialect, u, **poolargs)
+        DefaultEngineStrategy.__init__(self, 'plain')
+    def pool_threadlocal(self):
+        return False
+    def get_pool_provider(self, dialect, url, **poolargs):
+        return default.PoolConnectionProvider(dialect, url, **poolargs)
+    def get_engine(self, provider, dialect, **kwargs):
+        return base.ComposedSQLEngine(provider, dialect, **kwargs)
+PlainEngineStrategy()
 
-        return threadlocal.TLEngine(provider, dialect, **args)
+class ThreadLocalEngineStrategy(DefaultEngineStrategy):
+    def __init__(self):
+        DefaultEngineStrategy.__init__(self, 'threadlocal')
+    def pool_threadlocal(self):
+        return True
+    def get_pool_provider(self, dialect, url, **poolargs):
+        return threadlocal.TLocalConnectionProvider(dialect, url, **poolargs)
+    def get_engine(self, provider, dialect, **kwargs):
+        return threadlocal.TLEngine(provider, dialect, **kwargs)
 ThreadLocalEngineStrategy()
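
The rewrite above turns engine creation into a template method: `DefaultEngineStrategy.create()` holds the shared logic, and subclasses fill in three hooks. A hypothetical third strategy (the name 'mystrategy' is illustrative, not part of this commit) would look like:

    {python}
    class MyEngineStrategy(DefaultEngineStrategy):
        def __init__(self):
            # registering under a name makes it selectable via
            # create_engine(..., strategy='mystrategy')
            DefaultEngineStrategy.__init__(self, 'mystrategy')
        def pool_threadlocal(self):
            return False
        def get_pool_provider(self, dialect, url, **poolargs):
            return default.PoolConnectionProvider(dialect, url, **poolargs)
        def get_engine(self, provider, dialect, **kwargs):
            return base.ComposedSQLEngine(provider, dialect, **kwargs)
    MyEngineStrategy()
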
diff --git a/lib/sqlalchemy/pool.py b/lib/sqlalchemy/pool.py
index f601dd9a2c..211f96070d 100644
--- a/lib/sqlalchemy/pool.py
+++ b/lib/sqlalchemy/pool.py
@@ -10,7 +10,12 @@ on a thread local basis. Also provides a DBAPI2 transparency layer so that pool
 be managed automatically, based on module type and connect arguments, simply by calling
 regular DBAPI connect() methods."""
 
-import weakref, string, cPickle
+import weakref, string, time, sys
+try:
+    import cPickle as pickle
+except:
+    import pickle
+
 from sqlalchemy import util, exceptions
 import sqlalchemy.queue as Queue
 
@@ -71,40 +76,36 @@ def clear_managers():
 
 class Pool(object):
-    def __init__(self, echo = False, use_threadlocal = True, logger=None):
+    def __init__(self, creator, recycle=-1, echo = False, use_threadlocal = True, logger=None):
         self._threadconns = weakref.WeakValueDictionary()
+        self._creator = creator
+        self._recycle = recycle
         self._use_threadlocal = use_threadlocal
         self.echo = echo
         self._logger = logger or util.Logger(origin='pool')
 
     def unique_connection(self):
-        return ConnectionFairy(self).checkout()
-
+        return _ConnectionFairy(self).checkout()
+
+    def create_connection(self):
+        return _ConnectionRecord(self)
+
     def connect(self):
         if not self._use_threadlocal:
-            return ConnectionFairy(self).checkout()
+            return _ConnectionFairy(self).checkout()
         try:
-            return self._threadconns[thread.get_ident()].checkout()
+            return self._threadconns[thread.get_ident()].connfairy().checkout()
         except KeyError:
-            agent = ConnectionFairy(self).checkout()
-            self._threadconns[thread.get_ident()] = agent
+            agent = _ConnectionFairy(self).checkout()
+            self._threadconns[thread.get_ident()] = agent._threadfairy
             return agent
 
-    def _purge_for_threadlocal(self):
-        if self._use_threadlocal:
-            try:
-                del self._threadconns[thread.get_ident()]
-            except KeyError:
-                pass
-
     def return_conn(self, agent):
-        self._purge_for_threadlocal()
-        self.do_return_conn(agent.connection)
+        self.do_return_conn(agent._connection_record)
 
     def return_invalid(self, agent):
-        self._purge_for_threadlocal()
-        self.do_return_invalid(agent.connection)
+        self.do_return_invalid(agent._connection_record)
 
     def get(self):
         return self.do_get()
@@ -129,29 +130,60 @@ class Pool(object):
     def __del__(self):
         self.dispose()
-
-class ConnectionFairy(object):
-    def __init__(self, pool, connection=None):
+
+class _ConnectionRecord(object):
+    def __init__(self, pool):
         self.pool = pool
-        self.__counter = 0
-        if connection is not None:
-            self.connection = connection
-        else:
+        self.connection = self.__connect()
+    def close(self):
+        self.connection.close()
+    def get_connection(self):
+        if self.pool._recycle > -1 and time.time() - self.starttime > self.pool._recycle:
+            self.pool.log("Connection %s exceeded timeout; recycling" % repr(self.connection))
             try:
-                self.connection = pool.get()
-            except:
-                self.connection = None
-                self.pool.return_invalid(self)
-                raise
-        if self.pool.echo:
-            self.pool.log("Connection %s checked out from pool" % repr(self.connection))
+                self.connection.close()
+            except Exception, e:
+                self.pool.log("Connection %s threw an error: %s" % (repr(self.connection), str(e)))
+            self.connection = self.__connect()
+        return self.connection
+    def __connect(self):
+        try:
+            self.starttime = time.time()
+            return self.pool._creator()
+        except:
+            raise
+            # TODO: reconnect support here ?
+
+class _ThreadFairy(object):
+    """marks a thread identifier as owning a connection, for a thread local pool."""
+    def __init__(self, connfairy):
+        self.connfairy = weakref.ref(connfairy)
+
+class _ConnectionFairy(object):
+    """proxies a DBAPI connection object and provides return-on-dereference support"""
+    def __init__(self, pool):
+        self._threadfairy = _ThreadFairy(self)
+        self.__pool = pool
+        self.__counter = 0
+        try:
+            self._connection_record = pool.get()
+            self.connection = self._connection_record.get_connection()
+        except:
+            self.connection = None # helps with endless __getattr__ loops later on
+            self._connection_record = None
+            self.__pool.return_invalid(self)
+            raise
+        if self.__pool.echo:
+            self.__pool.log("Connection %s checked out from pool" % repr(self.connection))
     def invalidate(self):
-        if self.pool.echo:
-            self.pool.log("Invalidate connection %s" % repr(self.connection))
+        if self.__pool.echo:
+            self.__pool.log("Invalidate connection %s" % repr(self.connection))
         self.connection = None
-        self.pool.return_invalid(self)
+        self._connection_record = None
+        self._threadfairy = None
+        self.__pool.return_invalid(self)
     def cursor(self, *args, **kwargs):
-        return CursorFairy(self, self.connection.cursor(*args, **kwargs))
+        return _CursorFairy(self, self.connection.cursor(*args, **kwargs))
     def __getattr__(self, key):
         return getattr(self.connection, key)
     def checkout(self):
@@ -167,20 +199,22 @@ class ConnectionFairy(object):
             self._close()
     def _close(self):
         if self.connection is not None:
-            if self.pool.echo:
-                self.pool.log("Connection %s being returned to pool" % repr(self.connection))
+            if self.__pool.echo:
+                self.__pool.log("Connection %s being returned to pool" % repr(self.connection))
             try:
                 self.connection.rollback()
            except:
                 # damn mysql -- (todo look for NotSupportedError)
                 pass
-        self.pool.return_conn(self)
-        self.pool = None
-        self.connection = None
+        self.__pool.return_conn(self)
+        self.__pool = None
+        self.connection = None
+        self._connection_record = None
+        self._threadfairy = None
 
-class CursorFairy(object):
+class _CursorFairy(object):
     def __init__(self, parent, cursor):
-        self.parent = parent
+        self.__parent = parent
         self.cursor = cursor
     def __getattr__(self, key):
         return getattr(self.cursor, key)
@@ -189,9 +223,8 @@ class SingletonThreadPool(Pool):
     """Maintains one connection per each thread, never moving to another thread.
     this is used for SQLite."""
     def __init__(self, creator, pool_size=5, **params):
-        Pool.__init__(self, **params)
+        Pool.__init__(self, creator, **params)
         self._conns = {}
-        self._creator = creator
         self.size = pool_size
 
     def dispose(self):
@@ -234,7 +267,7 @@ class SingletonThreadPool(Pool):
         try:
             return self._conns[thread.get_ident()]
         except KeyError:
-            c = self._creator()
+            c = self.create_connection()
             self._conns[thread.get_ident()] = c
             if len(self._conns) > self.size:
                 self.cleanup()
@@ -243,10 +276,8 @@ class SingletonThreadPool(Pool):
 class QueuePool(Pool):
     """uses Queue.Queue to maintain a fixed-size list of connections."""
     def __init__(self, creator, pool_size = 5, max_overflow = 10, timeout=30, **params):
-        Pool.__init__(self, **params)
-        self._creator = creator
+        Pool.__init__(self, creator, **params)
         self._pool = Queue.Queue(pool_size)
-
         self._overflow = 0 - pool_size
         self._max_overflow = max_overflow
         self._timeout = timeout
@@ -268,7 +299,7 @@ class QueuePool(Pool):
             if self._max_overflow > -1 and self._overflow >= self._max_overflow:
                 raise exceptions.TimeoutError("QueuePool limit of size %d overflow %d reached, connection timed out" % (self.size(), self.overflow()))
             self._overflow += 1
-            return self._creator()
+            return self.create_connection()
 
     def dispose(self):
         while True:
@@ -344,5 +375,5 @@ class DBProxy(object):
             pass
 
     def _serialize(self, *args, **params):
-        return cPickle.dumps([args, params])
+        return pickle.dumps([args, params])
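
Note that the recycle check lives in `_ConnectionRecord.get_connection()` and runs only at checkout time; there is no background reaper thread. A usage sketch against the pool directly (`mock_dbapi` mirrors the test fixture below):

    {python}
    p = pool.QueuePool(lambda: mock_dbapi.connect('foo.db'), recycle=3600)
    c = p.connect()   # the record stores starttime when the connection is created
    c.close()
    # more than 3600 seconds later: the record sees its age exceeds
    # 'recycle', closes the old DBAPI connection and opens a fresh one
    c = p.connect()
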
diff --git a/test/engine/parseconnect.py b/test/engine/parseconnect.py
index 9465af684f..5fffb4ae74 100644
--- a/test/engine/parseconnect.py
+++ b/test/engine/parseconnect.py
@@ -1,6 +1,8 @@
 from testbase import PersistTest
 import sqlalchemy.engine.url as url
+from sqlalchemy import *
 import unittest
+
 class ParseConnectTest(PersistTest):
     def testrfc1738(self):
@@ -33,6 +35,56 @@ class ParseConnectTest(PersistTest):
             assert u.host == 'hostspec' or u.host == '127.0.0.1' or (not u.host)
             assert str(u) == text
 
+class CreateEngineTest(PersistTest):
+    """test that create_engine arguments of different types get propagated properly"""
+    def testconnectquery(self):
+        dbapi = MockDBAPI(foober='12', lala='18', fooz='somevalue')
+
+        # start the postgres dialect, but put our mock DBAPI as the module instead of psycopg
+        e = create_engine('postgres://scott:tiger@somehost/test?foober=12&lala=18&fooz=somevalue', module=dbapi)
+        c = e.connect()
+
+    def testkwargs(self):
+        dbapi = MockDBAPI(foober=12, lala=18, hoho={'this':'dict'}, fooz='somevalue')
+
+        # start the postgres dialect, but put our mock DBAPI as the module instead of psycopg
+        e = create_engine('postgres://scott:tiger@somehost/test?fooz=somevalue', connect_args={'foober':12, 'lala':18, 'hoho':{'this':'dict'}}, module=dbapi)
+        c = e.connect()
+
+    def testcustom(self):
+        dbapi = MockDBAPI(foober=12, lala=18, hoho={'this':'dict'}, fooz='somevalue')
+
+        def connect():
+            return dbapi.connect(foober=12, lala=18, fooz='somevalue', hoho={'this':'dict'})
+
+        # start the postgres dialect, but put our mock DBAPI as the module instead of psycopg
+        e = create_engine('postgres://', creator=connect, module=dbapi)
+        c = e.connect()
+
+    def testrecycle(self):
+        dbapi = MockDBAPI(foober=12, lala=18, hoho={'this':'dict'}, fooz='somevalue')
+        e = create_engine('postgres://', pool_recycle=472, module=dbapi)
+        assert e.connection_provider._pool._recycle == 472
+
+class MockDBAPI(object):
+    def __init__(self, **kwargs):
+        self.kwargs = kwargs
+        self.paramstyle = 'named'
+    def connect(self, **kwargs):
+        print kwargs, self.kwargs
+        for k in self.kwargs:
+            assert k in kwargs, "key %s not present in dictionary" % k
+            assert kwargs[k]==self.kwargs[k], "value %s does not match %s" % (kwargs[k], self.kwargs[k])
+        return MockConnection()
+class MockConnection(object):
+    def close(self):
+        pass
+    def cursor(self):
+        return MockCursor()
+class MockCursor(object):
+    def close(self):
+        pass
+mock_dbapi = MockDBAPI()
 
 if __name__ == "__main__":
     unittest.main()
diff --git a/test/engine/pool.py b/test/engine/pool.py
index d21f928a16..cfc8f5684a 100644
--- a/test/engine/pool.py
+++ b/test/engine/pool.py
@@ -115,6 +115,46 @@ class PoolTest(PersistTest):
         except exceptions.TimeoutError, e:
             assert int(time.time() - now) == 2
 
+    def test_mixed_close(self):
+        p = pool.QueuePool(creator = lambda: mock_dbapi.connect('foo.db'), pool_size = 3, max_overflow = -1, use_threadlocal = True, echo=True)
+        c1 = p.connect()
+        c2 = p.connect()
+        assert c1 is c2
+        c1.close()
+        c2 = None
+        assert p.checkedout() == 1
+        c1 = None
+        assert p.checkedout() == 0
+
+    def test_trick_the_counter(self):
+        """this is a "flaw" in the connection pool; since threadlocal uses a single ConnectionFairy per thread
+        with an open/close counter, you can fool the counter into giving you a ConnectionFairy with an
+        ambiguous counter.  i.e. it's not true reference counting."""
+        p = pool.QueuePool(creator = lambda: mock_dbapi.connect('foo.db'), pool_size = 3, max_overflow = -1, use_threadlocal = True, echo=True)
+        c1 = p.connect()
+        c2 = p.connect()
+        assert c1 is c2
+        c1.close()
+        c2 = p.connect()
+        c2.close()
+        self.assert_(p.checkedout() != 0)
+
+        c2.close()
+        self.assert_(p.checkedout() == 0)
+
+    def test_recycle(self):
+        p = pool.QueuePool(creator = lambda: mock_dbapi.connect('foo.db'), pool_size = 1, max_overflow = 0, use_threadlocal = False, echo=True, recycle=3)
+
+        c1 = p.connect()
+        c_id = id(c1.connection)
+        c1.close()
+        c2 = p.connect()
+        assert id(c2.connection) == c_id
+        c2.close()
+        time.sleep(3)
+        c3 = p.connect()
+        assert id(c3.connection) != c_id
+
     def testthreadlocal_del(self):
         self._do_testthreadlocal(useclose=False)
 
@@ -123,7 +163,7 @@ class PoolTest(PersistTest):
 
     def _do_testthreadlocal(self, useclose=False):
         for p in (
-            pool.QueuePool(creator = lambda: mock_dbapi.connect('foo.db'), pool_size = 3, max_overflow = -1, use_threadlocal = True, echo = False),
+            pool.QueuePool(creator = lambda: mock_dbapi.connect('foo.db'), pool_size = 3, max_overflow = -1, use_threadlocal = True, echo = True),
             pool.SingletonThreadPool(creator = lambda: mock_dbapi.connect('foo.db'), use_threadlocal = True)
         ):
             c1 = p.connect()
@@ -143,8 +183,6 @@ class PoolTest(PersistTest):
             else:
                 c2 = None
 
-            c3 = None
-
             if useclose:
                 c1 = p.connect()
                 c2 = p.connect()
@@ -153,8 +191,8 @@ class PoolTest(PersistTest):
                 c2.close()
                 self.assert_(c1.connection is not None)
                 c1.close()
-            else:
-                c1 = c2 = c3 = None
+
+            c1 = c2 = c3 = None
 
             # extra tests with QueuePool to ensure connections get __del__()ed when dereferenced
             if isinstance(p, pool.QueuePool):
diff --git a/test/testbase.py b/test/testbase.py
index ddec64179c..041b87700a 100644
--- a/test/testbase.py
+++ b/test/testbase.py
@@ -146,18 +146,22 @@ class PersistTest(unittest.TestCase):
 class MockPool(pool.Pool):
     """this pool is hardcore about only one connection being used at a time."""
     def __init__(self, creator, **params):
-        pool.Pool.__init__(self, **params)
-        self.connection = creator()
+        pool.Pool.__init__(self, creator, **params)
+        self.connection = pool._ConnectionRecord(self)
         self._conn = self.connection
 
     def status(self):
         return "MockPool"
 
+    def create_connection(self):
+        raise "Invalid"
+
     def do_return_conn(self, conn):
         assert conn is self._conn and self.connection is None
         self.connection = conn
 
-    def do_return_invalid(self):
+    def do_return_invalid(self, conn):
+        pass
         raise "Invalid"
 
     def do_get(self):