From: Denis Laxalde Date: Fri, 24 Mar 2023 13:08:20 +0000 (+0100) Subject: feat: add generators.cancel() X-Git-Tag: 3.2.0~54^2~6 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=af98467e7d1c4f257bd664b14dd20c7b5d48cf50;p=thirdparty%2Fpsycopg.git feat: add generators.cancel() This is a PQGenConn generator as the socket for the PGcancelConn needs to be retrieved after (at least) the first poll() call. --- diff --git a/psycopg/psycopg/generators.py b/psycopg/psycopg/generators.py index 96143af93..0c6098cf6 100644 --- a/psycopg/psycopg/generators.py +++ b/psycopg/psycopg/generators.py @@ -27,7 +27,7 @@ from typing import List, Optional, Union from . import pq from . import errors as e from .abc import Buffer, PipelineCommand, PQGen, PQGenConn -from .pq.abc import PGconn, PGresult +from .pq.abc import PGcancelConn, PGconn, PGresult from .waiting import Wait, Ready from ._compat import Deque from ._cmodule import _psycopg @@ -100,6 +100,23 @@ def _connect(conninfo: str, *, timeout: float = 0.0) -> PQGenConn[PGconn]: return conn +def _cancel(cancel_conn: PGcancelConn) -> PQGenConn[None]: + while True: + status = cancel_conn.poll() + if status == POLL_OK: + break + elif status == POLL_READING: + yield cancel_conn.socket, WAIT_R + elif status == POLL_WRITING: + yield cancel_conn.socket, WAIT_W + elif status == POLL_FAILED: + raise e.OperationalError( + f"cancellation failed: {cancel_conn.error_message}" + ) + else: + raise e.InternalError(f"unexpected poll status: {status}") + + def _execute(pgconn: PGconn) -> PQGen[List[PGresult]]: """ Generator sending a query and returning results without blocking. @@ -357,6 +374,7 @@ def copy_end(pgconn: PGconn, error: Optional[bytes]) -> PQGen[PGresult]: # Override functions with fast versions if available if _psycopg: connect = _psycopg.connect + cancel = _psycopg.cancel execute = _psycopg.execute send = _psycopg.send fetch_many = _psycopg.fetch_many @@ -365,6 +383,7 @@ if _psycopg: else: connect = _connect + cancel = _cancel execute = _execute send = _send fetch_many = _fetch_many diff --git a/psycopg_c/psycopg_c/_psycopg.pyi b/psycopg_c/psycopg_c/_psycopg.pyi index ec976eb5c..388150191 100644 --- a/psycopg_c/psycopg_c/_psycopg.pyi +++ b/psycopg_c/psycopg_c/_psycopg.pyi @@ -7,12 +7,12 @@ information. Will submit a bug. # Copyright (C) 2020 The Psycopg Team -from typing import Any, Iterable, List, Optional, Sequence, Tuple +from typing import Any, List, Optional, Sequence, Tuple from psycopg import pq, abc, BaseConnection from psycopg.rows import Row, RowMaker from psycopg.adapt import AdaptersMap, PyFormat -from psycopg.pq.abc import PGconn, PGresult +from psycopg.pq.abc import PGcancelConn, PGconn, PGresult from psycopg._compat import Deque class Transformer(abc.AdaptContext): @@ -52,6 +52,7 @@ class Transformer(abc.AdaptContext): # Generators def connect(conninfo: str, *, timeout: float = 0.0) -> abc.PQGenConn[PGconn]: ... +def cancel(cancel_conn: PGcancelConn) -> abc.PQGenConn[None]: ... def execute(pgconn: PGconn) -> abc.PQGen[List[PGresult]]: ... def send(pgconn: PGconn) -> abc.PQGen[None]: ... def fetch_many(pgconn: PGconn) -> abc.PQGen[List[PGresult]]: ... diff --git a/psycopg_c/psycopg_c/_psycopg/generators.pyx b/psycopg_c/psycopg_c/_psycopg/generators.pyx index 1b2be5f60..7198fddf1 100644 --- a/psycopg_c/psycopg_c/_psycopg/generators.pyx +++ b/psycopg_c/psycopg_c/_psycopg/generators.pyx @@ -81,6 +81,26 @@ def connect(conninfo: str, *, timeout: float = 0.0) -> PQGenConn[abc.PGconn]: return conn +def cancel(pq.PGcancelConn cancel_conn) -> PQGenConn[None]: + cdef libpq.PGcancelConn *pgcancelconn_ptr = cancel_conn.pgcancelconn_ptr + cdef int status + while True: + with nogil: + status = libpq.PQcancelPoll(pgcancelconn_ptr) + if status == libpq.PGRES_POLLING_OK: + break + elif status == libpq.PGRES_POLLING_READING: + yield libpq.PQcancelSocket(pgcancelconn_ptr), WAIT_R + elif status == libpq.PGRES_POLLING_WRITING: + yield libpq.PQcancelSocket(pgcancelconn_ptr), WAIT_W + elif status == libpq.PGRES_POLLING_FAILED: + raise e.OperationalError( + f"cancellation failed: {cancel_conn.error_message}" + ) + else: + raise e.InternalError(f"unexpected poll status: {status}") + + def execute(pq.PGconn pgconn) -> PQGen[List[abc.PGresult]]: """ Generator sending a query and returning results without blocking. diff --git a/tests/test_generators.py b/tests/test_generators.py index 2df55e3e0..8397c203b 100644 --- a/tests/test_generators.py +++ b/tests/test_generators.py @@ -1,3 +1,4 @@ +import time from collections import deque from functools import partial from typing import List @@ -44,6 +45,30 @@ def test_connect_operationalerror_pgconn(generators, dsn, monkeypatch): pgconn.exec_(b"select 1") +@pytest.mark.libpq(">= 17") +def test_cancel(pgconn, conn, generators): + pgconn.send_query_params(b"SELECT pg_sleep($1)", [b"180"]) + while not conn.execute( + "SELECT count(*) FROM pg_stat_activity" + " WHERE query = 'SELECT pg_sleep($1)'" + " AND state = 'active'" + ).fetchone(): + time.sleep(0.01) + cancel_conn = pgconn.cancel_conn() + assert cancel_conn.status != pq.ConnStatus.BAD + cancel_conn.start() + gen = generators.cancel(cancel_conn) + waiting.wait_conn(gen) + assert cancel_conn.status == pq.ConnStatus.OK + + res = pgconn.get_result() + assert res is not None + assert res.status == pq.ExecStatus.FATAL_ERROR + assert res.error_field(pq.DiagnosticField.SQLSTATE) == b"57014" + while pgconn.is_busy(): + pgconn.consume_input() + + @pytest.fixture def pipeline(pgconn): nb, pgconn.nonblocking = pgconn.nonblocking, True