class OptionEngine(Engine):
+ _sa_propagate_class_events = False
+
def __init__(self, proxied, execution_options):
self._proxied = proxied
self.url = proxied.url
self.logging_name = proxied.logging_name
self.echo = proxied.echo
log.instance_logger(self, echoflag=self.echo)
+
+ # note: this will propagate events that are assigned to the parent
+ # engine after this OptionEngine is created. Since we share
+ # the events of the parent we also disallow class-level events
+ # to apply to the OptionEngine class directly.
+ #
+ # the other way this can work would be to transfer existing
+ # events only, using:
+ # self.dispatch._update(proxied.dispatch)
+ #
+ # that might be more appropriate however it would be a behavioral
+ # change for logic that assigns events to the parent engine and
+ # would like it to take effect for the already-created sub-engine.
self.dispatch = self.dispatch._join(proxied.dispatch)
+
self._execution_options = proxied._execution_options
self.update_execution_options(**execution_options)
"""
from __future__ import absolute_import, with_statement
-
+from .. import exc
from .. import util
from ..util import threading
from . import registry
return weakref.ref(self, registry._collection_gced)
+class _empty_collection(object):
+ def append(self, element):
+ pass
+
+ def extend(self, other):
+ pass
+
+ def __iter__(self):
+ return iter([])
+
+ def clear(self):
+ pass
+
+
class _ClsLevelDispatch(RefCollection):
"""Class-level events on :class:`._Dispatch` classes."""
target = event_key.dispatch_target
assert isinstance(target, type), \
"Class-level Event targets must be classes."
+ if not getattr(target, '_sa_propagate_class_events', True):
+ raise exc.InvalidRequestError(
+ "Can't assign an event directly to the %s class" % target)
stack = [target]
while stack:
cls = stack.pop(0)
self.update_subclass(cls)
else:
if cls not in self._clslevel:
- self._clslevel[cls] = collections.deque()
+ self._assign_cls_collection(cls)
self._clslevel[cls].appendleft(event_key._listen_fn)
registry._stored_in_collection(event_key, self)
target = event_key.dispatch_target
assert isinstance(target, type), \
"Class-level Event targets must be classes."
-
+ if not getattr(target, '_sa_propagate_class_events', True):
+ raise exc.InvalidRequestError(
+ "Can't assign an event directly to the %s class" % target)
stack = [target]
while stack:
cls = stack.pop(0)
self.update_subclass(cls)
else:
if cls not in self._clslevel:
- self._clslevel[cls] = collections.deque()
+ self._assign_cls_collection(cls)
self._clslevel[cls].append(event_key._listen_fn)
registry._stored_in_collection(event_key, self)
+ def _assign_cls_collection(self, target):
+ if getattr(target, '_sa_propagate_class_events', True):
+ self._clslevel[target] = collections.deque()
+ else:
+ self._clslevel[target] = _empty_collection()
+
def update_subclass(self, target):
if target not in self._clslevel:
- self._clslevel[target] = collections.deque()
+ self._assign_cls_collection(target)
clslevel = self._clslevel[target]
for cls in target.__mro__[1:]:
if cls in self._clslevel:
is_(eng1a.pool, eng2.pool)
is_(eng.pool, eng2.pool)
- @testing.requires.ad_hoc_engines
- def test_generative_engine_event_dispatch(self):
- canary = []
-
- def l1(*arg, **kw):
- canary.append("l1")
-
- def l2(*arg, **kw):
- canary.append("l2")
-
- def l3(*arg, **kw):
- canary.append("l3")
-
- eng = engines.testing_engine(options={'execution_options':
- {'base': 'x1'}})
- event.listen(eng, "before_execute", l1)
-
- eng1 = eng.execution_options(foo="b1")
- event.listen(eng, "before_execute", l2)
- event.listen(eng1, "before_execute", l3)
-
- eng.execute(select([1])).close()
- eng1.execute(select([1])).close()
-
- eq_(canary, ["l1", "l2", "l3", "l1", "l2"])
-
- @testing.requires.ad_hoc_engines
- def test_dispose_event(self):
- canary = Mock()
- eng = create_engine(testing.db.url)
- event.listen(eng, "engine_disposed", canary)
-
- conn = eng.connect()
- conn.close()
- eng.dispose()
-
- conn = eng.connect()
- conn.close()
-
- eq_(
- canary.mock_calls,
- [call(eng)]
- )
-
- eng.dispose()
-
- eq_(
- canary.mock_calls,
- [call(eng), call(eng)]
- )
-
@testing.requires.ad_hoc_engines
def test_autocommit_option_no_issue_first_connect(self):
eng = create_engine(testing.db.url)
eq_(c3._execution_options, {'foo': 'bar', 'bar': 'bat'})
eq_(canary, ['execute', 'cursor_execute'])
+ @testing.requires.ad_hoc_engines
+ def test_generative_engine_event_dispatch(self):
+ canary = []
+
+ def l1(*arg, **kw):
+ canary.append("l1")
+
+ def l2(*arg, **kw):
+ canary.append("l2")
+
+ def l3(*arg, **kw):
+ canary.append("l3")
+
+ eng = engines.testing_engine(options={'execution_options':
+ {'base': 'x1'}})
+ event.listen(eng, "before_execute", l1)
+
+ eng1 = eng.execution_options(foo="b1")
+ event.listen(eng, "before_execute", l2)
+ event.listen(eng1, "before_execute", l3)
+
+ eng.execute(select([1])).close()
+
+ eq_(canary, ["l1", "l2"])
+
+ eng1.execute(select([1])).close()
+
+ eq_(canary, ["l1", "l2", "l3", "l1", "l2"])
+
+ @testing.requires.ad_hoc_engines
+ def test_clslevel_engine_event_options(self):
+ canary = []
+
+ def l1(*arg, **kw):
+ canary.append("l1")
+
+ def l2(*arg, **kw):
+ canary.append("l2")
+
+ def l3(*arg, **kw):
+ canary.append("l3")
+
+ def l4(*arg, **kw):
+ canary.append("l4")
+
+ event.listen(Engine, "before_execute", l1)
+
+ eng = engines.testing_engine(options={'execution_options':
+ {'base': 'x1'}})
+ event.listen(eng, "before_execute", l2)
+
+ eng1 = eng.execution_options(foo="b1")
+ event.listen(eng, "before_execute", l3)
+ event.listen(eng1, "before_execute", l4)
+
+ eng.execute(select([1])).close()
+
+ eq_(canary, ["l1", "l2", "l3"])
+
+ eng1.execute(select([1])).close()
+
+ eq_(canary, ["l1", "l2", "l3", "l4", "l1", "l2", "l3"])
+
+ @testing.requires.ad_hoc_engines
+ def test_cant_listen_to_option_engine(self):
+ from sqlalchemy.engine import base
+
+ def evt(*arg, **kw):
+ pass
+
+ assert_raises_message(
+ tsa.exc.InvalidRequestError,
+ r"Can't assign an event directly to the "
+ "<class 'sqlalchemy.engine.base.OptionEngine'> class",
+ event.listen, base.OptionEngine, "before_cursor_execute", evt
+ )
+
+ @testing.requires.ad_hoc_engines
+ def test_dispose_event(self):
+ canary = Mock()
+ eng = create_engine(testing.db.url)
+ event.listen(eng, "engine_disposed", canary)
+
+ conn = eng.connect()
+ conn.close()
+ eng.dispose()
+
+ conn = eng.connect()
+ conn.close()
+
+ eq_(
+ canary.mock_calls,
+ [call(eng)]
+ )
+
+ eng.dispose()
+
+ eq_(
+ canary.mock_calls,
+ [call(eng), call(eng)]
+ )
+
def test_retval_flag(self):
canary = []