def _call_check_cancel(destination):
if destination.cancelled():
- if source_loop is None or source_loop is dest_loop:
+ if source_loop is None or source_loop is events._get_running_loop():
source.cancel()
else:
source_loop.call_soon_threadsafe(source.cancel)
if (destination.cancelled() and
dest_loop is not None and dest_loop.is_closed()):
return
- if dest_loop is None or dest_loop is source_loop:
+ if dest_loop is None or dest_loop is events._get_running_loop():
_set_state(destination, source)
else:
if dest_loop.is_closed():
(loop, context), kwargs = callback.call_args
self.assertEqual(context['exception'], exc_context.exception)
+ def test_run_coroutine_threadsafe_and_cancel(self):
+ task = None
+ thread_future = None
+ # Use a custom task factory to capture the created Task
+ def task_factory(loop, coro):
+ nonlocal task
+ task = asyncio.Task(coro, loop=loop)
+ return task
+
+ self.addCleanup(self.loop.set_task_factory,
+ self.loop.get_task_factory())
+
+ async def target():
+ nonlocal thread_future
+ self.loop.set_task_factory(task_factory)
+ thread_future = asyncio.run_coroutine_threadsafe(asyncio.sleep(10), self.loop)
+ await asyncio.sleep(0)
+
+ thread_future.cancel()
+
+ self.loop.run_until_complete(target())
+ self.assertTrue(task.cancelled())
+ self.assertTrue(thread_future.cancelled())
+
class SleepTests(test_utils.TestCase):
def setUp(self):