for contextual connection, unique_connection() for non-contextual.
- Pool use_threadlocal defaults to True; it can be set to False at the create_engine()
level with pool_threadlocal=False
- made all logger statements in the pool conditional on a flag calculated once.
- changed the WeakValueDictionary() used for the "threadlocal" pool to be a regular dict
referencing weakref objects; WVD apparently had a lot of overhead. *CAUTION* -
I'm pretty confident about this change, since the threadlocal dict gets explicitly managed
anyway and tests pass with PG etc., but keep a close eye on this one regardless.
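
A quick usage sketch of the behavior described above (the URL is just a placeholder;
the kwargs and calls are only the ones introduced in this changeset, shown here for
illustration):

    from sqlalchemy import create_engine

    # pool-level use_threadlocal now defaults to True; it can be switched
    # off for a particular engine at create_engine() time
    engine = create_engine('postgres://scott:tiger@localhost/test',
                           pool_threadlocal=False)

    # connect() now hands the Connection a pool.connect() connection up front;
    # raw_connection() goes through pool.unique_connection(), so it is never
    # the thread's contextual connection
    conn = engine.connect()
    raw = engine.raw_connection()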
return self.__engine.dialect.create_execution_context(connection=self, **kwargs)
def __execute_raw(self, context):
- if logging.is_info_enabled(self.__engine.logger):
+ if self.__engine._should_log:
self.__engine.logger.info(context.statement)
self.__engine.logger.info(repr(context.parameters))
if context.parameters is not None and isinstance(context.parameters, list) and len(context.parameters) > 0 and isinstance(context.parameters[0], (list, tuple, dict)):
self._dialect=dialect
self.echo = echo
self.logger = logging.instance_logger(self)
+ self._should_log = logging.is_info_enabled(self.logger)
name = property(lambda s:sys.modules[s.dialect.__module__].descriptor()['name'], doc="String name of the [sqlalchemy.engine#Dialect] in use by this ``Engine``.")
engine = property(lambda s:s)
This Connection is meant to be used by the various "auto-connecting" operations.
"""
- return Connection(self, close_with_result=close_with_result, **kwargs)
+ return Connection(self, self.pool.connect(), close_with_result=close_with_result, **kwargs)
def table_names(self, schema=None, connection=None):
"""Return a list of all table names available in the database.
def raw_connection(self):
"""Return a DB-API connection."""
- return self.pool.connect()
+ return self.pool.unique_connection()
def log(self, msg):
"""Log a message using this SQLEngine's logger stream."""
self.dialect = context.dialect
self.closed = False
self.cursor = context.cursor
- self.__echo = logging.is_debug_enabled(context.engine.logger)
+ self.__echo = context.engine._should_log
self._process_row = self._row_processor()
if context.is_select():
self._init_metadata()
# the arguments
translate = {'echo': 'echo_pool',
'timeout': 'pool_timeout',
- 'recycle': 'pool_recycle'}
+ 'recycle': 'pool_recycle',
+ 'use_threadlocal': 'pool_threadlocal'}
for k in util.get_cls_kwargs(poolclass):
tk = translate.get(k, k)
if tk in kwargs:
pool_args[k] = kwargs.pop(tk)
- pool_args['use_threadlocal'] = self.pool_threadlocal()
+ pool_args.setdefault('use_threadlocal', self.pool_threadlocal())
pool = poolclass(creator, **pool_args)
else:
if isinstance(pool, poollib._DBProxy):
try:
return self.__transaction._increment_connect()
except AttributeError:
- return TLConnection(self, close_with_result=close_with_result)
+ return TLConnection(self, self.engine.pool.connect(), close_with_result=close_with_result)
def reset(self):
try:
class TLConnection(base.Connection):
- def __init__(self, session, close_with_result):
- base.Connection.__init__(self, session.engine,
- close_with_result=close_with_result)
+ def __init__(self, session, connection, close_with_result):
+ base.Connection.__init__(self, session.engine, connection, close_with_result=close_with_result)
self.__session = session
self.__opencount = 1
super(TLEngine, self).__init__(*args, **kwargs)
self.context = util.ThreadLocal()
- def raw_connection(self):
- """Return a DB-API connection."""
-
- return self.pool.connect()
-
- def connect(self, **kwargs):
- """Return a Connection that is not thread-locally scoped.
-
- This is the equivalent to calling ``connect()`` on a
- base.Engine.
- """
-
- return base.Connection(self, self.pool.unique_connection())
-
def _session(self):
if not hasattr(self.context, 'session'):
self.context.session = TLSession(self)
the pool.
"""
- def __init__(self, creator, recycle=-1, echo=None, use_threadlocal=False,
+ def __init__(self, creator, recycle=-1, echo=None, use_threadlocal=True,
listeners=None):
self.logger = logging.instance_logger(self)
- self._threadconns = weakref.WeakValueDictionary()
+ self._threadconns = {}
self._creator = creator
self._recycle = recycle
self._use_threadlocal = use_threadlocal
self._on_connect = []
self._on_checkout = []
self._on_checkin = []
+ self._should_log = logging.is_info_enabled(self.logger)
+
if listeners:
for l in listeners:
self.add_listener(l)
return _ConnectionFairy(self).checkout()
try:
- return self._threadconns[thread.get_ident()].checkout()
+ return self._threadconns[thread.get_ident()]().checkout()
except KeyError:
agent = _ConnectionFairy(self)
- self._threadconns[thread.get_ident()] = agent
+ self._threadconns[thread.get_ident()] = weakref.ref(agent)
return agent.checkout()
def return_conn(self, agent):
def close(self):
if self.connection is not None:
- self.__pool.log("Closing connection %s" % repr(self.connection))
+ if self.__pool._should_log:
+ self.__pool.log("Closing connection %s" % repr(self.connection))
self.connection.close()
def invalidate(self, e=None):
- if e is not None:
- self.__pool.log("Invalidate connection %s (reason: %s:%s)" % (repr(self.connection), e.__class__.__name__, str(e)))
- else:
- self.__pool.log("Invalidate connection %s" % repr(self.connection))
+ if self.__pool._should_log:
+ if e is not None:
+ self.__pool.log("Invalidate connection %s (reason: %s:%s)" % (repr(self.connection), e.__class__.__name__, str(e)))
+ else:
+ self.__pool.log("Invalidate connection %s" % repr(self.connection))
self.__close()
self.connection = None
for l in self.__pool._on_connect:
l.connect(self.connection, self)
elif (self.__pool._recycle > -1 and time.time() - self.starttime > self.__pool._recycle):
- self.__pool.log("Connection %s exceeded timeout; recycling" % repr(self.connection))
+ if self.__pool._should_log:
+ self.__pool.log("Connection %s exceeded timeout; recycling" % repr(self.connection))
self.__close()
self.connection = self.__connect()
self.properties.clear()
def __close(self):
try:
- self.__pool.log("Closing connection %s" % (repr(self.connection)))
+ if self.__pool._should_log:
+ self.__pool.log("Closing connection %s" % (repr(self.connection)))
self.connection.close()
except Exception, e:
- self.__pool.log("Connection %s threw an error on close: %s" % (repr(self.connection), str(e)))
+ if self.__pool._should_log:
+ self.__pool.log("Connection %s threw an error on close: %s" % (repr(self.connection), str(e)))
if isinstance(e, (SystemExit, KeyboardInterrupt)):
raise
try:
self.starttime = time.time()
connection = self.__pool._creator()
- self.__pool.log("Created new connection %s" % repr(connection))
+ if self.__pool._should_log:
+ self.__pool.log("Created new connection %s" % repr(connection))
return connection
except Exception, e:
- self.__pool.log("Error on connect(): %s" % (str(e)))
+ if self.__pool._should_log:
+ self.__pool.log("Error on connect(): %s" % (str(e)))
raise
class _ConnectionFairy(object):
self.connection = None # helps with endless __getattr__ loops later on
self._connection_record = None
raise
- if self._pool.echo:
+ if self._pool._should_log:
self._pool.log("Connection %s checked out from pool" % repr(self.connection))
_logger = property(lambda self: self._pool.logger)
l.checkout(self.connection, self._connection_record, self)
return self
except exceptions.DisconnectionError, e:
- self._pool.log(
+ if self._pool._should_log:
+ self._pool.log(
"Disconnection detected on checkout: %s" % (str(e)))
self._connection_record.invalidate(e)
self.connection = self._connection_record.get_connection()
attempts -= 1
- self._pool.log("Reconnection attempts exhausted on checkout")
+ if self._pool._should_log:
+ self._pool.log("Reconnection attempts exhausted on checkout")
self.invalidate()
raise exceptions.InvalidRequestError("This connection is closed")
if isinstance(e, (SystemExit, KeyboardInterrupt)):
raise
if self._connection_record is not None:
- if self._pool.echo:
+ if self._pool._should_log:
self._pool.log("Connection %s being returned to pool" % repr(self.connection))
if self._pool._on_checkin:
for l in self._pool._on_checkin:
break
self._overflow = 0 - self.size()
- self.log("Pool disposed. " + self.status())
+ if self._should_log:
+ self.log("Pool disposed. " + self.status())
def status(self):
tup = (self.size(), self.checkedin(), self.overflow(), self.checkedout())
assert p.checkedout() == 1
c1 = None
assert p.checkedout() == 0
+
+ def test_weakref_kaboom(self):
+ p = pool.QueuePool(creator=lambda: mock_dbapi.connect('foo.db'), pool_size=3, max_overflow=-1, use_threadlocal=True)
+ c1 = p.connect()
+ c2 = p.connect()
+ c1.close()
+ c2 = None
+ del c1
+ del c2
+ gc.collect()
+ assert p.checkedout() == 0
+ c3 = p.connect()
+ assert c3 is not None
def test_trick_the_counter(self):
"""this is a "flaw" in the connection pool; since threadlocal uses a single ConnectionFairy per thread
def test_properties(self):
dbapi = MockDBAPI()
p = pool.QueuePool(creator=lambda: dbapi.connect('foo.db'),
- pool_size=1, max_overflow=0)
+ pool_size=1, max_overflow=0, use_threadlocal=False)
c = p.connect()
self.assert_(not c.properties)
pass
def _pool(**kw):
- return pool.QueuePool(creator=lambda: dbapi.connect('foo.db'), **kw)
+ return pool.QueuePool(creator=lambda: dbapi.connect('foo.db'), use_threadlocal=False, **kw)
#, pool_size=1, max_overflow=0, **kw)
def assert_listeners(p, total, conn, cout, cin):