]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
- Fixed a critical regression caused by :ticket:`2880` where the newly
authorMike Bayer <mike_mp@zzzcomputing.com>
Wed, 19 Feb 2014 15:48:32 +0000 (10:48 -0500)
committerMike Bayer <mike_mp@zzzcomputing.com>
Wed, 19 Feb 2014 15:48:32 +0000 (10:48 -0500)
concurrent ability to return connections from the pool means that the
"first_connect" event is now no longer synchronized either, thus leading
to dialect mis-configurations under even minimal concurrency situations.

doc/build/changelog/changelog_08.rst
lib/sqlalchemy/event/attr.py
test/engine/test_pool.py

index c70a10ce90ca95af58fcb4bd61727367589437af..4e91839cc5dc69590c86952ac5ca987cedd6a171 100644 (file)
 .. changelog::
     :version: 0.8.5
 
+     .. change::
+        :tags: bug, engine, pool
+        :versions: 0.9.3
+        :tickets: 2880, 2964
+
+        Fixed a critical regression caused by :ticket:`2880` where the newly
+        concurrent ability to return connections from the pool means that the
+        "first_connect" event is now no longer synchronized either, thus leading
+        to dialect mis-configurations under even minimal concurrency situations.
+
     .. change::
         :tags: bug, postgresql
         :tickets: 2291
index 3f89475466f8522ec13e43fb49b6feb5de456331..ca94493f578b4001d62616142bb231f81d7ed6fa 100644 (file)
@@ -28,14 +28,16 @@ as well as support for subclass propagation (e.g. events assigned to
 
 """
 
-from __future__ import absolute_import
+from __future__ import absolute_import, with_statement
 
 from .. import util
+from ..util import threading
 from . import registry
 from . import legacy
 from itertools import chain
 import weakref
 
+
 class RefCollection(object):
     @util.memoized_property
     def ref(self):
@@ -230,13 +232,20 @@ class _EmptyListener(_HasParentDispatchDescriptor):
 class _CompoundListener(_HasParentDispatchDescriptor):
     _exec_once = False
 
+    @util.memoized_property
+    def _exec_once_mutex(self):
+        return threading.Lock()
+
     def exec_once(self, *args, **kw):
         """Execute this event, but only if it has not been
         executed already for this collection."""
 
-        if not self._exec_once:
-            self(*args, **kw)
-            self._exec_once = True
+        with self._exec_once_mutex:
+            if not self._exec_once:
+                try:
+                    self(*args, **kw)
+                finally:
+                    self._exec_once = True
 
     def __call__(self, *args, **kw):
         """Execute this event."""
index 2e4c2dc486359c1cb0428b101c5bf87ef5268f45..fc6f3dceaa97307c5b837a9d9c271d58491c76e6 100644 (file)
@@ -525,6 +525,44 @@ class PoolEventsTest(PoolTestBase):
         # going
         pool.Pool.dispatch._clear()
 
+class PoolFirstConnectSyncTest(PoolTestBase):
+    # test [ticket:2964]
+
+    def test_sync(self):
+        pool = self._queuepool_fixture(pool_size=3, max_overflow=0)
+
+        evt = Mock()
+
+        @event.listens_for(pool, 'first_connect')
+        def slow_first_connect(dbapi_con, rec):
+            time.sleep(1)
+            evt.first_connect()
+
+        @event.listens_for(pool, 'connect')
+        def on_connect(dbapi_con, rec):
+            evt.connect()
+
+        def checkout():
+            for j in range(2):
+                c1 = pool.connect()
+                time.sleep(.02)
+                c1.close()
+                time.sleep(.02)
+
+        threads = []
+        for i in range(5):
+            th = threading.Thread(target=checkout)
+            th.start()
+            threads.append(th)
+        for th in threads:
+            th.join(join_timeout)
+
+        eq_(evt.mock_calls,
+                [call.first_connect(), call.connect(), call.connect(), call.connect()]
+            )
+
+
+
 class DeprecatedPoolListenerTest(PoolTestBase):
     @testing.requires.predictable_gc
     @testing.uses_deprecated(r".*Use event.listen")