def _copy_future_state(source, dest):
"""Internal helper to copy state from another Future.
- The other Future may be a concurrent.futures.Future.
+ The other Future must be a concurrent.futures.Future.
"""
- assert source.done()
if dest.cancelled():
return
assert not dest.done()
- if source.cancelled():
+ done, cancelled, result, exception = source._get_snapshot()
+ assert done
+ if cancelled:
dest.cancel()
+ elif exception is not None:
+ dest.set_exception(_convert_future_exc(exception))
else:
- exception = source.exception()
- if exception is not None:
- dest.set_exception(_convert_future_exc(exception))
- else:
- result = source.result()
- dest.set_result(result)
-
+ dest.set_result(result)
def _chain_future(source, destination):
"""Chain two futures so that when one completes, so does the other.
self._condition.notify_all()
self._invoke_callbacks()
+ def _get_snapshot(self):
+ """Get a snapshot of the future's current state.
+
+ This method atomically retrieves the state in one lock acquisition,
+ which is significantly faster than multiple method calls.
+
+ Returns:
+ Tuple of (done, cancelled, result, exception)
+ - done: True if the future is done (cancelled or finished)
+ - cancelled: True if the future was cancelled
+ - result: The result if available and not cancelled
+ - exception: The exception if available and not cancelled
+ """
+ # Fast path: check if already finished without lock
+ if self._state == FINISHED:
+ return True, False, self._result, self._exception
+
+ # Need lock for other states since they can change
+ with self._condition:
+ # We have to check the state again after acquiring the lock
+ # because it may have changed in the meantime.
+ if self._state == FINISHED:
+ return True, False, self._result, self._exception
+ if self._state in {CANCELLED, CANCELLED_AND_NOTIFIED}:
+ return True, True, None, None
+ return False, False, None, None
+
__class_getitem__ = classmethod(types.GenericAlias)
class Executor(object):
def test_copy_state(self):
from asyncio.futures import _copy_future_state
- f = self._new_future(loop=self.loop)
+ f = concurrent.futures.Future()
f.set_result(10)
newf = self._new_future(loop=self.loop)
self.assertTrue(newf.done())
self.assertEqual(newf.result(), 10)
- f_exception = self._new_future(loop=self.loop)
+ f_exception = concurrent.futures.Future()
f_exception.set_exception(RuntimeError())
newf_exception = self._new_future(loop=self.loop)
self.assertTrue(newf_exception.done())
self.assertRaises(RuntimeError, newf_exception.result)
- f_cancelled = self._new_future(loop=self.loop)
+ f_cancelled = concurrent.futures.Future()
f_cancelled.cancel()
newf_cancelled = self._new_future(loop=self.loop)
except BaseException as e:
f_exc = e
- f_conexc = self._new_future(loop=self.loop)
+ f_conexc = concurrent.futures.Future()
f_conexc.set_exception(f_exc)
newf_conexc = self._new_future(loop=self.loop)
newf_tb = ''.join(traceback.format_tb(newf_exc.__traceback__))
self.assertEqual(newf_tb.count('raise concurrent.futures.InvalidStateError'), 1)
+ def test_copy_state_from_concurrent_futures(self):
+ """Test _copy_future_state from concurrent.futures.Future.
+
+ This tests the optimized path using _get_snapshot when available.
+ """
+ from asyncio.futures import _copy_future_state
+
+ # Test with a result
+ f_concurrent = concurrent.futures.Future()
+ f_concurrent.set_result(42)
+ f_asyncio = self._new_future(loop=self.loop)
+ _copy_future_state(f_concurrent, f_asyncio)
+ self.assertTrue(f_asyncio.done())
+ self.assertEqual(f_asyncio.result(), 42)
+
+ # Test with an exception
+ f_concurrent_exc = concurrent.futures.Future()
+ f_concurrent_exc.set_exception(ValueError("test exception"))
+ f_asyncio_exc = self._new_future(loop=self.loop)
+ _copy_future_state(f_concurrent_exc, f_asyncio_exc)
+ self.assertTrue(f_asyncio_exc.done())
+ with self.assertRaises(ValueError) as cm:
+ f_asyncio_exc.result()
+ self.assertEqual(str(cm.exception), "test exception")
+
+ # Test with cancelled state
+ f_concurrent_cancelled = concurrent.futures.Future()
+ f_concurrent_cancelled.cancel()
+ f_asyncio_cancelled = self._new_future(loop=self.loop)
+ _copy_future_state(f_concurrent_cancelled, f_asyncio_cancelled)
+ self.assertTrue(f_asyncio_cancelled.cancelled())
+
+ # Test that destination already cancelled prevents copy
+ f_concurrent_result = concurrent.futures.Future()
+ f_concurrent_result.set_result(10)
+ f_asyncio_precancelled = self._new_future(loop=self.loop)
+ f_asyncio_precancelled.cancel()
+ _copy_future_state(f_concurrent_result, f_asyncio_precancelled)
+ self.assertTrue(f_asyncio_precancelled.cancelled())
+
+ # Test exception type conversion
+ f_concurrent_invalid = concurrent.futures.Future()
+ f_concurrent_invalid.set_exception(concurrent.futures.InvalidStateError("invalid"))
+ f_asyncio_invalid = self._new_future(loop=self.loop)
+ _copy_future_state(f_concurrent_invalid, f_asyncio_invalid)
+ self.assertTrue(f_asyncio_invalid.done())
+ with self.assertRaises(asyncio.exceptions.InvalidStateError) as cm:
+ f_asyncio_invalid.result()
+ self.assertEqual(str(cm.exception), "invalid")
+
def test_iter(self):
fut = self._new_future(loop=self.loop)
PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future)
from test import support
+from test.support import threading_helper
from .util import (
PENDING_FUTURE, RUNNING_FUTURE, CANCELLED_FUTURE,
self.assertEqual(f.exception(), e)
+ def test_get_snapshot(self):
+ """Test the _get_snapshot method for atomic state retrieval."""
+ # Test with a pending future
+ f = Future()
+ done, cancelled, result, exception = f._get_snapshot()
+ self.assertFalse(done)
+ self.assertFalse(cancelled)
+ self.assertIsNone(result)
+ self.assertIsNone(exception)
+
+ # Test with a finished future (successful result)
+ f = Future()
+ f.set_result(42)
+ done, cancelled, result, exception = f._get_snapshot()
+ self.assertTrue(done)
+ self.assertFalse(cancelled)
+ self.assertEqual(result, 42)
+ self.assertIsNone(exception)
+
+ # Test with a finished future (exception)
+ f = Future()
+ exc = ValueError("test error")
+ f.set_exception(exc)
+ done, cancelled, result, exception = f._get_snapshot()
+ self.assertTrue(done)
+ self.assertFalse(cancelled)
+ self.assertIsNone(result)
+ self.assertIs(exception, exc)
+
+ # Test with a cancelled future
+ f = Future()
+ f.cancel()
+ done, cancelled, result, exception = f._get_snapshot()
+ self.assertTrue(done)
+ self.assertTrue(cancelled)
+ self.assertIsNone(result)
+ self.assertIsNone(exception)
+
+ # Test concurrent access (basic thread safety check)
+ f = Future()
+ f.set_result(100)
+ results = []
+
+ def get_snapshot():
+ for _ in range(1000):
+ snapshot = f._get_snapshot()
+ results.append(snapshot)
+
+ threads = [threading.Thread(target=get_snapshot) for _ in range(4)]
+ with threading_helper.start_threads(threads):
+ pass
+ # All snapshots should be identical for a finished future
+ expected = (True, False, 100, None)
+ for result in results:
+ self.assertEqual(result, expected)
+
def setUpModule():
setup_module()
--- /dev/null
+Speed up :mod:`asyncio` performance of transferring state from thread
+pool :class:`concurrent.futures.Future` by up to 4.4x. Patch by J. Nick
+Koston.