import select
+import socket
+import sys
import pytest
from psycopg.pq import ConnStatus, ExecStatus
-skip_no_epoll = pytest.mark.skipif(
- not hasattr(select, "epoll"), reason="epoll not available"
-)
+hasepoll = hasattr(select, "epoll")
+skip_no_epoll = pytest.mark.skipif(not hasepoll, reason="epoll not available")
timeouts = [
{},
]
+skip_if_not_linux = pytest.mark.skipif(
+ not sys.platform.startswith("linux"), reason="non-Linux platform"
+)
+
+
@pytest.mark.parametrize("timeout", timeouts)
def test_wait_conn(dsn, timeout, retries):
for retry in retries:
assert res.status == ExecStatus.TUPLES_OK
+waits_and_ids = [
+ (waiting.wait, "wait"),
+ (waiting.wait_selector, "wait_selector"),
+]
+if hasepoll:
+ waits_and_ids.append((waiting.wait_epoll, "wait_epoll"))
+
+waits, wids = list(zip(*waits_and_ids))
+
+readys = [waiting.Ready.R, waiting.Ready.W, waiting.Ready.R | waiting.Ready.W]
+
+
+@pytest.mark.parametrize("waitfn", waits, ids=wids)
+@pytest.mark.parametrize("wait, ready", zip(waiting.Wait, readys))
+@skip_if_not_linux
+def test_wait_ready(waitfn, wait, ready):
+ def gen():
+ r = yield wait
+ return r
+
+ with socket.socket() as s:
+ r = waitfn(gen(), s.fileno())
+ assert r & ready
+
+
@pytest.mark.parametrize("timeout", timeouts)
def test_wait_selector(pgconn, timeout):
pgconn.send_query(b"select 1")
assert res.status == ExecStatus.TUPLES_OK
+@pytest.mark.asyncio
+@pytest.mark.parametrize("wait, ready", zip(waiting.Wait, readys))
+@skip_if_not_linux
+async def test_wait_ready_async(wait, ready):
+ def gen():
+ r = yield wait
+ return r
+
+ with socket.socket() as s:
+ r = await waiting.wait_async(gen(), s.fileno())
+ assert r & ready
+
+
@pytest.mark.asyncio
async def test_wait_async_bad(pgconn):
pgconn.send_query(b"select 1")