--- /dev/null
+.. change::
+ :tags: bug, engine
+ :tickets: 12289
+
+ Fixed issue where creating an :class:`.Engine` using multiple calls to
+ :meth:`.Engine.execution_options` where a subsequent call involved certain
+ options such as ``isolation_level`` would lead to an internal error
+ involving event registration.
return all(isinstance(target.dispatch, t) for t in types)
def dispatch_parent_is(t: Type[Any]) -> bool:
- return isinstance(
- cast("_JoinedDispatcher[_ET]", target.dispatch).parent, t
- )
+ parent = cast("_JoinedDispatcher[_ET]", target.dispatch).parent
+ while isinstance(parent, _JoinedDispatcher):
+ parent = cast("_JoinedDispatcher[_ET]", parent).parent
+
+ return isinstance(parent, t)
# Mapper, ClassManager, Session override this to
# also accept classes, scoped_sessions, sessionmakers, etc.
def __init__(self, parent):
self.dispatch = self.dispatch._join(parent.dispatch)
+ def create(self):
+ return TargetElement(self)
+
def run_event(self, arg):
list(self.dispatch.event_one)
self.dispatch.event_one(self, arg)
[call(element, 1), call(element, 2), call(element, 3)],
)
+ def test_join_twice(self):
+ """test #12289"""
+
+ l1 = Mock()
+ l2 = Mock()
+
+ first_target_element = self.TargetFactory().create()
+ second_target_element = first_target_element.create()
+
+ event.listen(second_target_element, "event_one", l2)
+ event.listen(first_target_element, "event_one", l1)
+
+ second_target_element.run_event(1)
+ eq_(
+ l1.mock_calls,
+ [call(second_target_element, 1)],
+ )
+ eq_(
+ l2.mock_calls,
+ [call(second_target_element, 1)],
+ )
+
+ first_target_element.run_event(2)
+ eq_(
+ l1.mock_calls,
+ [call(second_target_element, 1), call(first_target_element, 2)],
+ )
+ eq_(
+ l2.mock_calls,
+ [call(second_target_element, 1)],
+ )
+
def test_parent_class_child_instance_apply_after(self):
l1 = Mock()
l2 = Mock()
eq_(canary.be2.call_count, 1)
eq_(canary.be3.call_count, 2)
+ @testing.requires.ad_hoc_engines
+ def test_option_engine_registration_issue_one(self):
+ """test #12289"""
+
+ e1 = create_engine(testing.db.url)
+ e2 = e1.execution_options(foo="bar")
+ e3 = e2.execution_options(isolation_level="AUTOCOMMIT")
+
+ eq_(
+ e3._execution_options,
+ {"foo": "bar", "isolation_level": "AUTOCOMMIT"},
+ )
+
+ @testing.requires.ad_hoc_engines
+ def test_option_engine_registration_issue_two(self):
+ """test #12289"""
+
+ e1 = create_engine(testing.db.url)
+ e2 = e1.execution_options(foo="bar")
+
+ @event.listens_for(e2, "engine_connect")
+ def r1(*arg, **kw):
+ pass
+
+ e3 = e2.execution_options(bat="hoho")
+
+ @event.listens_for(e3, "engine_connect")
+ def r2(*arg, **kw):
+ pass
+
+ eq_(e3._execution_options, {"foo": "bar", "bat": "hoho"})
+
def test_emit_sql_in_autobegin(self, testing_engine):
e1 = testing_engine(config.db_url)