from sqlalchemy import queue as Queue
from sqlalchemy.util import thread, threading, pickle
-
proxies = {}
def manage(module, **params):
- """Returns a proxy for module that automatically pools connections.
+ """Return a proxy for a DB-API module that automatically pools connections.
Given a DB-API 2.0 module and pool management parameters, returns
a proxy for the module that will automatically pool connections,
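# --- Illustrative sketch, not part of this patch: typical manage() usage.
# Assumes the stdlib sqlite3 module; any DB-API 2.0 module works the same way.
import sqlite3
from sqlalchemy import pool
sqlite3 = pool.manage(sqlite3)        # proxy the module
conn = sqlite3.connect('file.db')     # checked out from a pool keyed to these args
conn.close()                          # returned to the pool rather than closed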
self._on_connect = []
self._on_checkout = []
self._on_checkin = []
-
+
if listeners:
for l in listeners:
self.add_listener(l)
def create_connection(self):
return _ConnectionRecord(self)
-
+
def recreate(self):
- """return a new instance of this Pool's class with identical creation arguments."""
+ """Return a new instance with identical creation arguments."""
+
raise NotImplementedError()
def dispose(self):
- """dispose of this pool.
-
- this method leaves the possibility of checked-out connections remaining opened,
- so it is advised to not reuse the pool once dispose() is called, and to instead
- use a new pool constructed by the recreate() method.
+ """Dispose of this pool.
+
+ This method leaves the possibility of checked-out connections
+ remaining open. It is advised to not reuse the pool once dispose()
+ is called, and to instead use a new pool constructed by the
+ recreate() method.
"""
+
raise NotImplementedError()
-
+
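# --- Illustrative sketch, not part of this patch: the intended
# dispose()/recreate() pattern. `connect_to_db` is a hypothetical creator
# function returning a new DB-API connection.
p = QueuePool(connect_to_db, pool_size=5)
# ... after a fork or a fatal disconnect:
p.dispose()          # already checked-out connections may remain open
p = p.recreate()     # fresh pool built with identical creation arguments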
def connect(self):
if not self._use_threadlocal:
return _ConnectionFairy(self).checkout()
if self.__pool._should_log_info:
self.__pool.log("Error on connect(): %s" % (str(e)))
raise
-
+
def _finalize_fairy(connection, connection_record, pool, ref=None):
if ref is not None and connection_record.backref is not ref:
return
for l in pool._on_checkin:
l.checkin(connection, connection_record)
pool.return_conn(connection_record)
-
+
class _ConnectionFairy(object):
"""Proxies a DB-API connection and provides return-on-dereference support."""
raise
if self._pool._should_log_info:
self._pool.log("Connection %s checked out from pool" % repr(self.connection))
-
+
_logger = property(lambda self: self._pool.logger)
-
+
is_valid = property(lambda self: self.connection is not None)
-
+
def _get_properties(self):
"""A property collection unique to this DB-API connection."""
-
+
try:
return self._connection_record.properties
except AttributeError:
self._detached_properties = value = {}
return value
properties = property(_get_properties)
-
+
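# --- Illustrative sketch, not part of this patch: .properties follows the
# underlying DB-API connection across checkin/checkout. `some_pool` is a
# hypothetical Pool instance.
fairy = some_pool.connect()
fairy.properties['prepared'] = True   # arbitrary per-connection state
fairy.close()                         # kept until the connection is replaced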
def invalidate(self, e=None):
"""Mark this connection as invalidated.
-
+
The connection will be immediately closed. The containing
ConnectionRecord will create a new connection when next used.
"""
+
if self.connection is None:
raise exceptions.InvalidRequestError("This connection is closed")
if self._connection_record is not None:
def detach(self):
"""Separate this connection from its Pool.
-
- This means that the connection will no longer be returned to
- the pool when closed, and will instead be literally closed.
- The containing ConnectionRecord is separated from the DB-API
- connection, and will create a new connection when next used.
-
- Note that any overall connection limiting constraints imposed
- by a Pool implementation may be violated after a detach, as
- the detached connection is removed from the pool's knowledge
- and control.
+
+ This means that the connection will no longer be returned to the
+ pool when closed, and will instead be literally closed. The
+ containing ConnectionRecord is separated from the DB-API connection,
+ and will create a new connection when next used.
+
+ Note that any overall connection limiting constraints imposed by a
+ Pool implementation may be violated after a detach, as the detached
+ connection is removed from the pool's knowledge and control.
"""
-
+
if self._connection_record is not None:
self._connection_record.connection = None
self._connection_record.backref = None
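# --- Illustrative sketch, not part of this patch: once detached, close()
# really closes the DB-API connection instead of returning it to the pool.
# `some_pool` is a hypothetical Pool instance.
fairy = some_pool.connect()
fairy.detach()       # the pool forgets this connection and will open a new one
fairy.close()        # literal close; nothing goes back to the pool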
def invalidate(self, e=None):
self.__parent.invalidate(e=e)
-
+
def close(self):
try:
self.cursor.close()
return getattr(self.cursor, key)
class SingletonThreadPool(Pool):
- """Maintains a single connection per thread.
+ """A Pool that maintains one connection per thread.
- Maintains one connection per each thread, never moving a
- connection to a thread other than the one which it was created in.
+ Maintains one connection per thread, never moving a connection to a
+ thread other than the one in which it was created.
- This is used for SQLite, which both does not handle multithreading
- by default, and also requires a singleton connection if a :memory:
- database is being used.
+ This is used for SQLite, which both does not handle multithreading by
+ default, and also requires a singleton connection if a :memory: database
+ is being used.
Options are the same as those of Pool, as well as:
def recreate(self):
self.log("Pool recreating")
- return SingletonThreadPool(self._creator, pool_size=self.size, recycle=self._recycle, echo=self._should_log_info, use_threadlocal=self._use_threadlocal)
-
+ return SingletonThreadPool(self._creator, pool_size=self.size, recycle=self._recycle, echo=self._should_log_info, use_threadlocal=self._use_threadlocal, listeners=self.listeners)
+
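# --- Illustrative sketch, not part of this patch: typical SQLite usage.
import sqlite3
p = SingletonThreadPool(lambda: sqlite3.connect(':memory:'))
c1 = p.connect()
c2 = p.connect()
assert c1.connection is c2.connection   # same thread -> same connection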
def dispose(self):
"""Dispose of this pool.
-
+
This method leaves the possibility of checked-out connections
remaining open, so it is advised to not reuse the pool once
dispose() is called, and to instead use a new pool constructed
except (SystemExit, KeyboardInterrupt):
raise
except:
- # sqlite won't even let you close a conn from a thread
+ # sqlite won't even let you close a conn from a thread
# that didn't create it
pass
del self._conns[key]
return c
class QueuePool(Pool):
- """Use ``Queue.Queue`` to maintain a fixed-size list of connections.
+ """A Pool that imposes a limit on the number of open connections.
Arguments include all those used by the base Pool class, as well
as:
def recreate(self):
self.log("Pool recreating")
- return QueuePool(self._creator, pool_size=self._pool.maxsize, max_overflow=self._max_overflow, timeout=self._timeout, recycle=self._recycle, echo=self._should_log_info, use_threadlocal=self._use_threadlocal)
+ return QueuePool(self._creator, pool_size=self._pool.maxsize, max_overflow=self._max_overflow, timeout=self._timeout, recycle=self._recycle, echo=self._should_log_info, use_threadlocal=self._use_threadlocal, listeners=self.listeners)
def do_return_conn(self, conn):
try:
return self._pool.maxsize - self._pool.qsize() + self._overflow
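# --- Illustrative sketch, not part of this patch: how the counters relate.
# The overflow counter starts at -pool_size and rises with each checkout, so
# checkedout() = maxsize - qsize + overflow. `connect_to_db` is hypothetical.
p = QueuePool(connect_to_db, pool_size=3, max_overflow=-1)
c = p.connect()
assert (p.size(), p.checkedin(), p.overflow(), p.checkedout()) == (3, 0, -2, 1)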
class NullPool(Pool):
- """A Pool implementation which does not pool connections.
+ """A Pool which does not pool connections.
- Instead it literally opens and closes the underlying DB-API
- connection per each connection open/close.
+ Instead it literally opens and closes the underlying DB-API connection
+ on each connection open/close.
"""
def status(self):
return self.create_connection()
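# --- Illustrative sketch, not part of this patch: NullPool retains nothing.
import sqlite3
p = NullPool(lambda: sqlite3.connect('file.db'))
c = p.connect()   # opens a brand-new DB-API connection
c.close()         # closes it outright; nothing is pooled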
class StaticPool(Pool):
- """A Pool implementation which stores exactly one connection that is
- returned for all requests."""
+ """A Pool of exactly one connection, used for all requests."""
def __init__(self, creator, **params):
Pool.__init__(self, creator, **params)
def do_get(self):
return self.connection
-
-
+
+
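# --- Illustrative sketch, not part of this patch: StaticPool hands its single
# connection to every request.
import sqlite3
p = StaticPool(lambda: sqlite3.connect(':memory:'))
c1 = p.connect()
c2 = p.connect()
assert c1.connection is c2.connection   # one shared DB-API connection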
class AssertionPool(Pool):
- """A Pool implementation that allows at most one checked out
- connection at a time.
+ """A Pool that allows at most one checked out connection at any given time.
- This will raise an exception if more than one connection is
- checked out at a time. Useful for debugging code that is using
- more connections than desired.
+ This will raise an exception if more than one connection is checked out
+ at a time. Useful for debugging code that is using more connections
+ than desired.
"""
## TODO: modify this to handle an arbitrary connection count.
return c
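# --- Illustrative sketch, not part of this patch: a second simultaneous
# checkout fails loudly, which makes connection leaks easy to spot.
# `connect_to_db` is a hypothetical creator function.
p = AssertionPool(connect_to_db)
c1 = p.connect()
try:
    c2 = p.connect()    # more than one checkout: expected to raise
except Exception:       # exact exception type depends on the implementation
    pass
c1.close()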
class _DBProxy(object):
- """Proxy a DB-API 2.0 connect() call to a pooled connection keyed to
- the specific connect parameters. Other attributes are proxied
- through via __getattr__.
+ """Layers connection pooling behavior on top of a standard DB-API module.
+
+ Proxies a DB-API 2.0 connect() call to a connection pool keyed to the
+ specific connect parameters. Other functions and attributes are delegated
+ to the underlying DB-API module.
"""
- def __init__(self, module, poolclass = QueuePool, **params):
- """Initialize a new proxy.
+ def __init__(self, module, poolclass=QueuePool, **params):
+ """Initializes a new proxy.
module
- a DB-API 2.0 module.
+ a DB-API 2.0 module
poolclass
- a Pool class, defaulting to QueuePool.
+ a Pool class, defaulting to QueuePool
Other parameters are sent to the Pool object's constructor.
"""
def connect(self, *args, **params):
"""Activate a connection to the database.
- Connect to the database using this DBProxy's module and the
- given connect arguments. If the arguments match an existing
- pool, the connection will be returned from the pool's current
- thread-local connection instance, or if there is no
- thread-local connection instance it will be checked out from
- the set of pooled connections.
+ Connect to the database using this DBProxy's module and the given
+ connect arguments. If the arguments match an existing pool, the
+ connection will be returned from the pool's current thread-local
+ connection instance, or if there is no thread-local connection
+ instance it will be checked out from the set of pooled connections.
- If the pool has no available connections and allows new
- connections to be created, a new database connection will be
- made.
+ If the pool has no available connections and allows new connections
+ to be created, a new database connection will be made.
"""
return self.get_pool(*args, **params).connect()
def close(self):
pass
mock_dbapi = MockDBAPI()
-
+
class PoolTest(PersistTest):
-
+
def setUp(self):
pool.clear_managers()
def testmanager(self):
manager = pool.manage(mock_dbapi, use_threadlocal=True)
-
+
connection = manager.connect('foo.db')
connection2 = manager.connect('foo.db')
connection3 = manager.connect('bar.db')
-
+
print "connection " + repr(connection)
self.assert_(connection.cursor() is not None)
self.assert_(connection is connection2)
connection = manager.connect(None)
except:
pass
-
+
def testnonthreadlocalmanager(self):
manager = pool.manage(mock_dbapi, use_threadlocal=False)
-
+
connection = manager.connect('foo.db')
connection2 = manager.connect('foo.db')
def _do_testqueuepool(self, useclose=False):
p = pool.QueuePool(creator = lambda: mock_dbapi.connect('foo.db'), pool_size = 3, max_overflow = -1, use_threadlocal = False)
-
+
def status(pool):
tup = (pool.size(), pool.checkedin(), pool.overflow(), pool.checkedout())
print "Pool size: %d Connections in pool: %d Current Overflow: %d Current Checked out connections: %d" % tup
return tup
-
+
c1 = p.connect()
self.assert_(status(p) == (3, 0, -2, 1))
c2 = p.connect()
else:
c2 = None
self.assert_(status(p) == (3, 2, 0, 1))
-
+
def test_timeout(self):
p = pool.QueuePool(creator = lambda: mock_dbapi.connect('foo.db'), pool_size = 3, max_overflow = 0, use_threadlocal = False, timeout=2)
c1 = p.connect()
continue
time.sleep(4)
c1.close()
-
+
threads = []
for i in xrange(10):
th = threading.Thread(target=checkout)
threads.append(th)
for th in threads:
th.join()
-
+
print timeouts
assert len(timeouts) > 0
for t in timeouts:
assert abs(t - 3) < 1, "Not all timeouts were 3 seconds: " + repr(timeouts)
-
+
def _test_overflow(self, thread_count, max_overflow):
def creator():
time.sleep(.05)
return mock_dbapi.connect('foo.db')
-
+
p = pool.QueuePool(creator=creator,
pool_size=3, timeout=2,
max_overflow=max_overflow)
def test_max_overflow(self):
self._test_overflow(40, 5)
-
+
def test_mixed_close(self):
p = pool.QueuePool(creator = lambda: mock_dbapi.connect('foo.db'), pool_size = 3, max_overflow = -1, use_threadlocal = True)
c1 = p.connect()
assert p.checkedout() == 0
c3 = p.connect()
assert c3 is not None
-
+
def test_trick_the_counter(self):
"""this is a "flaw" in the connection pool; since threadlocal uses a single ConnectionFairy per thread
with an open/close counter, you can fool the counter into giving you a ConnectionFairy with an
def test_recycle(self):
p = pool.QueuePool(creator = lambda: mock_dbapi.connect('foo.db'), pool_size = 1, max_overflow = 0, use_threadlocal = False, recycle=3)
-
+
c1 = p.connect()
c_id = id(c1.connection)
c1.close()
time.sleep(4)
c3 = p.connect()
assert id(c3.connection) != c_id
-
+
def test_invalidate(self):
dbapi = MockDBAPI()
p = pool.QueuePool(creator = lambda: dbapi.connect('foo.db'), pool_size = 1, max_overflow = 0, use_threadlocal = False)
assert c1.connection.id == c_id
c1.invalidate()
c1 = None
-
+
c1 = p.connect()
assert c1.connection.id != c_id
assert p2.size() == 1
assert p2._use_threadlocal is False
assert p2._max_overflow == 0
-
+
def test_reconnect(self):
- """tests reconnect operations at the pool level. SA's engine/dialect includes another
+ """tests reconnect operations at the pool level. SA's engine/dialect includes another
layer of reconnect support for 'database was lost' errors."""
dbapi = MockDBAPI()
p = pool.QueuePool(creator = lambda: dbapi.connect('foo.db'), pool_size = 1, max_overflow = 0, use_threadlocal = False)
assert not con.closed
c1.close()
assert con.closed
-
+
def test_threadfairy(self):
p = pool.QueuePool(creator = lambda: mock_dbapi.connect('foo.db'), pool_size = 3, max_overflow = -1, use_threadlocal = True)
c1 = p.connect()
for p in (
pool.QueuePool(creator = lambda: mock_dbapi.connect('foo.db'), pool_size = 3, max_overflow = -1, use_threadlocal = True),
pool.SingletonThreadPool(creator = lambda: mock_dbapi.connect('foo.db'), use_threadlocal = True)
- ):
+ ):
c1 = p.connect()
c2 = p.connect()
self.assert_(c1 is c2)
c1.close()
c1 = c2 = c3 = None
-
+
# extra tests with QueuePool to ensure connections get __del__()ed when dereferenced
if isinstance(p, pool.QueuePool):
self.assert_(p.checkedout() == 0)
#, pool_size=1, max_overflow=0, **kw)
def assert_listeners(p, total, conn, cout, cin):
- self.assert_(len(p.listeners) == total)
- self.assert_(len(p._on_connect) == conn)
- self.assert_(len(p._on_checkout) == cout)
- self.assert_(len(p._on_checkin) == cin)
-
+ for instance in (p, p.recreate()):
+ self.assert_(len(instance.listeners) == total)
+ self.assert_(len(instance._on_connect) == conn)
+ self.assert_(len(instance._on_checkout) == cout)
+ self.assert_(len(instance._on_checkin) == cin)
+
p = _pool()
assert_listeners(p, 0, 0, 0, 0)
snoop.clear()
- # this one depends on immediate gc
+ # this one depends on immediate gc
c = p.connect()
cc = c.connection
snoop.assert_in(cc, False, True, False)
def tearDown(self):
pool.clear_managers()
-
-
+
+
if __name__ == "__main__":
- testbase.main()
+ testbase.main()