From: Daniele Varrazzo Date: Tue, 29 Dec 2020 17:09:02 +0000 (+0100) Subject: Make testable, and test, the different waiting implementation X-Git-Tag: 3.0.dev0~228^2 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=a72f43ad728842c3d3b860c2529ba51831a4eee6;p=thirdparty%2Fpsycopg.git Make testable, and test, the different waiting implementation Also fixed segfault calling flush on a closed connection. --- diff --git a/psycopg3/psycopg3/connection.py b/psycopg3/psycopg3/connection.py index 765add5ba..d22433a8e 100644 --- a/psycopg3/psycopg3/connection.py +++ b/psycopg3/psycopg3/connection.py @@ -623,7 +623,7 @@ class AsyncConnection(BaseConnection): @classmethod async def _wait_conn(cls, gen: PQGenConn[RV]) -> RV: - return await waiting.wait_async_conn(gen) + return await waiting.wait_conn_async(gen) def _set_client_encoding(self, name: str) -> None: raise AttributeError( diff --git a/psycopg3/psycopg3/pq/pq_ctypes.py b/psycopg3/psycopg3/pq/pq_ctypes.py index 9c6814826..e09c7e5be 100644 --- a/psycopg3/psycopg3/pq/pq_ctypes.py +++ b/psycopg3/psycopg3/pq/pq_ctypes.py @@ -493,6 +493,8 @@ class PGconn: raise PQerror(f"setting nonblocking failed: {error_message(self)}") def flush(self) -> int: + if not self.pgconn_ptr: + raise PQerror("flushing failed: the connection is closed") rv: int = impl.PQflush(self.pgconn_ptr) if rv < 0: raise PQerror(f"flushing failed: {error_message(self)}") diff --git a/psycopg3/psycopg3/proto.py b/psycopg3/psycopg3/proto.py index 8cb92e588..9234c7211 100644 --- a/psycopg3/psycopg3/proto.py +++ b/psycopg3/psycopg3/proto.py @@ -26,6 +26,7 @@ ConnectionType = TypeVar("ConnectionType", bound="BaseConnection") # Waiting protocol types RV = TypeVar("RV") + PQGenConn = Generator[Tuple[int, "Wait"], "Ready", RV] """Generator for processes where the connection file number can change. diff --git a/psycopg3/psycopg3/waiting.py b/psycopg3/psycopg3/waiting.py index e227aabc7..ba6e005d5 100644 --- a/psycopg3/psycopg3/waiting.py +++ b/psycopg3/psycopg3/waiting.py @@ -31,7 +31,9 @@ class Ready(IntEnum): W = EVENT_WRITE -def wait(gen: PQGen[RV], fileno: int, timeout: Optional[float] = None) -> RV: +def wait_selector( + gen: PQGen[RV], fileno: int, timeout: Optional[float] = None +) -> RV: """ Wait for a generator using the best strategy available. @@ -142,7 +144,7 @@ async def wait_async(gen: PQGen[RV], fileno: int) -> RV: return rv -async def wait_async_conn(gen: PQGenConn[RV]) -> RV: +async def wait_conn_async(gen: PQGenConn[RV]) -> RV: """ Coroutine waiting for a connection generator to complete. @@ -243,4 +245,6 @@ if ( selectors.DefaultSelector # type: ignore[comparison-overlap] is selectors.EpollSelector ): - wait = wait_epoll # noqa: F811 + wait = wait_epoll +else: + wait = wait_selector diff --git a/psycopg3_c/psycopg3_c/pq/pgconn.pyx b/psycopg3_c/psycopg3_c/pq/pgconn.pyx index 48f453df2..0e250d487 100644 --- a/psycopg3_c/psycopg3_c/pq/pgconn.pyx +++ b/psycopg3_c/psycopg3_c/pq/pgconn.pyx @@ -400,9 +400,11 @@ cdef class PGconn: raise PQerror(f"setting nonblocking failed: {error_message(self)}") def flush(self) -> int: + if self.pgconn_ptr == NULL: + raise PQerror(f"flushing failed: the connection is closed") cdef int rv = libpq.PQflush(self.pgconn_ptr) if rv < 0: - raise PQerror(f"flushing failed:{error_message(self)}") + raise PQerror(f"flushing failed: {error_message(self)}") return rv def get_cancel(self) -> PGcancel: diff --git a/tests/test_waiting.py b/tests/test_waiting.py new file mode 100644 index 000000000..c5f3b71da --- /dev/null +++ b/tests/test_waiting.py @@ -0,0 +1,107 @@ +import select + +import pytest + +import psycopg3 +from psycopg3 import waiting +from psycopg3 import generators +from psycopg3.pq import ConnStatus, ExecStatus + + +skip_no_epoll = pytest.mark.skipif( + not hasattr(select, "epoll"), reason="epoll not available" +) + +timeouts = [ + {}, + {"timeout": None}, + {"timeout": 0}, + {"timeout": 0.1}, + {"timeout": 10}, +] + + +@pytest.mark.parametrize("timeout", timeouts) +def test_wait_conn(dsn, timeout): + gen = generators.connect(dsn) + conn = waiting.wait_conn(gen, **timeout) + assert conn.status == ConnStatus.OK + + +def test_wait_conn_bad(dsn): + gen = generators.connect("dbname=nosuchdb") + with pytest.raises(psycopg3.OperationalError): + waiting.wait_conn(gen) + + +@pytest.mark.parametrize("timeout", timeouts) +def test_wait(pgconn, timeout): + pgconn.send_query(b"select 1") + gen = generators.execute(pgconn) + (res,) = waiting.wait(gen, pgconn.socket, **timeout) + assert res.status == ExecStatus.TUPLES_OK + + +@pytest.mark.parametrize("timeout", timeouts) +def test_wait_selector(pgconn, timeout): + pgconn.send_query(b"select 1") + gen = generators.execute(pgconn) + (res,) = waiting.wait_selector(gen, pgconn.socket, **timeout) + assert res.status == ExecStatus.TUPLES_OK + + +def test_wait_selector_bad(pgconn): + pgconn.send_query(b"select 1") + gen = generators.execute(pgconn) + pgconn.finish() + with pytest.raises(psycopg3.OperationalError): + waiting.wait_selector(gen, pgconn.socket) + + +@skip_no_epoll +@pytest.mark.parametrize("timeout", timeouts) +def test_wait_epoll(pgconn, timeout): + pgconn.send_query(b"select 1") + gen = generators.execute(pgconn) + (res,) = waiting.wait_epoll(gen, pgconn.socket, **timeout) + assert res.status == ExecStatus.TUPLES_OK + + +@skip_no_epoll +def test_wait_epoll_bad(pgconn): + pgconn.send_query(b"select 1") + gen = generators.execute(pgconn) + (res,) = waiting.wait_epoll(gen, pgconn.socket) + assert res.status == ExecStatus.TUPLES_OK + + +@pytest.mark.asyncio +async def test_wait_conn_async(dsn): + gen = generators.connect(dsn) + conn = await waiting.wait_conn_async(gen) + assert conn.status == ConnStatus.OK + + +@pytest.mark.asyncio +async def test_wait_conn_async_bad(dsn): + gen = generators.connect("dbname=nosuchdb") + with pytest.raises(psycopg3.OperationalError): + await waiting.wait_conn_async(gen) + + +@pytest.mark.asyncio +async def test_wait_async(pgconn): + pgconn.send_query(b"select 1") + gen = generators.execute(pgconn) + (res,) = await waiting.wait_async(gen, pgconn.socket) + assert res.status == ExecStatus.TUPLES_OK + + +@pytest.mark.asyncio +async def test_wait_async_bad(pgconn): + pgconn.send_query(b"select 1") + gen = generators.execute(pgconn) + socket = pgconn.socket + pgconn.finish() + with pytest.raises(psycopg3.OperationalError): + await waiting.wait_async(gen, socket)