.. changelog::
:version: 0.8.5
+ .. change::
+ :tags: bug, engine, pool
+ :versions: 0.9.3
+ :tickets: 2880, 2964
+
+ Fixed a critical regression caused by :ticket:`2880` where the newly
+ concurrent ability to return connections from the pool means that the
+ "first_connect" event is now no longer synchronized either, thus leading
+ to dialect mis-configurations under even minimal concurrency situations.
+
.. change::
:tags: bug, postgresql
:tickets: 2291
"""
-from __future__ import absolute_import
+from __future__ import absolute_import, with_statement
from .. import util
+from ..util import threading
from . import registry
from . import legacy
from itertools import chain
import weakref
+
class RefCollection(object):
@util.memoized_property
def ref(self):
class _CompoundListener(_HasParentDispatchDescriptor):
_exec_once = False
+ @util.memoized_property
+ def _exec_once_mutex(self):
+ return threading.Lock()
+
def exec_once(self, *args, **kw):
"""Execute this event, but only if it has not been
executed already for this collection."""
- if not self._exec_once:
- self(*args, **kw)
- self._exec_once = True
+ with self._exec_once_mutex:
+ if not self._exec_once:
+ try:
+ self(*args, **kw)
+ finally:
+ self._exec_once = True
def __call__(self, *args, **kw):
"""Execute this event."""
# going
pool.Pool.dispatch._clear()
+class PoolFirstConnectSyncTest(PoolTestBase):
+ # test [ticket:2964]
+
+ def test_sync(self):
+ pool = self._queuepool_fixture(pool_size=3, max_overflow=0)
+
+ evt = Mock()
+
+ @event.listens_for(pool, 'first_connect')
+ def slow_first_connect(dbapi_con, rec):
+ time.sleep(1)
+ evt.first_connect()
+
+ @event.listens_for(pool, 'connect')
+ def on_connect(dbapi_con, rec):
+ evt.connect()
+
+ def checkout():
+ for j in range(2):
+ c1 = pool.connect()
+ time.sleep(.02)
+ c1.close()
+ time.sleep(.02)
+
+ threads = []
+ for i in range(5):
+ th = threading.Thread(target=checkout)
+ th.start()
+ threads.append(th)
+ for th in threads:
+ th.join(join_timeout)
+
+ eq_(evt.mock_calls,
+ [call.first_connect(), call.connect(), call.connect(), call.connect()]
+ )
+
+
+
class DeprecatedPoolListenerTest(PoolTestBase):
@testing.requires.predictable_gc
@testing.uses_deprecated(r".*Use event.listen")