From cd7678a965594ff2db153a7cade0fe8555bd0d38 Mon Sep 17 00:00:00 2001
From: Mike Bayer
Date: Wed, 13 Aug 2008 22:41:17 +0000
Subject: [PATCH] - with 2.3 support dropped, all usage of thread.get_ident()
 is removed, and replaced with threading.local() usage. this allows
 potentially faster and safer thread local access.

---
 lib/sqlalchemy/pool.py        | 88 +++++++++++++++++------------------
 lib/sqlalchemy/util.py        | 50 +++++++++++++-------
 test/profiling/pool.py        | 14 ++----
 test/profiling/zoomark_orm.py |  2 +-
 4 files changed, 81 insertions(+), 73 deletions(-)

diff --git a/lib/sqlalchemy/pool.py b/lib/sqlalchemy/pool.py
index 6cc05b2870..ddf7cb51ed 100644
--- a/lib/sqlalchemy/pool.py
+++ b/lib/sqlalchemy/pool.py
@@ -16,7 +16,7 @@ regular DB-API connect() methods to be transparently managed by a SQLAlchemy
 connection pool.
 """
 
-import weakref, time
+import weakref, time, threading
 
 from sqlalchemy import exc, log
 from sqlalchemy import queue as Queue
@@ -119,11 +119,7 @@ class Pool(object):
     def __init__(self, creator, recycle=-1, echo=None, use_threadlocal=False,
                  reset_on_return=True, listeners=None):
         self.logger = log.instance_logger(self, echoflag=echo)
-        # the WeakValueDictionary works more nicely than a regular dict of
-        # weakrefs. the latter can pile up dead reference objects which don't
-        # get cleaned out. WVD adds from 1-6 method calls to a checkout
-        # operation.
-        self._threadconns = weakref.WeakValueDictionary()
+        self._threadconns = threading.local()
         self._creator = creator
         self._recycle = recycle
         self._use_threadlocal = use_threadlocal
@@ -165,15 +161,15 @@ class Pool(object):
             return _ConnectionFairy(self).checkout()
 
         try:
-            return self._threadconns[thread.get_ident()].checkout()
-        except KeyError:
+            return self._threadconns.current().checkout()
+        except AttributeError:
             agent = _ConnectionFairy(self)
-            self._threadconns[thread.get_ident()] = agent
+            self._threadconns.current = weakref.ref(agent)
             return agent.checkout()
 
     def return_conn(self, record):
-        if self._use_threadlocal and thread.get_ident() in self._threadconns:
-            del self._threadconns[thread.get_ident()]
+        if self._use_threadlocal and hasattr(self._threadconns, "current"):
+            del self._threadconns.current
         self.do_return_conn(record)
 
     def get(self):
@@ -286,8 +282,6 @@ class _ConnectionRecord(object):
             self.__pool.log("Error on connect(): %s" % e)
             raise
 
-    properties = property(lambda self: self.info,
-        doc="A synonym for .info, will be removed in 0.5.")
 
 def _finalize_fairy(connection, connection_record, pool, ref=None):
     if ref is not None and connection_record.backref is not ref:
@@ -331,11 +325,16 @@ class _ConnectionFairy(object):
             self._pool.log("Connection %r checked out from pool" %
                            self.connection)
 
-    _logger = property(lambda self: self._pool.logger)
+    @property
+    def _logger(self):
+        return self._pool.logger
 
-    is_valid = property(lambda self:self.connection is not None)
+    @property
+    def is_valid(self):
+        return self.connection is not None
 
-    def _get_info(self):
+    @property
+    def info(self):
         """An info collection unique to this DB-API connection."""
 
         try:
@@ -348,8 +347,6 @@ class _ConnectionFairy(object):
         except AttributeError:
             self._detached_info = value = {}
             return value
-    info = property(_get_info)
-    properties = property(_get_info)
 
     def invalidate(self, e=None):
         """Mark this connection as invalidated.
@@ -478,61 +475,60 @@ class SingletonThreadPool(Pool):
     def __init__(self, creator, pool_size=5, **params):
         params['use_threadlocal'] = True
         Pool.__init__(self, creator, **params)
-        self._conns = {}
+        self._conn = threading.local()
+        self._all_conns = set()
        self.size = pool_size
 
     def recreate(self):
         self.log("Pool recreating")
-        return SingletonThreadPool(self._creator, pool_size=self.size, recycle=self._recycle, echo=self._should_log_info, use_threadlocal=self._use_threadlocal, listeners=self.listeners)
+        return SingletonThreadPool(self._creator,
+            pool_size=self.size,
+            recycle=self._recycle,
+            echo=self._should_log_info,
+            use_threadlocal=self._use_threadlocal,
+            listeners=self.listeners)
 
     def dispose(self):
-        """Dispose of this pool.
+        """Dispose of this pool."""
 
-        this method leaves the possibility of checked-out connections
-        remaining opened, so it is advised to not reuse the pool once
-        dispose() is called, and to instead use a new pool constructed
-        by the recreate() method.
-
-        """
-
-        for key, conn in self._conns.items():
+        for conn in self._all_conns:
             try:
                 conn.close()
             except (SystemExit, KeyboardInterrupt):
                 raise
             except:
-                # sqlite won't even let you close a conn from a thread
+                # pysqlite won't even let you close a conn from a thread
                 # that didn't create it
                 pass
-            del self._conns[key]
-
+
+        self._all_conns.clear()
+
     def dispose_local(self):
-        try:
-            del self._conns[thread.get_ident()]
-        except KeyError:
-            pass
+        if hasattr(self._conn, 'current'):
+            conn = self._conn.current()
+            self._all_conns.discard(conn)
+            del self._conn.current
 
     def cleanup(self):
-        for key in self._conns.keys():
-            try:
-                del self._conns[key]
-            except KeyError:
-                pass
-            if len(self._conns) <= self.size:
+        for conn in list(self._all_conns):
+            self._all_conns.discard(conn)
+            if len(self._all_conns) <= self.size:
                 return
 
     def status(self):
-        return "SingletonThreadPool id:%d thread:%d size: %d" % (id(self), thread.get_ident(), len(self._conns))
+        return "SingletonThreadPool id:%d size: %d" % (id(self), len(self._all_conns))
 
     def do_return_conn(self, conn):
         pass
 
     def do_get(self):
         try:
-            return self._conns[thread.get_ident()]
-        except KeyError:
+            return self._conn.current()
+        except AttributeError:
             c = self.create_connection()
-            self._conns[thread.get_ident()] = c
-            if len(self._conns) > self.size:
+            self._conn.current = weakref.ref(c)
+            self._all_conns.add(c)
+            if len(self._all_conns) > self.size:
                 self.cleanup()
             return c
diff --git a/lib/sqlalchemy/util.py b/lib/sqlalchemy/util.py
index 76c73ca6ae..735843d2d4 100644
--- a/lib/sqlalchemy/util.py
+++ b/lib/sqlalchemy/util.py
@@ -746,7 +746,6 @@ class OrderedDict(dict):
         self._list.remove(item[0])
         return item
 
-
 class OrderedSet(set):
     def __init__(self, d=None):
         set.__init__(self)
@@ -1101,41 +1100,60 @@ class ScopedRegistry(object):
       a callable that returns a new object to be placed in the registry
 
     scopefunc
-      a callable that will return a key to store/retrieve an object,
-      defaults to ``thread.get_ident`` for thread-local objects. Use
-      a value like ``lambda: True`` for application scope.
-    """
+      a callable that will return a key to store/retrieve an object.
+      If None, ScopedRegistry uses a threading.local object instead.
 
-    def __init__(self, createfunc, scopefunc=None):
-        self.createfunc = createfunc
-        if scopefunc is None:
-            self.scopefunc = thread.get_ident
+    """
+    def __new__(cls, createfunc, scopefunc=None):
+        if not scopefunc:
+            return object.__new__(_TLocalRegistry)
         else:
-            self.scopefunc = scopefunc
+            return object.__new__(cls)
+
+    def __init__(self, createfunc, scopefunc):
+        self.createfunc = createfunc
+        self.scopefunc = scopefunc
         self.registry = {}
 
     def __call__(self):
-        key = self._get_key()
+        key = self.scopefunc()
         try:
             return self.registry[key]
         except KeyError:
             return self.registry.setdefault(key, self.createfunc())
 
     def has(self):
-        return self._get_key() in self.registry
+        return self.scopefunc() in self.registry
 
     def set(self, obj):
-        self.registry[self._get_key()] = obj
+        self.registry[self.scopefunc()] = obj
 
     def clear(self):
         try:
-            del self.registry[self._get_key()]
+            del self.registry[self.scopefunc()]
         except KeyError:
             pass
 
-    def _get_key(self):
-        return self.scopefunc()
+class _TLocalRegistry(ScopedRegistry):
+    def __init__(self, createfunc, scopefunc=None):
+        self.createfunc = createfunc
+        self.registry = threading.local()
+
+    def __call__(self):
+        try:
+            return self.registry.value
+        except AttributeError:
+            val = self.registry.value = self.createfunc()
+            return val
 
+    def has(self):
+        return hasattr(self.registry, "value")
+
+    def set(self, obj):
+        self.registry.value = obj
+
+    def clear(self):
+        del self.registry.value
 
 class WeakCompositeKey(object):
     """an weak-referencable, hashable collection which is strongly referenced
diff --git a/test/profiling/pool.py b/test/profiling/pool.py
index 4b146fbabd..72c4b03e44 100644
--- a/test/profiling/pool.py
+++ b/test/profiling/pool.py
@@ -15,14 +15,8 @@ class QueuePoolTest(TestBase, AssertsExecutionResults):
                          pool_size=3, max_overflow=-1,
                          use_threadlocal=True)
 
-    # the WeakValueDictionary used for the pool's "threadlocal" idea adds 1-6
-    # method calls to each of these. however its just a lot easier stability
-    # wise than dealing with a strongly referencing dict of weakrefs.
-    # [ticket:754] immediately got opened when we tried a dict of weakrefs,
-    # and though the solution there is simple, it still doesn't solve the
-    # issue of "dead" weakrefs sitting in the dict taking up space
-
-    @profiling.function_call_count(63, {'2.3': 42, '2.4': 43})
+
+    @profiling.function_call_count(54, {'2.3': 42, '2.4': 43})
     def test_first_connect(self):
         conn = pool.connect()
 
@@ -30,7 +24,7 @@ class QueuePoolTest(TestBase, AssertsExecutionResults):
         conn = pool.connect()
         conn.close()
 
-        @profiling.function_call_count(39, {'2.3': 26, '2.4': 26})
+        @profiling.function_call_count(31, {'2.3': 26, '2.4': 26})
         def go():
             conn2 = pool.connect()
             return conn2
@@ -39,7 +33,7 @@ class QueuePoolTest(TestBase, AssertsExecutionResults):
     def test_second_samethread_connect(self):
         conn = pool.connect()
 
-        @profiling.function_call_count(7, {'2.3': 4, '2.4': 4})
+        @profiling.function_call_count(5, {'2.3': 4, '2.4': 4})
         def go():
             return pool.connect()
         c2 = go()
diff --git a/test/profiling/zoomark_orm.py b/test/profiling/zoomark_orm.py
index 9e6bcbadda..a8c1d5dcee 100644
--- a/test/profiling/zoomark_orm.py
+++ b/test/profiling/zoomark_orm.py
@@ -286,7 +286,7 @@ class ZooMarkTest(TestBase):
         metadata = MetaData(engine)
         session = sessionmaker()()
 
-    @profiling.function_call_count(4659)
+    @profiling.function_call_count(4898)
     def test_profile_1_create_tables(self):
         self.test_baseline_1_create_tables()
 
-- 
2.47.3
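
The core pattern this patch switches Pool.connect() and SingletonThreadPool.do_get() to is: a threading.local() that holds a weakref to the current thread's connection wrapper, with AttributeError (rather than the old KeyError on a thread.get_ident() dict lookup) signalling that the calling thread has no entry yet. The standalone sketch below is not part of the patch; ThreadLocalCache and _Conn are invented names used only to illustrate that pattern in isolation.

    import threading
    import weakref


    class _Conn(object):
        """Hypothetical stand-in for a pooled DB-API connection wrapper."""
        def close(self):
            pass


    class ThreadLocalCache(object):
        """Illustrative per-thread cache of a single object.

        A threading.local() keeps one slot per thread; the slot stores only a
        weakref, and an AttributeError on first access in a thread takes the
        place of the old "key not in dict" check.
        """

        def __init__(self, createfunc):
            self._createfunc = createfunc
            self._local = threading.local()

        def get(self):
            try:
                obj = self._local.current()    # dereference the weakref
                if obj is not None:
                    return obj
            except AttributeError:
                pass                           # first access in this thread
            obj = self._createfunc()
            # store only a weak reference, so the per-thread slot never keeps
            # an otherwise-unreferenced object alive on its own
            self._local.current = weakref.ref(obj)
            return obj


    if __name__ == "__main__":
        cache = ThreadLocalCache(_Conn)
        c1 = cache.get()
        c2 = cache.get()
        assert c1 is c2            # same thread -> same cached object

        def other_thread():
            c3 = cache.get()
            assert c3 is not c1    # another thread gets its own instance

        t = threading.Thread(target=other_thread)
        t.start()
        t.join()

Holding only a weak reference preserves the behavior the removed WeakValueDictionary comment describes: the thread-local slot does not by itself keep a checked-in connection wrapper alive, while the threading.local() avoids the explicit thread.get_ident() keying and the dead-weakref buildup mentioned in the deleted test comment.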