Tried to use a couple of pytest plugins for retrying but they don't work
well with asyncio.
"pytest >= 6, < 6.1",
"pytest-asyncio >= 0.14.0, < 0.15",
"pytest-randomly >= 3.5, < 3.6",
+ "tenacity >= 6.3, < 6.4",
],
"dev": [
"black",
+import inspect
+
+import pytest
+
pytest_plugins = (
"tests.fix_db",
"tests.fix_pq",
config.addinivalue_line(
"markers", "subprocess: the test import psycopg3 after subprocess"
)
+
+
+@pytest.fixture
+def retries(request):
+ """Retry a block in a test a few times before giving up."""
+ import tenacity
+
+ if inspect.iscoroutinefunction(request.function):
+ return tenacity.AsyncRetrying(
+ reraise=True, stop=tenacity.stop_after_attempt(3)
+ )
+ else:
+ return tenacity.Retrying(
+ reraise=True, stop=tenacity.stop_after_attempt(3)
+ )