]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
- Fixed a critical regression caused by :ticket:`2880` where the newly
authorMike Bayer <mike_mp@zzzcomputing.com>
Wed, 19 Feb 2014 15:48:32 +0000 (10:48 -0500)
committerMike Bayer <mike_mp@zzzcomputing.com>
Wed, 19 Feb 2014 15:51:57 +0000 (10:51 -0500)
concurrent ability to return connections from the pool means that the
"first_connect" event is now no longer synchronized either, thus leading
to dialect mis-configurations under even minimal concurrency situations.

Conflicts:
lib/sqlalchemy/event/attr.py

doc/build/changelog/changelog_08.rst
lib/sqlalchemy/event.py
test/engine/test_pool.py

index 3b9c7611bcb6841d4bcc45244c884bb50105189d..3484c94c0a44c74aa302921a430fd14b7bed8248 100644 (file)
 .. changelog::
     :version: 0.8.5
 
+     .. change::
+        :tags: bug, engine, pool
+        :versions: 0.9.3
+        :tickets: 2880, 2964
+
+        Fixed a critical regression caused by :ticket:`2880` where the newly
+        concurrent ability to return connections from the pool means that the
+        "first_connect" event is now no longer synchronized either, thus leading
+        to dialect mis-configurations under even minimal concurrency situations.
+
     .. change::
         :tags: bug, sqlite
         :pullreq: github:72
index f28f19ee920a560ac3c2d372b0eb4bc519e374a1..ec8481caf37faed88a106cc86066c32156e60ff8 100644 (file)
@@ -5,8 +5,10 @@
 # the MIT License: http://www.opensource.org/licenses/mit-license.php
 
 """Base event API."""
+from __future__ import with_statement
 
 from . import util, exc
+from .util import threading
 from itertools import chain
 import weakref
 
@@ -384,13 +386,21 @@ class _EmptyListener(object):
 class _CompoundListener(object):
     _exec_once = False
 
+    @util.memoized_property
+    def _exec_once_mutex(self):
+        return threading.Lock()
+
     def exec_once(self, *args, **kw):
         """Execute this event, but only if it has not been
         executed already for this collection."""
 
-        if not self._exec_once:
-            self(*args, **kw)
-            self._exec_once = True
+        with self._exec_once_mutex:
+            if not self._exec_once:
+                try:
+                    self(*args, **kw)
+                finally:
+                    self._exec_once = True
+
 
     # I'm not entirely thrilled about the overhead here,
     # but this allows class-level listeners to be added
index 702ec5560773627ef4135797469e142fa5d25cf9..3c2d9c5fb55e738fc1fdfc2ab1b0a49d859b40db 100644 (file)
@@ -495,6 +495,44 @@ class PoolEventsTest(PoolTestBase):
         # going
         pool.Pool.dispatch._clear()
 
+class PoolFirstConnectSyncTest(PoolTestBase):
+    # test [ticket:2964]
+
+    def test_sync(self):
+        pool = self._queuepool_fixture(pool_size=3, max_overflow=0)
+
+        evt = Mock()
+
+        @event.listens_for(pool, 'first_connect')
+        def slow_first_connect(dbapi_con, rec):
+            time.sleep(1)
+            evt.first_connect()
+
+        @event.listens_for(pool, 'connect')
+        def on_connect(dbapi_con, rec):
+            evt.connect()
+
+        def checkout():
+            for j in range(2):
+                c1 = pool.connect()
+                time.sleep(.02)
+                c1.close()
+                time.sleep(.02)
+
+        threads = []
+        for i in range(5):
+            th = threading.Thread(target=checkout)
+            th.start()
+            threads.append(th)
+        for th in threads:
+            th.join(join_timeout)
+
+        eq_(evt.mock_calls,
+                [call.first_connect(), call.connect(), call.connect(), call.connect()]
+            )
+
+
+
 class DeprecatedPoolListenerTest(PoolTestBase):
     @testing.requires.predictable_gc
     @testing.uses_deprecated(r".*Use event.listen")