proxies = {}
def manage(module, **params):
- """Return a proxy for a DB-API module that automatically pools connections.
+ """Return a proxy for a DB-API module that automatically
+ pools connections.
Given a DB-API 2.0 module and pool management parameters, returns
a proxy for the module that will automatically pool connections,
class Pool(object):
"""Abstract base class for connection pools."""
- def __init__(self, creator, recycle=-1, echo=None, use_threadlocal=False,
- reset_on_return=True, listeners=None):
+ def __init__(self,
+ creator, recycle=-1, echo=None,
+ use_threadlocal=False,
+ reset_on_return=True, listeners=None):
"""
Construct a Pool.
if self.__pool._on_connect:
for l in self.__pool._on_connect:
l.connect(self.connection, self)
- elif (self.__pool._recycle > -1 and time.time() - self.starttime > self.__pool._recycle):
+ elif self.__pool._recycle > -1 and \
+ time.time() - self.starttime > self.__pool._recycle:
self.__pool.logger.info("Connection %r exceeded timeout; recycling",
self.connection)
self.__close()
try:
self.__pool.logger.debug("Closing connection %r", self.connection)
self.connection.close()
+ except (SystemExit, KeyboardInterrupt):
+ raise
except Exception, e:
self.__pool.logger.debug("Connection %r threw an error on close: %s",
self.connection, e)
- if isinstance(e, (SystemExit, KeyboardInterrupt)):
- raise
def __connect(self):
try:
connection_record.invalidate(e=e)
if isinstance(e, (SystemExit, KeyboardInterrupt)):
raise
+
if connection_record is not None:
connection_record.fairy = None
pool.logger.debug("Connection %r being returned to pool", connection)
class _ConnectionFairy(object):
"""Proxies a DB-API connection and provides return-on-dereference support."""
- __slots__ = '_pool', '__counter', 'connection', '_connection_record', '__weakref__', '_detached_info'
+ __slots__ = '_pool', '__counter', 'connection', \
+ '_connection_record', '__weakref__', '_detached_info'
def __init__(self, pool):
self._pool = pool
"""A Pool that imposes a limit on the number of open connections."""
def __init__(self, creator, pool_size=5, max_overflow=10, timeout=30,
- **params):
+ **kw):
"""
Construct a QueuePool.
pool.
"""
- Pool.__init__(self, creator, **params)
+ Pool.__init__(self, creator, **kw)
self._pool = sqla_queue.Queue(pool_size)
self._overflow = 0 - pool_size
self._max_overflow = max_overflow
if not wait:
return self.do_get()
else:
- raise exc.TimeoutError("QueuePool limit of size %d overflow %d reached, connection timed out, timeout %d" % (self.size(), self.overflow(), self._timeout))
+ raise exc.TimeoutError(
+ "QueuePool limit of size %d overflow %d reached, "
+ "connection timed out, timeout %d" %
+ (self.size(), self.overflow(), self._timeout))
if self._overflow_lock is not None:
self._overflow_lock.acquire()
self.logger.info("Pool disposed. %s", self.status())
def status(self):
- tup = (self.size(), self.checkedin(), self.overflow(), self.checkedout())
- return "Pool size: %d Connections in pool: %d Current Overflow: %d Current Checked out connections: %d" % tup
+ return "Pool size: %d Connections in pool: %d "\
+ "Current Overflow: %d Current Checked out "\
+ "connections: %d" % (self.size(),
+ self.checkedin(),
+ self.overflow(),
+ self.checkedout())
def size(self):
return self._pool.maxsize
def do_get(self):
return self.connection
-
class AssertionPool(Pool):
"""A Pool that allows at most one checked out connection at any given time.
"""
- ## TODO: modify this to handle an arbitrary connection count.
-
def __init__(self, *args, **kw):
self._conn = None
self._checked_out = False
def recreate(self):
self.logger.info("Pool recreating")
- return AssertionPool(self._creator, echo=self.echo, listeners=self.listeners)
+ return AssertionPool(self._creator, echo=self.echo,
+ listeners=self.listeners)
def do_get(self):
if self._checked_out:
to the underlying DB-API module.
"""
- def __init__(self, module, poolclass=QueuePool, **params):
+ def __init__(self, module, poolclass=QueuePool, **kw):
"""Initializes a new proxy.
module
a Pool class, defaulting to QueuePool
Other parameters are sent to the Pool object's constructor.
+
"""
self.module = module
- self.params = params
+ self.kw = kw
self.poolclass = poolclass
self.pools = {}
self._create_pool_mutex = threading.Lock()
def __getattr__(self, key):
return getattr(self.module, key)
- def get_pool(self, *args, **params):
- key = self._serialize(*args, **params)
+ def get_pool(self, *args, **kw):
+ key = self._serialize(*args, **kw)
try:
return self.pools[key]
except KeyError:
self._create_pool_mutex.acquire()
try:
if key not in self.pools:
- pool = self.poolclass(lambda: self.module.connect(*args, **params), **self.params)
+ pool = self.poolclass(lambda: self.module.connect(*args, **kw), **self.kw)
self.pools[key] = pool
return pool
else:
finally:
self._create_pool_mutex.release()
- def connect(self, *args, **params):
+ def connect(self, *args, **kw):
"""Activate a connection to the database.
Connect to the database using this DBProxy's module and the given
If the pool has no available connections and allows new connections
to be created, a new database connection will be made.
+
"""
- return self.get_pool(*args, **params).connect()
+ return self.get_pool(*args, **kw).connect()
- def dispose(self, *args, **params):
- """Dispose the connection pool referenced by the given connect arguments."""
+ def dispose(self, *args, **kw):
+ """Dispose the pool referenced by the given connect arguments."""
- key = self._serialize(*args, **params)
+ key = self._serialize(*args, **kw)
try:
del self.pools[key]
except KeyError:
pass
- def _serialize(self, *args, **params):
- return pickle.dumps([args, params])
+ def _serialize(self, *args, **kw):
+ return pickle.dumps([args, kw])