.. changelog::
:version: 0.8.5
+ .. change::
+ :tags: bug, engine, pool
+ :versions: 0.9.3
+ :tickets: 2880, 2964
+
+ Fixed a critical regression caused by :ticket:`2880` where the newly
+ concurrent ability to return connections from the pool means that the
+ "first_connect" event is now no longer synchronized either, thus leading
+ to dialect mis-configurations under even minimal concurrency situations.
+
.. change::
:tags: bug, sqlite
:pullreq: github:72
# the MIT License: http://www.opensource.org/licenses/mit-license.php
"""Base event API."""
+from __future__ import with_statement
from . import util, exc
+from .util import threading
from itertools import chain
import weakref
class _CompoundListener(object):
_exec_once = False
+ @util.memoized_property
+ def _exec_once_mutex(self):
+ return threading.Lock()
+
def exec_once(self, *args, **kw):
"""Execute this event, but only if it has not been
executed already for this collection."""
- if not self._exec_once:
- self(*args, **kw)
- self._exec_once = True
+ with self._exec_once_mutex:
+ if not self._exec_once:
+ try:
+ self(*args, **kw)
+ finally:
+ self._exec_once = True
+
# I'm not entirely thrilled about the overhead here,
# but this allows class-level listeners to be added
# going
pool.Pool.dispatch._clear()
+class PoolFirstConnectSyncTest(PoolTestBase):
+ # test [ticket:2964]
+
+ def test_sync(self):
+ pool = self._queuepool_fixture(pool_size=3, max_overflow=0)
+
+ evt = Mock()
+
+ @event.listens_for(pool, 'first_connect')
+ def slow_first_connect(dbapi_con, rec):
+ time.sleep(1)
+ evt.first_connect()
+
+ @event.listens_for(pool, 'connect')
+ def on_connect(dbapi_con, rec):
+ evt.connect()
+
+ def checkout():
+ for j in range(2):
+ c1 = pool.connect()
+ time.sleep(.02)
+ c1.close()
+ time.sleep(.02)
+
+ threads = []
+ for i in range(5):
+ th = threading.Thread(target=checkout)
+ th.start()
+ threads.append(th)
+ for th in threads:
+ th.join(join_timeout)
+
+ eq_(evt.mock_calls,
+ [call.first_connect(), call.connect(), call.connect(), call.connect()]
+ )
+
+
+
class DeprecatedPoolListenerTest(PoolTestBase):
@testing.requires.predictable_gc
@testing.uses_deprecated(r".*Use event.listen")