asyncio_options: Dict[str, Any] = {}
if sys.platform == "win32":
- asyncio_options["policy"] = asyncio.WindowsSelectorEventLoopPolicy()
+ asyncio_options[
+ "loop_factory"
+ ] = asyncio.WindowsSelectorEventLoopPolicy().new_event_loop
@pytest.fixture(
# because they rely on the pool initialization outside the asyncio loop.
import asyncio
+import sys
import pytest
@pytest.fixture
-def asyncio_run(anyio_backend_options, recwarn):
+def asyncio_run(recwarn):
"""Fixture reuturning asyncio.run, but managing resources at exit.
In certain runs, fd objects are leaked and the error will only be caught
downstream, by some innocent test calling gc_collect().
"""
- try:
- policy = anyio_backend_options["policy"]
- except KeyError:
- pass
- else:
- asyncio.set_event_loop_policy(policy)
+ if sys.platform == "win32":
+ asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
recwarn.clear()
try:
yield asyncio.run