get_running_loop_impl = None
get_event_loop_impl = None
+ Task = None
+ Future = None
+
def setUp(self):
self._get_running_loop_saved = events._get_running_loop
self._set_running_loop_saved = events._set_running_loop
self.get_running_loop_saved = events.get_running_loop
self.get_event_loop_saved = events.get_event_loop
+ self._Task_saved = asyncio.Task
+ self._Future_saved = asyncio.Future
events._get_running_loop = type(self)._get_running_loop_impl
events._set_running_loop = type(self)._set_running_loop_impl
asyncio.get_running_loop = type(self).get_running_loop_impl
asyncio.get_event_loop = type(self).get_event_loop_impl
+ asyncio.Task = asyncio.tasks.Task = type(self).Task
+ asyncio.Future = asyncio.futures.Future = type(self).Future
super().setUp()
self.loop = asyncio.new_event_loop()
asyncio.get_running_loop = self.get_running_loop_saved
asyncio.get_event_loop = self.get_event_loop_saved
- if sys.platform != 'win32':
+ asyncio.Task = asyncio.tasks.Task = self._Task_saved
+ asyncio.Future = asyncio.futures.Future = self._Future_saved
+ if sys.platform != 'win32':
def test_get_event_loop_new_process(self):
# bpo-32126: The multiprocessing module used by
# ProcessPoolExecutor is not functional when the
get_running_loop_impl = events._py_get_running_loop
get_event_loop_impl = events._py_get_event_loop
+ Task = asyncio.tasks._PyTask
+ Future = asyncio.futures._PyFuture
try:
import _asyncio # NoQA
get_running_loop_impl = events._c_get_running_loop
get_event_loop_impl = events._c_get_event_loop
+ Task = asyncio.tasks._CTask
+ Future = asyncio.futures._CFuture
class TestServer(unittest.TestCase):