+import asyncio
import inspect
import pytest
)
+def pytest_addoption(parser):
+ parser.addoption(
+ "--loop",
+ choices=["default", "uvloop"],
+ help="The asyncio loop to use for async tests.",
+ )
+
+
+def pytest_report_header(config):
+ loop = config.getoption("--loop")
+ if loop == "default":
+ return []
+
+ return [f"asyncio loop: {loop}"]
+
+
@pytest.fixture
def retries(request):
"""Retry a block in a test a few times before giving up."""
return tenacity.Retrying(
reraise=True, stop=tenacity.stop_after_attempt(3)
)
+
+
+@pytest.fixture
+def event_loop(request):
+ """Return the event loop to test asyncio-marked tests."""
+
+ loop = request.config.getoption("--loop")
+ if loop == "uvloop":
+ import uvloop
+
+ uvloop.install()
+ else:
+ assert loop == "default"
+
+ loop = asyncio.get_event_loop_policy().new_event_loop()
+ yield loop
+ loop.close()
assert stats["connections_num"] == 3
assert stats.get("connections_errors", 0) == 0
assert stats.get("connections_lost", 0) == 0
- assert 600 <= stats["connections_ms"] < 1200
+ assert 580 <= stats["connections_ms"] < 1200
proxy.stop()
await p.check()