SQLAlchemy connection pool.
"""
-import weakref, time
+import weakref, time, threading
from sqlalchemy import exc, log
from sqlalchemy import queue as Queue
def __init__(self, creator, recycle=-1, echo=None, use_threadlocal=False,
reset_on_return=True, listeners=None):
self.logger = log.instance_logger(self, echoflag=echo)
- # the WeakValueDictionary works more nicely than a regular dict of
- # weakrefs. the latter can pile up dead reference objects which don't
- # get cleaned out. WVD adds from 1-6 method calls to a checkout
- # operation.
- self._threadconns = weakref.WeakValueDictionary()
+ self._threadconns = threading.local()
self._creator = creator
self._recycle = recycle
self._use_threadlocal = use_threadlocal
return _ConnectionFairy(self).checkout()
try:
- return self._threadconns[thread.get_ident()].checkout()
- except KeyError:
+ return self._threadconns.current().checkout()
+ except AttributeError:
agent = _ConnectionFairy(self)
- self._threadconns[thread.get_ident()] = agent
+ self._threadconns.current = weakref.ref(agent)
return agent.checkout()
def return_conn(self, record):
- if self._use_threadlocal and thread.get_ident() in self._threadconns:
- del self._threadconns[thread.get_ident()]
+ if self._use_threadlocal and hasattr(self._threadconns, "current"):
+ del self._threadconns.current
self.do_return_conn(record)
def get(self):
self.__pool.log("Error on connect(): %s" % e)
raise
- properties = property(lambda self: self.info,
- doc="A synonym for .info, will be removed in 0.5.")
def _finalize_fairy(connection, connection_record, pool, ref=None):
if ref is not None and connection_record.backref is not ref:
self._pool.log("Connection %r checked out from pool" %
self.connection)
- _logger = property(lambda self: self._pool.logger)
+ @property
+ def _logger(self):
+ return self._pool.logger
- is_valid = property(lambda self:self.connection is not None)
+ @property
+ def is_valid(self):
+ return self.connection is not None
- def _get_info(self):
+ @property
+ def info(self):
"""An info collection unique to this DB-API connection."""
try:
except AttributeError:
self._detached_info = value = {}
return value
- info = property(_get_info)
- properties = property(_get_info)
def invalidate(self, e=None):
"""Mark this connection as invalidated.
def __init__(self, creator, pool_size=5, **params):
params['use_threadlocal'] = True
Pool.__init__(self, creator, **params)
- self._conns = {}
+ self._conn = threading.local()
+ self._all_conns = set()
self.size = pool_size
def recreate(self):
self.log("Pool recreating")
- return SingletonThreadPool(self._creator, pool_size=self.size, recycle=self._recycle, echo=self._should_log_info, use_threadlocal=self._use_threadlocal, listeners=self.listeners)
+ return SingletonThreadPool(self._creator,
+ pool_size=self.size,
+ recycle=self._recycle,
+ echo=self._should_log_info,
+ use_threadlocal=self._use_threadlocal,
+ listeners=self.listeners)
def dispose(self):
- """Dispose of this pool.
+ """Dispose of this pool."""
- this method leaves the possibility of checked-out connections
- remaining opened, so it is advised to not reuse the pool once
- dispose() is called, and to instead use a new pool constructed
- by the recreate() method.
- """
-
- for key, conn in self._conns.items():
+ for conn in self._all_conns:
try:
conn.close()
except (SystemExit, KeyboardInterrupt):
raise
except:
- # sqlite won't even let you close a conn from a thread
+ # pysqlite won't even let you close a conn from a thread
# that didn't create it
pass
- del self._conns[key]
-
+
+ self._all_conns.clear()
+
def dispose_local(self):
- try:
- del self._conns[thread.get_ident()]
- except KeyError:
- pass
+ if hasattr(self._conn, 'current'):
+ conn = self._conn.current()
+ self._all_conns.discard(conn)
+ del self._conn.current
def cleanup(self):
- for key in self._conns.keys():
- try:
- del self._conns[key]
- except KeyError:
- pass
- if len(self._conns) <= self.size:
+ for conn in list(self._all_conns):
+ self._all_conns.discard(conn)
+ if len(self._all_conns) <= self.size:
return
def status(self):
- return "SingletonThreadPool id:%d thread:%d size: %d" % (id(self), thread.get_ident(), len(self._conns))
+ return "SingletonThreadPool id:%d size: %d" % (id(self), len(self._all_conns))
def do_return_conn(self, conn):
pass
def do_get(self):
try:
- return self._conns[thread.get_ident()]
- except KeyError:
+ return self._conn.current()
+ except AttributeError:
c = self.create_connection()
- self._conns[thread.get_ident()] = c
- if len(self._conns) > self.size:
+ self._conn.current = weakref.ref(c)
+ self._all_conns.add(c)
+ if len(self._all_conns) > self.size:
self.cleanup()
return c
self._list.remove(item[0])
return item
-
class OrderedSet(set):
def __init__(self, d=None):
set.__init__(self)
a callable that returns a new object to be placed in the registry
scopefunc
- a callable that will return a key to store/retrieve an object,
- defaults to ``thread.get_ident`` for thread-local objects. Use
- a value like ``lambda: True`` for application scope.
- """
+ a callable that will return a key to store/retrieve an object.
+ If None, ScopedRegistry uses a threading.local object instead.
- def __init__(self, createfunc, scopefunc=None):
- self.createfunc = createfunc
- if scopefunc is None:
- self.scopefunc = thread.get_ident
+ """
+ def __new__(cls, createfunc, scopefunc=None):
+ if not scopefunc:
+ return object.__new__(_TLocalRegistry)
else:
- self.scopefunc = scopefunc
+ return object.__new__(cls)
+
+ def __init__(self, createfunc, scopefunc):
+ self.createfunc = createfunc
+ self.scopefunc = scopefunc
self.registry = {}
def __call__(self):
- key = self._get_key()
+ key = self.scopefunc()
try:
return self.registry[key]
except KeyError:
return self.registry.setdefault(key, self.createfunc())
def has(self):
- return self._get_key() in self.registry
+ return self.scopefunc() in self.registry
def set(self, obj):
- self.registry[self._get_key()] = obj
+ self.registry[self.scopefunc()] = obj
def clear(self):
try:
- del self.registry[self._get_key()]
+ del self.registry[self.scopefunc()]
except KeyError:
pass
- def _get_key(self):
- return self.scopefunc()
+class _TLocalRegistry(ScopedRegistry):
+    """A ScopedRegistry variant keyed implicitly by thread via threading.local."""
+
+    def __init__(self, createfunc, scopefunc=None):
+        self.createfunc = createfunc
+        self.registry = threading.local()
+
+    def __call__(self):
+        # EAFP: the common case is that the value already exists.
+        try:
+            return self.registry.value
+        except AttributeError:
+            val = self.registry.value = self.createfunc()
+            return val
+
+    def has(self):
+        return hasattr(self.registry, "value")
+
+    def set(self, obj):
+        self.registry.value = obj
+
+    def clear(self):
+        # Mirror ScopedRegistry.clear(), which silently ignores a missing
+        # key -- a bare del would raise AttributeError when nothing was set.
+        try:
+            del self.registry.value
+        except AttributeError:
+            pass
class WeakCompositeKey(object):
"""an weak-referencable, hashable collection which is strongly referenced
pool_size=3, max_overflow=-1,
use_threadlocal=True)
- # the WeakValueDictionary used for the pool's "threadlocal" idea adds 1-6
- # method calls to each of these. however its just a lot easier stability
- # wise than dealing with a strongly referencing dict of weakrefs.
- # [ticket:754] immediately got opened when we tried a dict of weakrefs,
- # and though the solution there is simple, it still doesn't solve the
- # issue of "dead" weakrefs sitting in the dict taking up space
-
- @profiling.function_call_count(63, {'2.3': 42, '2.4': 43})
+
+ @profiling.function_call_count(54, {'2.3': 42, '2.4': 43})
def test_first_connect(self):
conn = pool.connect()
conn = pool.connect()
conn.close()
- @profiling.function_call_count(39, {'2.3': 26, '2.4': 26})
+ @profiling.function_call_count(31, {'2.3': 26, '2.4': 26})
def go():
conn2 = pool.connect()
return conn2
def test_second_samethread_connect(self):
conn = pool.connect()
- @profiling.function_call_count(7, {'2.3': 4, '2.4': 4})
+ @profiling.function_call_count(5, {'2.3': 4, '2.4': 4})
def go():
return pool.connect()
c2 = go()