self._loop.call_exception_handler(context)
self = None # Needed to break cycles when an exception occurs.
+# _ThreadSafeHandle is used for callbacks scheduled with call_soon_threadsafe
+# and is thread safe unlike Handle which is not thread safe.
+class _ThreadSafeHandle(Handle):
+
+ __slots__ = ('_lock',)
+
+ def __init__(self, callback, args, loop, context=None):
+ super().__init__(callback, args, loop, context)
+ self._lock = threading.RLock()
+
+ def cancel(self):
+ with self._lock:
+ return super().cancel()
+
+ def cancelled(self):
+ with self._lock:
+ return super().cancelled()
+
+ def _run(self):
+ # The event loop checks for cancellation without holding the lock
+ # It is possible that the handle is cancelled after the check
+ # but before the callback is called so check it again after acquiring
+ # the lock and return without calling the callback if it is cancelled.
+ with self._lock:
+ if self._cancelled:
+ return
+ return super()._run()
+
class TimerHandle(Handle):
"""Object returned by timed callback registration methods."""
t.join()
self.assertEqual(results, ['hello', 'world'])
+ def test_call_soon_threadsafe_handle_block_check_cancelled(self):
+ results = []
+
+ callback_started = threading.Event()
+ callback_finished = threading.Event()
+ def callback(arg):
+ callback_started.set()
+ results.append(arg)
+ time.sleep(1)
+ callback_finished.set()
+
+ def run_in_thread():
+ handle = self.loop.call_soon_threadsafe(callback, 'hello')
+ self.assertIsInstance(handle, events._ThreadSafeHandle)
+ callback_started.wait()
+ # callback started so it should block checking for cancellation
+ # until it finishes
+ self.assertFalse(handle.cancelled())
+ self.assertTrue(callback_finished.is_set())
+ self.loop.call_soon_threadsafe(self.loop.stop)
+
+ t = threading.Thread(target=run_in_thread)
+ t.start()
+
+ self.loop.run_forever()
+ t.join()
+ self.assertEqual(results, ['hello'])
+
+ def test_call_soon_threadsafe_handle_block_cancellation(self):
+ results = []
+
+ callback_started = threading.Event()
+ callback_finished = threading.Event()
+ def callback(arg):
+ callback_started.set()
+ results.append(arg)
+ time.sleep(1)
+ callback_finished.set()
+
+ def run_in_thread():
+ handle = self.loop.call_soon_threadsafe(callback, 'hello')
+ self.assertIsInstance(handle, events._ThreadSafeHandle)
+ callback_started.wait()
+ # callback started so it cannot be cancelled from other thread until
+ # it finishes
+ handle.cancel()
+ self.assertTrue(callback_finished.is_set())
+ self.loop.call_soon_threadsafe(self.loop.stop)
+
+ t = threading.Thread(target=run_in_thread)
+ t.start()
+
+ self.loop.run_forever()
+ t.join()
+ self.assertEqual(results, ['hello'])
+
+ def test_call_soon_threadsafe_handle_cancel_same_thread(self):
+ results = []
+ callback_started = threading.Event()
+ callback_finished = threading.Event()
+
+ fut = concurrent.futures.Future()
+ def callback(arg):
+ callback_started.set()
+ handle = fut.result()
+ handle.cancel()
+ results.append(arg)
+ callback_finished.set()
+ self.loop.stop()
+
+ def run_in_thread():
+ handle = self.loop.call_soon_threadsafe(callback, 'hello')
+ fut.set_result(handle)
+ self.assertIsInstance(handle, events._ThreadSafeHandle)
+ callback_started.wait()
+ # callback cancels itself from same thread so it has no effect
+ # it runs to completion
+ self.assertTrue(handle.cancelled())
+ self.assertTrue(callback_finished.is_set())
+ self.loop.call_soon_threadsafe(self.loop.stop)
+
+ t = threading.Thread(target=run_in_thread)
+ t.start()
+
+ self.loop.run_forever()
+ t.join()
+ self.assertEqual(results, ['hello'])
+
+ def test_call_soon_threadsafe_handle_cancel_other_thread(self):
+ results = []
+ ev = threading.Event()
+
+ callback_finished = threading.Event()
+ def callback(arg):
+ results.append(arg)
+ callback_finished.set()
+ self.loop.stop()
+
+ def run_in_thread():
+ handle = self.loop.call_soon_threadsafe(callback, 'hello')
+ # handle can be cancelled from other thread if not started yet
+ self.assertIsInstance(handle, events._ThreadSafeHandle)
+ handle.cancel()
+ self.assertTrue(handle.cancelled())
+ self.assertFalse(callback_finished.is_set())
+ ev.set()
+ self.loop.call_soon_threadsafe(self.loop.stop)
+
+ # block the main loop until the callback is added and cancelled in the
+ # other thread
+ self.loop.call_soon(ev.wait)
+ t = threading.Thread(target=run_in_thread)
+ t.start()
+ self.loop.run_forever()
+ t.join()
+ self.assertEqual(results, [])
+ self.assertFalse(callback_finished.is_set())
+
def test_call_soon_threadsafe_same_thread(self):
results = []