-import select
+import select # noqa: used in pytest.mark.skipif
import socket
import sys
from psycopg import generators
from psycopg.pq import ConnStatus, ExecStatus
-
-hasepoll = hasattr(select, "epoll")
-skip_no_epoll = pytest.mark.skipif(not hasepoll, reason="epoll not available")
-
-timeouts = [
- {},
- {"timeout": None},
- {"timeout": 0},
- {"timeout": 0.2},
- {"timeout": 10},
-]
-
-
skip_if_not_linux = pytest.mark.skipif(
not sys.platform.startswith("linux"), reason="non-Linux platform"
)
+waitfns = [
+ pytest.param(waiting.wait, id="wait"),
+ pytest.param(waiting.wait_selector, id="wait_selector"),
+ pytest.param(
+ waiting.wait_select,
+ id="wait_select",
+ marks=pytest.mark.skipif("not hasattr(select, 'select')"),
+ ),
+ pytest.param(
+ waiting.wait_epoll,
+ id="wait_epoll",
+ marks=pytest.mark.skipif("not hasattr(select, 'epoll')"),
+ ),
+]
+
+timeouts = [pytest.param({}, id="blank")]
+timeouts += [pytest.param({"timeout": x}, id=str(x)) for x in [None, 0, 0.2, 10]]
+
@pytest.mark.parametrize("timeout", timeouts)
def test_wait_conn(dsn, timeout):
waiting.wait_conn(gen)
-@pytest.mark.parametrize("timeout", timeouts)
-def test_wait(pgconn, timeout):
- pgconn.send_query(b"select 1")
- gen = generators.execute(pgconn)
- (res,) = waiting.wait(gen, pgconn.socket, **timeout)
- assert res.status == ExecStatus.TUPLES_OK
-
-
-waits_and_ids = [
- (waiting.wait, "wait"),
- (waiting.wait_selector, "wait_selector"),
-]
-if hasepoll:
- waits_and_ids.append((waiting.wait_epoll, "wait_epoll"))
-
-waits, wids = list(zip(*waits_and_ids))
-
-
-@pytest.mark.parametrize("waitfn", waits, ids=wids)
+@pytest.mark.parametrize("waitfn", waitfns)
@pytest.mark.parametrize("wait, ready", zip(waiting.Wait, waiting.Ready))
@skip_if_not_linux
def test_wait_ready(waitfn, wait, ready):
assert r & ready
+@pytest.mark.parametrize("waitfn", waitfns)
@pytest.mark.parametrize("timeout", timeouts)
-def test_wait_selector(pgconn, timeout):
+def test_wait(pgconn, waitfn, timeout):
pgconn.send_query(b"select 1")
gen = generators.execute(pgconn)
- (res,) = waiting.wait_selector(gen, pgconn.socket, **timeout)
+ (res,) = waitfn(gen, pgconn.socket, **timeout)
assert res.status == ExecStatus.TUPLES_OK
-def test_wait_selector_bad(pgconn):
+@pytest.mark.parametrize("waitfn", waitfns)
+def test_wait_bad(pgconn, waitfn):
pgconn.send_query(b"select 1")
gen = generators.execute(pgconn)
pgconn.finish()
with pytest.raises(psycopg.OperationalError):
- waiting.wait_selector(gen, pgconn.socket)
-
-
-@skip_no_epoll
-@pytest.mark.parametrize("timeout", timeouts)
-def test_wait_epoll(pgconn, timeout):
- pgconn.send_query(b"select 1")
- gen = generators.execute(pgconn)
- (res,) = waiting.wait_epoll(gen, pgconn.socket, **timeout)
- assert res.status == ExecStatus.TUPLES_OK
-
-
-@skip_no_epoll
-def test_wait_epoll_bad(pgconn):
- pgconn.send_query(b"select 1")
- gen = generators.execute(pgconn)
- (res,) = waiting.wait_epoll(gen, pgconn.socket)
- assert res.status == ExecStatus.TUPLES_OK
+ waitfn(gen, pgconn.socket)
@pytest.mark.parametrize("timeout", timeouts)
await waiting.wait_conn_async(gen)
-@pytest.mark.asyncio
-async def test_wait_async(pgconn):
- pgconn.send_query(b"select 1")
- gen = generators.execute(pgconn)
- (res,) = await waiting.wait_async(gen, pgconn.socket)
- assert res.status == ExecStatus.TUPLES_OK
-
-
@pytest.mark.asyncio
@pytest.mark.parametrize("wait, ready", zip(waiting.Wait, waiting.Ready))
@skip_if_not_linux
assert r & ready
+@pytest.mark.asyncio
+async def test_wait_async(pgconn):
+ pgconn.send_query(b"select 1")
+ gen = generators.execute(pgconn)
+ (res,) = await waiting.wait_async(gen, pgconn.socket)
+ assert res.status == ExecStatus.TUPLES_OK
+
+
@pytest.mark.asyncio
async def test_wait_async_bad(pgconn):
pgconn.send_query(b"select 1")