--- /dev/null
+.. change::
+ :tags: bug, asyncio
+
+ Fixed issues where a descriptive error message was not raised for some
+ classes of event listening with an async engine, which should instead be a
+ sync engine instance.
\ No newline at end of file
@classmethod
def _accept_with(
cls, target: Union[Engine, Type[Engine], Dialect, Type[Dialect]]
- ) -> Union[Dialect, Type[Dialect]]:
+ ) -> Optional[Union[Dialect, Type[Dialect]]]:
if isinstance(target, type):
if issubclass(target, Engine):
return Dialect
return target
elif isinstance(target, Engine):
return target.dialect
- else:
+ elif isinstance(target, Dialect):
return target
+ elif hasattr(target, "dispatch") and hasattr(
+ target.dispatch._events, "_no_async_engine_events"
+ ):
+ target.dispatch._events._no_async_engine_events()
+ else:
+ return None
def do_connect(
self,
_dispatch_target = AsyncConnectable
@classmethod
- def _listen(cls, event_key, retval=False):
+ def _no_async_engine_events(cls):
raise NotImplementedError(
"asynchronous events are not implemented at this time. Apply "
"synchronous listeners to the AsyncEngine.sync_engine or "
"AsyncConnection.sync_connection attributes."
)
+ @classmethod
+ def _listen(cls, event_key, retval=False):
+ cls._no_async_engine_events()
+
class AsyncSessionEvents(orm_event.SessionEvents):
_target_class_doc = "SomeSession"
_dispatch_target = AsyncSession
@classmethod
- def _listen(cls, event_key, retval=False):
+ def _no_async_engine_events(cls):
raise NotImplementedError(
"asynchronous events are not implemented at this time. Apply "
"synchronous listeners to the AsyncSession.sync_session."
)
+
+ @classmethod
+ def _listen(cls, event_key, retval=False):
+ cls._no_async_engine_events()
@classmethod
def _accept_with(
cls, target: Union[Pool, Type[Pool], Engine, Type[Engine]]
- ) -> Union[Pool, Type[Pool]]:
+ ) -> Optional[Union[Pool, Type[Pool]]]:
if not typing.TYPE_CHECKING:
Engine = util.preloaded.engine.Engine
return target
elif isinstance(target, Engine):
return target.pool
- else:
- assert isinstance(target, Pool)
+ elif isinstance(target, Pool):
return target
+ elif hasattr(target, "dispatch") and hasattr(
+ target.dispatch._events, "_no_async_engine_events"
+ ):
+ target.dispatch._events._no_async_engine_events()
+ else:
+ return None
@classmethod
def _listen( # type: ignore[override] # would rather keep **kw
from sqlalchemy.testing import assert_raises_message
from sqlalchemy.testing import eq_
from sqlalchemy.testing import expect_deprecated
+from sqlalchemy.testing import expect_raises_message
from sqlalchemy.testing import fixtures
from sqlalchemy.testing import is_
from sqlalchemy.testing import is_not
eq_(m1.mock_calls, [call(5, 6), call(9, 10)])
+ def test_real_name_wrong_dispatch(self):
+ m1 = Mock()
+
+ class E1(event.Events):
+ @classmethod
+ def _accept_with(cls, target):
+ if isinstance(target, T1):
+ return target
+ else:
+ m1.yup()
+ return None
+
+ def event_one(self, x, y):
+ pass
+
+ def event_two(self, x):
+ pass
+
+ def event_three(self, x):
+ pass
+
+ class T1:
+ dispatch = event.dispatcher(E1)
+
+ class T2:
+ pass
+
+ class E2(event.Events):
+
+ _dispatch_target = T2
+
+ def event_four(self, x):
+ pass
+
+ with expect_raises_message(
+ exc.InvalidRequestError, "No such event 'event_three'"
+ ):
+
+ @event.listens_for(E2, "event_three")
+ def go(*arg):
+ pass
+
+ eq_(m1.mock_calls, [call.yup()])
+
def test_exec_once_exception(self):
m1 = Mock()
m1.side_effect = ValueError
):
event.listen(conn, "before_cursor_execute", mock.Mock())
+ @async_test
+ async def test_no_async_listeners_dialect_event(self, async_engine):
+ with testing.expect_raises_message(
+ NotImplementedError,
+ "asynchronous events are not implemented "
+ "at this time. Apply synchronous listeners to the "
+ "AsyncEngine.sync_engine or "
+ "AsyncConnection.sync_connection attributes.",
+ ):
+ event.listen(async_engine, "do_execute", mock.Mock())
+
+ @async_test
+ async def test_no_async_listeners_pool_event(self, async_engine):
+ with testing.expect_raises_message(
+ NotImplementedError,
+ "asynchronous events are not implemented "
+ "at this time. Apply synchronous listeners to the "
+ "AsyncEngine.sync_engine or "
+ "AsyncConnection.sync_connection attributes.",
+ ):
+ event.listen(async_engine, "checkout", mock.Mock())
+
@async_test
async def test_sync_before_cursor_execute_engine(self, async_engine):
canary = mock.Mock()