From: Denis Laxalde Date: Mon, 8 Apr 2024 08:00:04 +0000 (+0200) Subject: feat: fall back to cancel() in cancel_safe() for libpq < 17 X-Git-Tag: 3.2.0~54^2~4 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=472fb48ed76bb0e066ad2b14681e707e5a9d0ccd;p=thirdparty%2Fpsycopg.git feat: fall back to cancel() in cancel_safe() for libpq < 17 We run this in a thread executor in the AsyncConnection. As asyncio's to_thread() is not available in Python 3.8, so we add a compat layer. --- diff --git a/psycopg/psycopg/_compat.py b/psycopg/psycopg/_compat.py index 68d689a2d..8d5d48aa9 100644 --- a/psycopg/psycopg/_compat.py +++ b/psycopg/psycopg/_compat.py @@ -5,18 +5,29 @@ compatibility functions for different Python versions # Copyright (C) 2021 The Psycopg Team import sys +from functools import partial +from typing import Any if sys.version_info >= (3, 9): + from asyncio import to_thread from zoneinfo import ZoneInfo from functools import cache from collections import Counter, deque as Deque + from collections.abc import Callable else: - from typing import Counter, Deque + import asyncio + from typing import Callable, Counter, Deque from functools import lru_cache from backports.zoneinfo import ZoneInfo cache = lru_cache(maxsize=None) + async def to_thread(func: Callable[..., Any], /, *args: Any, **kwargs: Any) -> None: + loop = asyncio.get_running_loop() + func_call = partial(func, *args, **kwargs) + await loop.run_in_executor(None, func_call) + + if sys.version_info >= (3, 10): from typing import TypeGuard, TypeAlias else: @@ -42,4 +53,5 @@ __all__ = [ "TypeVar", "ZoneInfo", "cache", + "to_thread", ] diff --git a/psycopg/psycopg/connection.py b/psycopg/psycopg/connection.py index b5430122d..f7e90d92a 100644 --- a/psycopg/psycopg/connection.py +++ b/psycopg/psycopg/connection.py @@ -271,11 +271,14 @@ class Connection(BaseConnection[Row]): In contrast with `cancel()`, it is not appropriate for use in a signal handler. - :raises ~psycopg.NotSupportedError: if the underlying libpq is older - than version 17. + If the underlying libpq is older than version 17, fall back to + `cancel()`. """ if self._should_cancel(): - waiting.wait_conn(self._cancel_gen(), interval=_WAIT_INTERVAL) + try: + waiting.wait_conn(self._cancel_gen(), interval=_WAIT_INTERVAL) + except e.NotSupportedError: + self.cancel() @contextmanager def transaction( @@ -382,10 +385,7 @@ class Connection(BaseConnection[Row]): if self.pgconn.transaction_status == ACTIVE: # On Ctrl-C, try to cancel the query in the server, otherwise # the connection will remain stuck in ACTIVE state. - try: - self.cancel_safe() - except e.NotSupportedError: - self.cancel() + self.cancel_safe() try: waiting.wait(gen, self.pgconn.socket, interval=interval) except e.QueryCanceled: diff --git a/psycopg/psycopg/connection_async.py b/psycopg/psycopg/connection_async.py index 3b2022a69..ab370db13 100644 --- a/psycopg/psycopg/connection_async.py +++ b/psycopg/psycopg/connection_async.py @@ -36,6 +36,7 @@ if True: # ASYNC import sys import asyncio from asyncio import Lock + from ._compat import to_thread else: from threading import Lock @@ -287,11 +288,19 @@ class AsyncConnection(BaseConnection[Row]): In contrast with `cancel()`, it is not appropriate for use in a signal handler. - :raises ~psycopg.NotSupportedError: if the underlying libpq is older - than version 17. + If the underlying libpq is older than version 17, fall back to + `cancel()`. """ if self._should_cancel(): - await waiting.wait_conn_async(self._cancel_gen(), interval=_WAIT_INTERVAL) + try: + await waiting.wait_conn_async( + self._cancel_gen(), interval=_WAIT_INTERVAL + ) + except e.NotSupportedError: + if True: # ASYNC + await to_thread(self.cancel) + else: + self.cancel() @asynccontextmanager async def transaction( @@ -400,10 +409,7 @@ class AsyncConnection(BaseConnection[Row]): if self.pgconn.transaction_status == ACTIVE: # On Ctrl-C, try to cancel the query in the server, otherwise # the connection will remain stuck in ACTIVE state. - try: - await self.cancel_safe() - except e.NotSupportedError: - self.cancel() + await self.cancel_safe() try: await waiting.wait_async(gen, self.pgconn.socket, interval=interval) except e.QueryCanceled: diff --git a/psycopg/psycopg/cursor.py b/psycopg/psycopg/cursor.py index 530bbd364..099076b51 100644 --- a/psycopg/psycopg/cursor.py +++ b/psycopg/psycopg/cursor.py @@ -159,10 +159,7 @@ class Cursor(BaseCursor["Connection[Any]", Row]): if self._pgconn.transaction_status == ACTIVE: # Try to cancel the query, then consume the results # already received. - try: - self._conn.cancel_safe() - except e.NotSupportedError: - self._conn.cancel() + self._conn.cancel_safe() try: while self._conn.wait(self._stream_fetchone_gen(first=False)): pass diff --git a/psycopg/psycopg/cursor_async.py b/psycopg/psycopg/cursor_async.py index 2ea18ee54..485cf0e93 100644 --- a/psycopg/psycopg/cursor_async.py +++ b/psycopg/psycopg/cursor_async.py @@ -164,10 +164,7 @@ class AsyncCursor(BaseCursor["AsyncConnection[Any]", Row]): if self._pgconn.transaction_status == ACTIVE: # Try to cancel the query, then consume the results # already received. - try: - await self._conn.cancel_safe() - except e.NotSupportedError: - self._conn.cancel() + await self._conn.cancel_safe() try: while await self._conn.wait( self._stream_fetchone_gen(first=False) diff --git a/tests/test_concurrency_async.py b/tests/test_concurrency_async.py index 98d659d68..bd6247c7d 100644 --- a/tests/test_concurrency_async.py +++ b/tests/test_concurrency_async.py @@ -61,10 +61,7 @@ async def test_concurrent_execution(aconn_cls, dsn): async def canceller(aconn, errors): try: await asyncio.sleep(0.5) - try: - await aconn.cancel_safe() - except e.NotSupportedError: - aconn.cancel() + await aconn.cancel_safe() except Exception as exc: errors.append(exc)