"""Tests for events.py."""
import concurrent.futures
+import contextlib
import functools
import io
import multiprocessing
async def doit():
return 'hello'
- loop = asyncio.new_event_loop()
- asyncio.set_event_loop(loop)
- return loop.run_until_complete(doit())
+ with contextlib.closing(asyncio.new_event_loop()) as loop:
+ asyncio.set_event_loop(loop)
+ return loop.run_until_complete(doit())
class CoroLike:
def test_get_running_loop_already_running(self):
async def main():
running_loop = asyncio.get_running_loop()
- loop = asyncio.new_event_loop()
- try:
- loop.run_forever()
- except RuntimeError:
- pass
- else:
- self.fail("RuntimeError not raised")
+ with contextlib.closing(asyncio.new_event_loop()) as loop:
+ try:
+ loop.run_forever()
+ except RuntimeError:
+ pass
+ else:
+ self.fail("RuntimeError not raised")
self.assertIs(asyncio.get_running_loop(), running_loop)