]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
- with 2.3 support dropped,
authorMike Bayer <mike_mp@zzzcomputing.com>
Wed, 13 Aug 2008 22:41:17 +0000 (22:41 +0000)
committerMike Bayer <mike_mp@zzzcomputing.com>
Wed, 13 Aug 2008 22:41:17 +0000 (22:41 +0000)
all usage of thread.get_ident() is removed, and replaced
with threading.local() usage.  this allows potentially
faster and safer thread local access.

lib/sqlalchemy/pool.py
lib/sqlalchemy/util.py
test/profiling/pool.py
test/profiling/zoomark_orm.py

index 6cc05b2870cd886433f21a9d0e5da7f7c5f559a2..ddf7cb51ede2c67623b68a52033f6a360cdec74e 100644 (file)
@@ -16,7 +16,7 @@ regular DB-API connect() methods to be transparently managed by a
 SQLAlchemy connection pool.
 """
 
-import weakref, time
+import weakref, time, threading
 
 from sqlalchemy import exc, log
 from sqlalchemy import queue as Queue
@@ -119,11 +119,7 @@ class Pool(object):
     def __init__(self, creator, recycle=-1, echo=None, use_threadlocal=False,
                  reset_on_return=True, listeners=None):
         self.logger = log.instance_logger(self, echoflag=echo)
-        # the WeakValueDictionary works more nicely than a regular dict of
-        # weakrefs.  the latter can pile up dead reference objects which don't
-        # get cleaned out.  WVD adds from 1-6 method calls to a checkout
-        # operation.
-        self._threadconns = weakref.WeakValueDictionary()
+        self._threadconns = threading.local()
         self._creator = creator
         self._recycle = recycle
         self._use_threadlocal = use_threadlocal
@@ -165,15 +161,15 @@ class Pool(object):
             return _ConnectionFairy(self).checkout()
 
         try:
-            return self._threadconns[thread.get_ident()].checkout()
-        except KeyError:
+            return self._threadconns.current().checkout()
+        except AttributeError:
             agent = _ConnectionFairy(self)
-            self._threadconns[thread.get_ident()] = agent
+            self._threadconns.current = weakref.ref(agent)
             return agent.checkout()
 
     def return_conn(self, record):
-        if self._use_threadlocal and thread.get_ident() in self._threadconns:
-            del self._threadconns[thread.get_ident()]
+        if self._use_threadlocal and hasattr(self._threadconns, "current"):
+            del self._threadconns.current
         self.do_return_conn(record)
 
     def get(self):
@@ -286,8 +282,6 @@ class _ConnectionRecord(object):
                 self.__pool.log("Error on connect(): %s" % e)
             raise
 
-    properties = property(lambda self: self.info,
-                          doc="A synonym for .info, will be removed in 0.5.")
 
 def _finalize_fairy(connection, connection_record, pool, ref=None):
     if ref is not None and connection_record.backref is not ref:
@@ -331,11 +325,16 @@ class _ConnectionFairy(object):
             self._pool.log("Connection %r checked out from pool" %
                            self.connection)
 
-    _logger = property(lambda self: self._pool.logger)
+    @property
+    def _logger(self):
+        return self._pool.logger
 
-    is_valid = property(lambda self:self.connection is not None)
+    @property
+    def is_valid(self):
+        return self.connection is not None
 
-    def _get_info(self):
+    @property
+    def info(self):
         """An info collection unique to this DB-API connection."""
 
         try:
@@ -348,8 +347,6 @@ class _ConnectionFairy(object):
             except AttributeError:
                 self._detached_info = value = {}
                 return value
-    info = property(_get_info)
-    properties = property(_get_info)
 
     def invalidate(self, e=None):
         """Mark this connection as invalidated.
@@ -478,61 +475,60 @@ class SingletonThreadPool(Pool):
     def __init__(self, creator, pool_size=5, **params):
         params['use_threadlocal'] = True
         Pool.__init__(self, creator, **params)
-        self._conns = {}
+        self._conn = threading.local()
+        self._all_conns = set()
         self.size = pool_size
 
     def recreate(self):
         self.log("Pool recreating")
-        return SingletonThreadPool(self._creator, pool_size=self.size, recycle=self._recycle, echo=self._should_log_info, use_threadlocal=self._use_threadlocal, listeners=self.listeners)
+        return SingletonThreadPool(self._creator, 
+            pool_size=self.size, 
+            recycle=self._recycle, 
+            echo=self._should_log_info, 
+            use_threadlocal=self._use_threadlocal, 
+            listeners=self.listeners)
 
     def dispose(self):
-        """Dispose of this pool.
+        """Dispose of this pool."""
 
-        this method leaves the possibility of checked-out connections
-        remaining opened, so it is advised to not reuse the pool once
-        dispose() is called, and to instead use a new pool constructed
-        by the recreate() method.
-        """
-
-        for key, conn in self._conns.items():
+        for conn in self._all_conns:
             try:
                 conn.close()
             except (SystemExit, KeyboardInterrupt):
                 raise
             except:
-                # sqlite won't even let you close a conn from a thread
+                # pysqlite won't even let you close a conn from a thread
                 # that didn't create it
                 pass
-            del self._conns[key]
-
+        
+        self._all_conns.clear()
+            
     def dispose_local(self):
-        try:
-            del self._conns[thread.get_ident()]
-        except KeyError:
-            pass
+        if hasattr(self._conn, 'current'):
+            conn = self._conn.current()
+            self._all_conns.discard(conn)
+            del self._conn.current
 
     def cleanup(self):
-        for key in self._conns.keys():
-            try:
-                del self._conns[key]
-            except KeyError:
-                pass
-            if len(self._conns) <= self.size:
+        for conn in list(self._all_conns):
+            self._all_conns.discard(conn)
+            if len(self._all_conns) <= self.size:
                 return
 
     def status(self):
