From: Jason Kirtland Date: Sat, 10 Nov 2007 19:31:06 +0000 (+0000) Subject: - Pool listeners preserved on pool.recreate() X-Git-Tag: rel_0_4_1~31 X-Git-Url: http://git.ipfire.org/gitweb.cgi?a=commitdiff_plain;h=7f85c1e44323a49540fd6fefb02b955445302d8a;p=thirdparty%2Fsqlalchemy%2Fsqlalchemy.git - Pool listeners preserved on pool.recreate() - Docstring rampage --- diff --git a/lib/sqlalchemy/pool.py b/lib/sqlalchemy/pool.py index 8b3acbec5b..40114144b9 100644 --- a/lib/sqlalchemy/pool.py +++ b/lib/sqlalchemy/pool.py @@ -22,11 +22,10 @@ from sqlalchemy import exceptions, logging from sqlalchemy import queue as Queue from sqlalchemy.util import thread, threading, pickle - proxies = {} def manage(module, **params): - """Returns a proxy for module that automatically pools connections. + """Return a proxy for a DB-API module that automatically pools connections. Given a DB-API 2.0 module and pool management parameters, returns a proxy for the module that will automatically pool connections, @@ -127,7 +126,7 @@ class Pool(object): self._on_connect = [] self._on_checkout = [] self._on_checkin = [] - + if listeners: for l in listeners: self.add_listener(l) @@ -137,20 +136,23 @@ class Pool(object): def create_connection(self): return _ConnectionRecord(self) - + def recreate(self): - """return a new instance of this Pool's class with identical creation arguments.""" + """Return a new instance with identical creation arguments.""" + raise NotImplementedError() def dispose(self): - """dispose of this pool. - - this method leaves the possibility of checked-out connections remaining opened, - so it is advised to not reuse the pool once dispose() is called, and to instead - use a new pool constructed by the recreate() method. + """Dispose of this pool. + + This method leaves the possibility of checked-out connections + remaining open, It is advised to not reuse the pool once dispose() + is called, and to instead use a new pool constructed by the + recreate() method. """ + raise NotImplementedError() - + def connect(self): if not self._use_threadlocal: return _ConnectionFairy(self).checkout() @@ -257,7 +259,7 @@ class _ConnectionRecord(object): if self.__pool._should_log_info: self.__pool.log("Error on connect(): %s" % (str(e))) raise - + def _finalize_fairy(connection, connection_record, pool, ref=None): if ref is not None and connection_record.backref is not ref: return @@ -280,7 +282,7 @@ def _finalize_fairy(connection, connection_record, pool, ref=None): for l in pool._on_checkin: l.checkin(connection, connection_record) pool.return_conn(connection_record) - + class _ConnectionFairy(object): """Proxies a DB-API connection and provides return-on-dereference support.""" @@ -297,14 +299,14 @@ class _ConnectionFairy(object): raise if self._pool._should_log_info: self._pool.log("Connection %s checked out from pool" % repr(self.connection)) - + _logger = property(lambda self: self._pool.logger) - + is_valid = property(lambda self:self.connection is not None) - + def _get_properties(self): """A property collection unique to this DB-API connection.""" - + try: return self._connection_record.properties except AttributeError: @@ -316,13 +318,14 @@ class _ConnectionFairy(object): self._detatched_properties = value = {} return value properties = property(_get_properties) - + def invalidate(self, e=None): """Mark this connection as invalidated. - + The connection will be immediately closed. The containing ConnectionRecord will create a new connection when next used. """ + if self.connection is None: raise exceptions.InvalidRequestError("This connection is closed") if self._connection_record is not None: @@ -371,18 +374,17 @@ class _ConnectionFairy(object): def detach(self): """Separate this connection from its Pool. - - This means that the connection will no longer be returned to - the pool when closed, and will instead be literally closed. - The containing ConnectionRecord is separated from the DB-API - connection, and will create a new connection when next used. - - Note that any overall connection limiting constraints imposed - by a Pool implementation may be violated after a detach, as - the detached connection is removed from the pool's knowledge - and control. + + This means that the connection will no longer be returned to the + pool when closed, and will instead be literally closed. The + containing ConnectionRecord is separated from the DB-API connection, + and will create a new connection when next used. + + Note that any overall connection limiting constraints imposed by a + Pool implementation may be violated after a detach, as the detached + connection is removed from the pool's knowledge and control. """ - + if self._connection_record is not None: self._connection_record.connection = None self._connection_record.backref = None @@ -408,7 +410,7 @@ class _CursorFairy(object): def invalidate(self, e=None): self.__parent.invalidate(e=e) - + def close(self): try: self.cursor.close() @@ -421,14 +423,14 @@ class _CursorFairy(object): return getattr(self.cursor, key) class SingletonThreadPool(Pool): - """Maintains a single connection per thread. + """A Pool that maintains one connection per thread. - Maintains one connection per each thread, never moving a - connection to a thread other than the one which it was created in. + Maintains one connection per each thread, never moving a connection to a + thread other than the one which it was created in. - This is used for SQLite, which both does not handle multithreading - by default, and also requires a singleton connection if a :memory: - database is being used. + This is used for SQLite, which both does not handle multithreading by + default, and also requires a singleton connection if a :memory: database + is being used. Options are the same as those of Pool, as well as: @@ -444,11 +446,11 @@ class SingletonThreadPool(Pool): def recreate(self): self.log("Pool recreating") - return SingletonThreadPool(self._creator, pool_size=self.size, recycle=self._recycle, echo=self._should_log_info, use_threadlocal=self._use_threadlocal) - + return SingletonThreadPool(self._creator, pool_size=self.size, recycle=self._recycle, echo=self._should_log_info, use_threadlocal=self._use_threadlocal, listeners=self.listeners) + def dispose(self): """Dispose of this pool. - + this method leaves the possibility of checked-out connections remaining opened, so it is advised to not reuse the pool once dispose() is called, and to instead use a new pool constructed @@ -461,7 +463,7 @@ class SingletonThreadPool(Pool): except (SystemExit, KeyboardInterrupt): raise except: - # sqlite won't even let you close a conn from a thread + # sqlite won't even let you close a conn from a thread # that didn't create it pass del self._conns[key] @@ -498,7 +500,7 @@ class SingletonThreadPool(Pool): return c class QueuePool(Pool): - """Use ``Queue.Queue`` to maintain a fixed-size list of connections. + """A Pool that imposes a limit on the number of open connections. Arguments include all those used by the base Pool class, as well as: @@ -538,7 +540,7 @@ class QueuePool(Pool): def recreate(self): self.log("Pool recreating") - return QueuePool(self._creator, pool_size=self._pool.maxsize, max_overflow=self._max_overflow, timeout=self._timeout, recycle=self._recycle, echo=self._should_log_info, use_threadlocal=self._use_threadlocal) + return QueuePool(self._creator, pool_size=self._pool.maxsize, max_overflow=self._max_overflow, timeout=self._timeout, recycle=self._recycle, echo=self._should_log_info, use_threadlocal=self._use_threadlocal, listeners=self.listeners) def do_return_conn(self, conn): try: @@ -609,10 +611,10 @@ class QueuePool(Pool): return self._pool.maxsize - self._pool.qsize() + self._overflow class NullPool(Pool): - """A Pool implementation which does not pool connections. + """A Pool which does not pool connections. - Instead it literally opens and closes the underlying DB-API - connection per each connection open/close. + Instead it literally opens and closes the underlying DB-API connection + per each connection open/close. """ def status(self): @@ -628,8 +630,7 @@ class NullPool(Pool): return self.create_connection() class StaticPool(Pool): - """A Pool implementation which stores exactly one connection that is - returned for all requests.""" + """A Pool of exactly one connection, used for all requests.""" def __init__(self, creator, **params): Pool.__init__(self, creator, **params) @@ -650,15 +651,14 @@ class StaticPool(Pool): def do_get(self): return self.connection - - + + class AssertionPool(Pool): - """A Pool implementation that allows at most one checked out - connection at a time. + """A Pool that allows at most one checked out connection at any given time. - This will raise an exception if more than one connection is - checked out at a time. Useful for debugging code that is using - more connections than desired. + This will raise an exception if more than one connection is checked out + at a time. Useful for debugging code that is using more connections + than desired. """ ## TODO: modify this to handle an arbitrary connection count. @@ -688,19 +688,21 @@ class AssertionPool(Pool): return c class _DBProxy(object): - """Proxy a DB-API 2.0 connect() call to a pooled connection keyed to - the specific connect parameters. Other attributes are proxied - through via __getattr__. + """Layers connection pooling behavior on top of a standard DB-API module. + + Proxies a DB-API 2.0 connect() call to a connection pool keyed to the + specific connect parameters. Other functions and attributes are delegated + to the underlying DB-API module. """ - def __init__(self, module, poolclass = QueuePool, **params): - """Initialize a new proxy. + def __init__(self, module, poolclass=QueuePool, **params): + """Initializes a new proxy. module - a DB-API 2.0 module. + a DB-API 2.0 module poolclass - a Pool class, defaulting to QueuePool. + a Pool class, defaulting to QueuePool Other parameters are sent to the Pool object's constructor. """ @@ -732,16 +734,14 @@ class _DBProxy(object): def connect(self, *args, **params): """Activate a connection to the database. - Connect to the database using this DBProxy's module and the - given connect arguments. If the arguments match an existing - pool, the connection will be returned from the pool's current - thread-local connection instance, or if there is no - thread-local connection instance it will be checked out from - the set of pooled connections. + Connect to the database using this DBProxy's module and the given + connect arguments. If the arguments match an existing pool, the + connection will be returned from the pool's current thread-local + connection instance, or if there is no thread-local connection + instance it will be checked out from the set of pooled connections. - If the pool has no available connections and allows new - connections to be created, a new database connection will be - made. + If the pool has no available connections and allows new connections + to be created, a new database connection will be made. """ return self.get_pool(*args, **params).connect() diff --git a/test/engine/pool.py b/test/engine/pool.py index 658d682c21..9c957ddafb 100644 --- a/test/engine/pool.py +++ b/test/engine/pool.py @@ -32,19 +32,19 @@ class MockCursor(object): def close(self): pass mock_dbapi = MockDBAPI() - + class PoolTest(PersistTest): - + def setUp(self): pool.clear_managers() def testmanager(self): manager = pool.manage(mock_dbapi, use_threadlocal=True) - + connection = manager.connect('foo.db') connection2 = manager.connect('foo.db') connection3 = manager.connect('bar.db') - + print "connection " + repr(connection) self.assert_(connection.cursor() is not None) self.assert_(connection is connection2) @@ -57,10 +57,10 @@ class PoolTest(PersistTest): connection = manager.connect(None) except: pass - + def testnonthreadlocalmanager(self): manager = pool.manage(mock_dbapi, use_threadlocal = False) - + connection = manager.connect('foo.db') connection2 = manager.connect('foo.db') @@ -77,12 +77,12 @@ class PoolTest(PersistTest): def _do_testqueuepool(self, useclose=False): p = pool.QueuePool(creator = lambda: mock_dbapi.connect('foo.db'), pool_size = 3, max_overflow = -1, use_threadlocal = False) - + def status(pool): tup = (pool.size(), pool.checkedin(), pool.overflow(), pool.checkedout()) print "Pool size: %d Connections in pool: %d Current Overflow: %d Current Checked out connections: %d" % tup return tup - + c1 = p.connect() self.assert_(status(p) == (3,0,-2,1)) c2 = p.connect() @@ -117,7 +117,7 @@ class PoolTest(PersistTest): else: c2 = None self.assert_(status(p) == (3, 2, 0, 1)) - + def test_timeout(self): p = pool.QueuePool(creator = lambda: mock_dbapi.connect('foo.db'), pool_size = 3, max_overflow = 0, use_threadlocal = False, timeout=2) c1 = p.connect() @@ -148,7 +148,7 @@ class PoolTest(PersistTest): continue time.sleep(4) c1.close() - + threads = [] for i in xrange(10): th = threading.Thread(target=checkout) @@ -156,17 +156,17 @@ class PoolTest(PersistTest): threads.append(th) for th in threads: th.join() - + print timeouts assert len(timeouts) > 0 for t in timeouts: assert abs(t - 3) < 1, "Not all timeouts were 3 seconds: " + repr(timeouts) - + def _test_overflow(self, thread_count, max_overflow): def creator(): time.sleep(.05) return mock_dbapi.connect('foo.db') - + p = pool.QueuePool(creator=creator, pool_size=3, timeout=2, max_overflow=max_overflow) @@ -196,7 +196,7 @@ class PoolTest(PersistTest): def test_max_overflow(self): self._test_overflow(40, 5) - + def test_mixed_close(self): p = pool.QueuePool(creator = lambda: mock_dbapi.connect('foo.db'), pool_size = 3, max_overflow = -1, use_threadlocal = True) c1 = p.connect() @@ -220,7 +220,7 @@ class PoolTest(PersistTest): assert p.checkedout() == 0 c3 = p.connect() assert c3 is not None - + def test_trick_the_counter(self): """this is a "flaw" in the connection pool; since threadlocal uses a single ConnectionFairy per thread with an open/close counter, you can fool the counter into giving you a ConnectionFairy with an @@ -239,7 +239,7 @@ class PoolTest(PersistTest): def test_recycle(self): p = pool.QueuePool(creator = lambda: mock_dbapi.connect('foo.db'), pool_size = 1, max_overflow = 0, use_threadlocal = False, recycle=3) - + c1 = p.connect() c_id = id(c1.connection) c1.close() @@ -249,7 +249,7 @@ class PoolTest(PersistTest): time.sleep(4) c3= p.connect() assert id(c3.connection) != c_id - + def test_invalidate(self): dbapi = MockDBAPI() p = pool.QueuePool(creator = lambda: dbapi.connect('foo.db'), pool_size = 1, max_overflow = 0, use_threadlocal = False) @@ -260,7 +260,7 @@ class PoolTest(PersistTest): assert c1.connection.id == c_id c1.invalidate() c1 = None - + c1 = p.connect() assert c1.connection.id != c_id @@ -271,9 +271,9 @@ class PoolTest(PersistTest): assert p2.size() == 1 assert p2._use_threadlocal is False assert p2._max_overflow == 0 - + def test_reconnect(self): - """tests reconnect operations at the pool level. SA's engine/dialect includes another + """tests reconnect operations at the pool level. SA's engine/dialect includes another layer of reconnect support for 'database was lost' errors.""" dbapi = MockDBAPI() p = pool.QueuePool(creator = lambda: dbapi.connect('foo.db'), pool_size = 1, max_overflow = 0, use_threadlocal = False) @@ -313,7 +313,7 @@ class PoolTest(PersistTest): assert not con.closed c1.close() assert con.closed - + def test_threadfairy(self): p = pool.QueuePool(creator = lambda: mock_dbapi.connect('foo.db'), pool_size = 3, max_overflow = -1, use_threadlocal = True) c1 = p.connect() @@ -331,7 +331,7 @@ class PoolTest(PersistTest): for p in ( pool.QueuePool(creator = lambda: mock_dbapi.connect('foo.db'), pool_size = 3, max_overflow = -1, use_threadlocal = True), pool.SingletonThreadPool(creator = lambda: mock_dbapi.connect('foo.db'), use_threadlocal = True) - ): + ): c1 = p.connect() c2 = p.connect() self.assert_(c1 is c2) @@ -359,7 +359,7 @@ class PoolTest(PersistTest): c1.close() c1 = c2 = c3 = None - + # extra tests with QueuePool to ensure connections get __del__()ed when dereferenced if isinstance(p, pool.QueuePool): self.assert_(p.checkedout() == 0) @@ -459,11 +459,12 @@ class PoolTest(PersistTest): #, pool_size=1, max_overflow=0, **kw) def assert_listeners(p, total, conn, cout, cin): - self.assert_(len(p.listeners) == total) - self.assert_(len(p._on_connect) == conn) - self.assert_(len(p._on_checkout) == cout) - self.assert_(len(p._on_checkin) == cin) - + for instance in (p, p.recreate()): + self.assert_(len(instance.listeners) == total) + self.assert_(len(instance._on_connect) == conn) + self.assert_(len(instance._on_checkout) == cout) + self.assert_(len(instance._on_checkin) == cin) + p = _pool() assert_listeners(p, 0, 0, 0, 0) @@ -495,7 +496,7 @@ class PoolTest(PersistTest): snoop.clear() - # this one depends on immediate gc + # this one depends on immediate gc c = p.connect() cc = c.connection snoop.assert_in(cc, False, True, False) @@ -550,7 +551,7 @@ class PoolTest(PersistTest): def tearDown(self): pool.clear_managers() - - + + if __name__ == "__main__": - testbase.main() + testbase.main()