]> git.ipfire.org Git - thirdparty/psycopg.git/commitdiff
test: add test on wait_select
authorDaniele Varrazzo <daniele.varrazzo@gmail.com>
Wed, 19 Oct 2022 16:33:07 +0000 (17:33 +0100)
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>
Sun, 11 Dec 2022 19:53:20 +0000 (19:53 +0000)
Refactor tests to make a better use of pytest parametrization and skip
conditions.

tests/test_waiting.py

index aa79d1a591f09ee0ec3e3afeb35302bd22778ff1..acd1e95b139e336d5cd961224fc3d69c32beb785 100644 (file)
@@ -1,4 +1,4 @@
-import select
+import select  # noqa: used in pytest.mark.skipif
 import socket
 import sys
 
@@ -9,23 +9,28 @@ from psycopg import waiting
 from psycopg import generators
 from psycopg.pq import ConnStatus, ExecStatus
 
-
-hasepoll = hasattr(select, "epoll")
-skip_no_epoll = pytest.mark.skipif(not hasepoll, reason="epoll not available")
-
-timeouts = [
-    {},
-    {"timeout": None},
-    {"timeout": 0},
-    {"timeout": 0.2},
-    {"timeout": 10},
-]
-
-
 skip_if_not_linux = pytest.mark.skipif(
     not sys.platform.startswith("linux"), reason="non-Linux platform"
 )
 
+waitfns = [
+    pytest.param(waiting.wait, id="wait"),
+    pytest.param(waiting.wait_selector, id="wait_selector"),
+    pytest.param(
+        waiting.wait_select,
+        id="wait_select",
+        marks=pytest.mark.skipif("not hasattr(select, 'select')"),
+    ),
+    pytest.param(
+        waiting.wait_epoll,
+        id="wait_epoll",
+        marks=pytest.mark.skipif("not hasattr(select, 'epoll')"),
+    ),
+]
+
+timeouts = [pytest.param({}, id="blank")]
+timeouts += [pytest.param({"timeout": x}, id=str(x)) for x in [None, 0, 0.2, 10]]
+
 
 @pytest.mark.parametrize("timeout", timeouts)
 def test_wait_conn(dsn, timeout):
@@ -40,25 +45,7 @@ def test_wait_conn_bad(dsn):
         waiting.wait_conn(gen)
 
 
-@pytest.mark.parametrize("timeout", timeouts)
-def test_wait(pgconn, timeout):
-    pgconn.send_query(b"select 1")
-    gen = generators.execute(pgconn)
-    (res,) = waiting.wait(gen, pgconn.socket, **timeout)
-    assert res.status == ExecStatus.TUPLES_OK
-
-
-waits_and_ids = [
-    (waiting.wait, "wait"),
-    (waiting.wait_selector, "wait_selector"),
-]
-if hasepoll:
-    waits_and_ids.append((waiting.wait_epoll, "wait_epoll"))
-
-waits, wids = list(zip(*waits_and_ids))
-
-
-@pytest.mark.parametrize("waitfn", waits, ids=wids)
+@pytest.mark.parametrize("waitfn", waitfns)
 @pytest.mark.parametrize("wait, ready", zip(waiting.Wait, waiting.Ready))
 @skip_if_not_linux
 def test_wait_ready(waitfn, wait, ready):
@@ -71,37 +58,22 @@ def test_wait_ready(waitfn, wait, ready):
     assert r & ready
 
 
+@pytest.mark.parametrize("waitfn", waitfns)
 @pytest.mark.parametrize("timeout", timeouts)
-def test_wait_selector(pgconn, timeout):
+def test_wait(pgconn, waitfn, timeout):
     pgconn.send_query(b"select 1")
     gen = generators.execute(pgconn)
-    (res,) = waiting.wait_selector(gen, pgconn.socket, **timeout)
+    (res,) = waitfn(gen, pgconn.socket, **timeout)
     assert res.status == ExecStatus.TUPLES_OK
 
 
-def test_wait_selector_bad(pgconn):
+@pytest.mark.parametrize("waitfn", waitfns)
+def test_wait_bad(pgconn, waitfn):
     pgconn.send_query(b"select 1")
     gen = generators.execute(pgconn)
     pgconn.finish()
     with pytest.raises(psycopg.OperationalError):
-        waiting.wait_selector(gen, pgconn.socket)
-
-
-@skip_no_epoll
-@pytest.mark.parametrize("timeout", timeouts)
-def test_wait_epoll(pgconn, timeout):
-    pgconn.send_query(b"select 1")
-    gen = generators.execute(pgconn)
-    (res,) = waiting.wait_epoll(gen, pgconn.socket, **timeout)
-    assert res.status == ExecStatus.TUPLES_OK
-
-
-@skip_no_epoll
-def test_wait_epoll_bad(pgconn):
-    pgconn.send_query(b"select 1")
-    gen = generators.execute(pgconn)
-    (res,) = waiting.wait_epoll(gen, pgconn.socket)
-    assert res.status == ExecStatus.TUPLES_OK
+        waitfn(gen, pgconn.socket)
 
 
 @pytest.mark.parametrize("timeout", timeouts)
@@ -119,14 +91,6 @@ async def test_wait_conn_async_bad(dsn):
         await waiting.wait_conn_async(gen)
 
 
-@pytest.mark.asyncio
-async def test_wait_async(pgconn):
-    pgconn.send_query(b"select 1")
-    gen = generators.execute(pgconn)
-    (res,) = await waiting.wait_async(gen, pgconn.socket)
-    assert res.status == ExecStatus.TUPLES_OK
-
-
 @pytest.mark.asyncio
 @pytest.mark.parametrize("wait, ready", zip(waiting.Wait, waiting.Ready))
 @skip_if_not_linux
@@ -140,6 +104,14 @@ async def test_wait_ready_async(wait, ready):
     assert r & ready
 
 
+@pytest.mark.asyncio
+async def test_wait_async(pgconn):
+    pgconn.send_query(b"select 1")
+    gen = generators.execute(pgconn)
+    (res,) = await waiting.wait_async(gen, pgconn.socket)
+    assert res.status == ExecStatus.TUPLES_OK
+
+
 @pytest.mark.asyncio
 async def test_wait_async_bad(pgconn):
     pgconn.send_query(b"select 1")