-        return "SingletonThreadPool id:%d thread:%d size: %d" % (id(self), thread.get_ident(), len(self._conns))
+        return "SingletonThreadPool id:%d size: %d" % (id(self), len(self._all_conns))
 
     def do_return_conn(self, conn):
         pass
 
     def do_get(self):
         try:
-            return self._conns[thread.get_ident()]
-        except KeyError:
+            return self._conn.current()
+        except AttributeError:
             c = self.create_connection()
-            self._conns[thread.get_ident()] = c
-            if len(self._conns) > self.size:
+            self._conn.current = weakref.ref(c)
+            self._all_conns.add(c)
+            if len(self._all_conns) > self.size:
                 self.cleanup()
             return c
 
index 76c73ca6ae4d338fc09c1f61e33f15e6f4f5b026..735843d2d4d9459d6b46ea8b5f29ed1e3010ec22 100644 (file)
@@ -746,7 +746,6 @@ class OrderedDict(dict):
         self._list.remove(item[0])
         return item
 
-
 class OrderedSet(set):
     def __init__(self, d=None):
         set.__init__(self)
@@ -1101,41 +1100,60 @@ class ScopedRegistry(object):
       a callable that returns a new object to be placed in the registry
 
     scopefunc
-      a callable that will return a key to store/retrieve an object,
-      defaults to ``thread.get_ident`` for thread-local objects.  Use
-      a value like ``lambda: True`` for application scope.
-    """
+      a callable that will return a key to store/retrieve an object.
+      If None, ScopedRegistry uses a threading.local object instead.
 
-    def __init__(self, createfunc, scopefunc=None):
-        self.createfunc = createfunc
-        if scopefunc is None:
-            self.scopefunc = thread.get_ident
+    """
+    def __new__(cls, createfunc, scopefunc=None):
+        if not scopefunc:
+            return object.__new__(_TLocalRegistry)
         else:
-            self.scopefunc = scopefunc
+            return object.__new__(cls)
+        
+    def __init__(self, createfunc, scopefunc):
+        self.createfunc = createfunc
+        self.scopefunc = scopefunc
         self.registry = {}
 
     def __call__(self):
-        key = self._get_key()
+        key = self.scopefunc()
         try:
             return self.registry[key]
         except KeyError:
             return self.registry.setdefault(key, self.createfunc())
 
     def has(self):
-        return self._get_key() in self.registry
+        return self.scopefunc() in self.registry
 
     def set(self, obj):
-        self.registry[self._get_key()] = obj
+        self.registry[self.scopefunc()] = obj
 
     def clear(self):
         try:
-            del self.registry[self._get_key()]
+            del self.registry[self.scopefunc()]
         except KeyError:
             pass
 
-    def _get_key(self):
-        return self.scopefunc()
+class _TLocalRegistry(ScopedRegistry):
+    def __init__(self, createfunc, scopefunc=None):
+        self.createfunc = createfunc
+        self.registry = threading.local()
+
+    def __call__(self):
+        try:
+            return self.registry.value
+        except AttributeError:
+            val = self.registry.value = self.createfunc()
+            return val
 
+    def has(self):
+        return hasattr(self.registry, "value")
+
+    def set(self, obj):
+        self.registry.value = obj
+
+    def clear(self):
+        del self.registry.value
 
 class WeakCompositeKey(object):
     """an weak-referencable, hashable collection which is strongly referenced
index 4b146fbabdc1a549680494513cf4647e78ae498b..72c4b03e442749edf334cf70a357614dfb4e7a50 100644 (file)
@@ -15,14 +15,8 @@ class QueuePoolTest(TestBase, AssertsExecutionResults):
                          pool_size=3, max_overflow=-1,
                          use_threadlocal=True)
 
-    # the WeakValueDictionary used for the pool's "threadlocal" idea adds 1-6
-    # method calls to each of these.  however its just a lot easier stability
-    # wise than dealing with a strongly referencing dict of weakrefs.
-    # [ticket:754] immediately got opened when we tried a dict of weakrefs,
-    # and though the solution there is simple, it still doesn't solve the
-    # issue of "dead" weakrefs sitting in the dict taking up space
-
-    @profiling.function_call_count(63, {'2.3': 42, '2.4': 43})
+
+    @profiling.function_call_count(54, {'2.3': 42, '2.4': 43})
     def test_first_connect(self):
         conn = pool.connect()
 
@@ -30,7 +24,7 @@ class QueuePoolTest(TestBase, AssertsExecutionResults):
         conn = pool.connect()
         conn.close()
 
-        @profiling.function_call_count(39, {'2.3': 26, '2.4': 26})
+        @profiling.function_call_count(31, {'2.3': 26, '2.4': 26})
         def go():
             conn2 = pool.connect()
             return conn2
@@ -39,7 +33,7 @@ class QueuePoolTest(TestBase, AssertsExecutionResults):
     def test_second_samethread_connect(self):
         conn = pool.connect()
 
-        @profiling.function_call_count(7, {'2.3': 4, '2.4': 4})
+        @profiling.function_call_count(5, {'2.3': 4, '2.4': 4})
         def go():
             return pool.connect()
         c2 = go()
index 9e6bcbadda22d3e08fff5b31d2b4c86794cd10b8..a8c1d5dcee6ec2af86a1024524388c609ab0d803 100644 (file)
@@ -286,7 +286,7 @@ class ZooMarkTest(TestBase):
         metadata = MetaData(engine)
         session = sessionmaker()()
 
-    @profiling.function_call_count(4659)
+    @profiling.function_call_count(4898)
     def test_profile_1_create_tables(self):
         self.test_baseline_1_create_tables()