From: Mike Bayer Date: Wed, 19 Feb 2014 15:48:32 +0000 (-0500) Subject: - Fixed a critical regression caused by :ticket:`2880` where the newly X-Git-Tag: rel_0_9_3~14 X-Git-Url: http://git.ipfire.org/gitweb.cgi?a=commitdiff_plain;h=dd32540dabbee0678530fb1b0868d1eb41572dca;p=thirdparty%2Fsqlalchemy%2Fsqlalchemy.git - Fixed a critical regression caused by :ticket:`2880` where the newly concurrent ability to return connections from the pool means that the "first_connect" event is now no longer synchronized either, thus leading to dialect mis-configurations under even minimal concurrency situations. --- diff --git a/doc/build/changelog/changelog_08.rst b/doc/build/changelog/changelog_08.rst index c70a10ce90..4e91839cc5 100644 --- a/doc/build/changelog/changelog_08.rst +++ b/doc/build/changelog/changelog_08.rst @@ -11,6 +11,16 @@ .. changelog:: :version: 0.8.5 + .. change:: + :tags: bug, engine, pool + :versions: 0.9.3 + :tickets: 2880, 2964 + + Fixed a critical regression caused by :ticket:`2880` where the newly + concurrent ability to return connections from the pool means that the + "first_connect" event is now no longer synchronized either, thus leading + to dialect mis-configurations under even minimal concurrency situations. + .. change:: :tags: bug, postgresql :tickets: 2291 diff --git a/lib/sqlalchemy/event/attr.py b/lib/sqlalchemy/event/attr.py index 3f89475466..ca94493f57 100644 --- a/lib/sqlalchemy/event/attr.py +++ b/lib/sqlalchemy/event/attr.py @@ -28,14 +28,16 @@ as well as support for subclass propagation (e.g. events assigned to """ -from __future__ import absolute_import +from __future__ import absolute_import, with_statement from .. import util +from ..util import threading from . import registry from . import legacy from itertools import chain import weakref + class RefCollection(object): @util.memoized_property def ref(self): @@ -230,13 +232,20 @@ class _EmptyListener(_HasParentDispatchDescriptor): class _CompoundListener(_HasParentDispatchDescriptor): _exec_once = False + @util.memoized_property + def _exec_once_mutex(self): + return threading.Lock() + def exec_once(self, *args, **kw): """Execute this event, but only if it has not been executed already for this collection.""" - if not self._exec_once: - self(*args, **kw) - self._exec_once = True + with self._exec_once_mutex: + if not self._exec_once: + try: + self(*args, **kw) + finally: + self._exec_once = True def __call__(self, *args, **kw): """Execute this event.""" diff --git a/test/engine/test_pool.py b/test/engine/test_pool.py index 2e4c2dc486..fc6f3dceaa 100644 --- a/test/engine/test_pool.py +++ b/test/engine/test_pool.py @@ -525,6 +525,44 @@ class PoolEventsTest(PoolTestBase): # going pool.Pool.dispatch._clear() +class PoolFirstConnectSyncTest(PoolTestBase): + # test [ticket:2964] + + def test_sync(self): + pool = self._queuepool_fixture(pool_size=3, max_overflow=0) + + evt = Mock() + + @event.listens_for(pool, 'first_connect') + def slow_first_connect(dbapi_con, rec): + time.sleep(1) + evt.first_connect() + + @event.listens_for(pool, 'connect') + def on_connect(dbapi_con, rec): + evt.connect() + + def checkout(): + for j in range(2): + c1 = pool.connect() + time.sleep(.02) + c1.close() + time.sleep(.02) + + threads = [] + for i in range(5): + th = threading.Thread(target=checkout) + th.start() + threads.append(th) + for th in threads: + th.join(join_timeout) + + eq_(evt.mock_calls, + [call.first_connect(), call.connect(), call.connect(), call.connect()] + ) + + + class DeprecatedPoolListenerTest(PoolTestBase): @testing.requires.predictable_gc @testing.uses_deprecated(r".*Use event.listen")