--- /dev/null
+.. change::
+ :tags: bug, engine
+ :tickets: 4807
+
+ Fixed an issue whereby if the dialect "initialize" process which occurs on
+ first connect would encounter an unexpected exception, the initialize
+ process would fail to complete and then no longer attempt on subsequent
+ connection attempts, leaving the dialect in an un-initialized, or partially
+ initialized state, within the scope of parameters that need to be
+ established based on inspection of a live connection. The "invoke once"
+ logic in the event system has been reworked to accommodate for this
+ occurrence using new, private API features that establish an "exec once"
+ hook that will continue to allow the initializer to fire off on subsequent
+ connections, until it completes without raising an exception. This does not
+ impact the behavior of the existing ``once=True`` flag within the event
+ system.
dialect.initialize(c)
dialect.do_rollback(c.connection)
- event.listen(pool, "first_connect", first_connect, once=True)
+ event.listen(
+ pool,
+ "first_connect",
+ first_connect,
+ _once_unless_exception=True,
+ )
dialect_cls.engine_created(engine)
if entrypoint is not dialect_cls:
def _needs_modify(self, *args, **kw):
raise NotImplementedError("need to call for_modify()")
- exec_once = insert = append = remove = clear = _needs_modify
+ exec_once = (
+ exec_once_unless_exception
+ ) = insert = append = remove = clear = _needs_modify
def __call__(self, *args, **kw):
"""Execute this event."""
def _memoized_attr__exec_once_mutex(self):
return threading.Lock()
+ def _exec_once_impl(self, retry_on_exception, *args, **kw):
+ with self._exec_once_mutex:
+ if not self._exec_once:
+ try:
+ self(*args, **kw)
+ exception = False
+ except:
+ exception = True
+ raise
+ finally:
+ if not exception or not retry_on_exception:
+ self._exec_once = True
+
def exec_once(self, *args, **kw):
"""Execute this event, but only if it has not been
executed already for this collection."""
if not self._exec_once:
- with self._exec_once_mutex:
- if not self._exec_once:
- try:
- self(*args, **kw)
- finally:
- self._exec_once = True
+ self._exec_once_impl(False, *args, **kw)
+
+ def exec_once_unless_exception(self, *args, **kw):
+ """Execute this event, but only if it has not been
+ executed already for this collection, or was called
+ by a previous exec_once_unless_exception call and
+ raised an exception.
+
+ If exec_once was already called, then this method will never run
+ the callable regardless of whether it raised or not.
+
+ .. versionadded:: 1.3.8
+
+ """
+ if not self._exec_once:
+ self._exec_once_impl(True, *args, **kw)
def __call__(self, *args, **kw):
"""Execute this event."""
def listen(self, *args, **kw):
once = kw.pop("once", False)
+ once_unless_exception = kw.pop("_once_unless_exception", False)
named = kw.pop("named", False)
target, identifier, fn = (
if hasattr(stub_function, "_sa_warn"):
stub_function._sa_warn()
- if once:
- self.with_wrapper(util.only_once(self._listen_fn)).listen(
- *args, **kw
- )
+ if once or once_unless_exception:
+ self.with_wrapper(
+ util.only_once(
+ self._listen_fn, retry_on_exception=once_unless_exception
+ )
+ ).listen(*args, **kw)
else:
self.dispatch_target.dispatch._listen(self, *args, **kw)
if first_connect_check:
pool.dispatch.first_connect.for_modify(
pool.dispatch
- ).exec_once(self.connection, self)
+ ).exec_once_unless_exception(self.connection, self)
if pool.dispatch.connect:
pool.dispatch.connect(self.connection, self)
warnings.warn(msg, exc.SAWarning, stacklevel=2)
-def only_once(fn):
+def only_once(fn, retry_on_exception):
"""Decorate the given function to be a no-op after it is called exactly
once."""
strong_fn = fn # noqa
if once:
once_fn = once.pop()
- return once_fn(*arg, **kw)
+ try:
+ return once_fn(*arg, **kw)
+ except:
+ if retry_on_exception:
+ once.insert(0, once_fn)
+ raise
return go
t2.dispatch.event_one,
)
+ def test_exec_once(self):
+ m1 = Mock()
+
+ event.listen(self.Target, "event_one", m1)
+
+ t1 = self.Target()
+ t2 = self.Target()
+
+ t1.dispatch.event_one.for_modify(t1.dispatch).exec_once(5, 6)
+
+ t1.dispatch.event_one.for_modify(t1.dispatch).exec_once(7, 8)
+
+ t2.dispatch.event_one.for_modify(t2.dispatch).exec_once(9, 10)
+
+ eq_(m1.mock_calls, [call(5, 6), call(9, 10)])
+
+ def test_exec_once_exception(self):
+ m1 = Mock()
+ m1.side_effect = ValueError
+
+ event.listen(self.Target, "event_one", m1)
+
+ t1 = self.Target()
+
+ assert_raises(
+ ValueError,
+ t1.dispatch.event_one.for_modify(t1.dispatch).exec_once,
+ 5,
+ 6,
+ )
+
+ t1.dispatch.event_one.for_modify(t1.dispatch).exec_once(7, 8)
+
+ eq_(m1.mock_calls, [call(5, 6)])
+
+ def test_exec_once_unless_exception(self):
+ m1 = Mock()
+ m1.side_effect = ValueError
+
+ event.listen(self.Target, "event_one", m1)
+
+ t1 = self.Target()
+
+ assert_raises(
+ ValueError,
+ t1.dispatch.event_one.for_modify(
+ t1.dispatch
+ ).exec_once_unless_exception,
+ 5,
+ 6,
+ )
+
+ assert_raises(
+ ValueError,
+ t1.dispatch.event_one.for_modify(
+ t1.dispatch
+ ).exec_once_unless_exception,
+ 7,
+ 8,
+ )
+
+ m1.side_effect = None
+ t1.dispatch.event_one.for_modify(
+ t1.dispatch
+ ).exec_once_unless_exception(9, 10)
+
+ t1.dispatch.event_one.for_modify(
+ t1.dispatch
+ ).exec_once_unless_exception(11, 12)
+
+ eq_(m1.mock_calls, [call(5, 6), call(7, 8), call(9, 10)])
+
def test_immutable_methods(self):
t1 = self.Target()
for meth in [
eq_(m3.mock_calls, [call("x")])
eq_(m4.mock_calls, [call("z")])
+ def test_once_unless_exception(self):
+ Target = self._fixture()
+
+ m1 = Mock()
+ m2 = Mock()
+ m3 = Mock()
+ m4 = Mock()
+
+ m1.side_effect = ValueError
+ m2.side_effect = ValueError
+ m3.side_effect = ValueError
+
+ event.listen(Target, "event_one", m1)
+ event.listen(Target, "event_one", m2, _once_unless_exception=True)
+ event.listen(Target, "event_one", m3, _once_unless_exception=True)
+
+ t1 = Target()
+
+ # only m1 is called, raises
+ assert_raises(ValueError, t1.dispatch.event_one, "x")
+
+ # now m1 and m2 can be called but not m3
+ m1.side_effect = None
+
+ assert_raises(ValueError, t1.dispatch.event_one, "y")
+
+ # now m3 can be called
+ m2.side_effect = None
+
+ event.listen(Target, "event_one", m4, _once_unless_exception=True)
+ assert_raises(ValueError, t1.dispatch.event_one, "z")
+
+ assert_raises(ValueError, t1.dispatch.event_one, "q")
+
+ eq_(m1.mock_calls, [call("x"), call("y"), call("z"), call("q")])
+ eq_(m2.mock_calls, [call("y"), call("z")])
+ eq_(m3.mock_calls, [call("z"), call("q")])
+ eq_(m4.mock_calls, []) # m4 never got called because m3 blocked it
+
+ # now m4 can be called
+ m3.side_effect = None
+
+ t1.dispatch.event_one("p")
+ eq_(
+ m1.mock_calls,
+ [call("x"), call("y"), call("z"), call("q"), call("p")],
+ )
+
+ # m2 already got called, so no "p"
+ eq_(m2.mock_calls, [call("y"), call("z")])
+ eq_(m3.mock_calls, [call("z"), call("q"), call("p")])
+ eq_(m4.mock_calls, [call("p")])
+
+ t1.dispatch.event_one("j")
+ eq_(
+ m1.mock_calls,
+ [call("x"), call("y"), call("z"), call("q"), call("p"), call("j")],
+ )
+
+ # nobody got "j" because they've all been successful
+ eq_(m2.mock_calls, [call("y"), call("z")])
+ eq_(m3.mock_calls, [call("z"), call("q"), call("p")])
+ eq_(m4.mock_calls, [call("p")])
+
def test_once_doesnt_dereference_listener(self):
# test for [ticket:4794]
from sqlalchemy.testing import eq_
from sqlalchemy.testing import expect_warnings
from sqlalchemy.testing import fixtures
+from sqlalchemy.testing import is_false
+from sqlalchemy.testing import is_true
from sqlalchemy.testing import mock
from sqlalchemy.testing import ne_
from sqlalchemy.testing.engines import testing_engine
engine = create_engine(MyURL("foo://"), module=dbapi)
engine.connect()
+
+ # note that the dispose() call replaces the old pool with a new one;
+ # this is to test that even though a single pool is using
+ # dispatch.exec_once(), by replacing the pool with a new one, the event
+ # would normally fire again onless once=True is set on the original
+ # listen as well.
+
engine.dispose()
engine.connect()
eq_(Dialect.initialize.call_count, 1)
+ def test_dialect_initialize_retry_if_exception(self):
+ from sqlalchemy.engine.url import URL
+ from sqlalchemy.engine.default import DefaultDialect
+
+ dbapi = self.dbapi
+
+ class MyURL(URL):
+ def _get_entrypoint(self):
+ return Dialect
+
+ def get_dialect(self):
+ return Dialect
+
+ class Dialect(DefaultDialect):
+ initialize = Mock()
+
+ # note that the first_connect hook is only invoked when the pool
+ # makes a new DBAPI connection, and not when it checks out an existing
+ # connection. So there is a dependency here that if the initializer
+ # raises an exception, the pool-level connection attempt is also
+ # failed, meaning no DBAPI connection is pooled. If the first_connect
+ # exception raise did not prevent the connection from being pooled,
+ # there could be the case where the pool could return that connection
+ # on a subsequent attempt without initialization having proceeded.
+
+ Dialect.initialize.side_effect = TypeError
+ engine = create_engine(MyURL("foo://"), module=dbapi)
+
+ assert_raises(TypeError, engine.connect)
+ eq_(Dialect.initialize.call_count, 1)
+ is_true(engine.pool._pool.empty())
+
+ assert_raises(TypeError, engine.connect)
+ eq_(Dialect.initialize.call_count, 2)
+ is_true(engine.pool._pool.empty())
+
+ engine.dispose()
+
+ assert_raises(TypeError, engine.connect)
+ eq_(Dialect.initialize.call_count, 3)
+ is_true(engine.pool._pool.empty())
+
+ Dialect.initialize.side_effect = None
+
+ conn = engine.connect()
+ eq_(Dialect.initialize.call_count, 4)
+ conn.close()
+ is_false(engine.pool._pool.empty())
+
+ conn = engine.connect()
+ eq_(Dialect.initialize.call_count, 4)
+ conn.close()
+ is_false(engine.pool._pool.empty())
+
+ engine.dispose()
+ conn = engine.connect()
+
+ eq_(Dialect.initialize.call_count, 4)
+ conn.close()
+ is_false(engine.pool._pool.empty())
+
def test_invalidate_conn_w_contextmanager_interrupt(self):
# test [ticket:3803]
pool = self.db.pool