We run this in a thread executor in the AsyncConnection.
As asyncio's to_thread() is not available in Python 3.8, so we add a
compat layer.
# Copyright (C) 2021 The Psycopg Team
import sys
+from functools import partial
+from typing import Any
if sys.version_info >= (3, 9):
+ from asyncio import to_thread
from zoneinfo import ZoneInfo
from functools import cache
from collections import Counter, deque as Deque
+ from collections.abc import Callable
else:
- from typing import Counter, Deque
+ import asyncio
+ from typing import Callable, Counter, Deque
from functools import lru_cache
from backports.zoneinfo import ZoneInfo
cache = lru_cache(maxsize=None)
+ async def to_thread(func: Callable[..., Any], /, *args: Any, **kwargs: Any) -> None:
+ loop = asyncio.get_running_loop()
+ func_call = partial(func, *args, **kwargs)
+ await loop.run_in_executor(None, func_call)
+
+
if sys.version_info >= (3, 10):
from typing import TypeGuard, TypeAlias
else:
"TypeVar",
"ZoneInfo",
"cache",
+ "to_thread",
]
In contrast with `cancel()`, it is not appropriate for use in a signal
handler.
- :raises ~psycopg.NotSupportedError: if the underlying libpq is older
- than version 17.
+ If the underlying libpq is older than version 17, fall back to
+ `cancel()`.
"""
if self._should_cancel():
- waiting.wait_conn(self._cancel_gen(), interval=_WAIT_INTERVAL)
+ try:
+ waiting.wait_conn(self._cancel_gen(), interval=_WAIT_INTERVAL)
+ except e.NotSupportedError:
+ self.cancel()
@contextmanager
def transaction(
if self.pgconn.transaction_status == ACTIVE:
# On Ctrl-C, try to cancel the query in the server, otherwise
# the connection will remain stuck in ACTIVE state.
- try:
- self.cancel_safe()
- except e.NotSupportedError:
- self.cancel()
+ self.cancel_safe()
try:
waiting.wait(gen, self.pgconn.socket, interval=interval)
except e.QueryCanceled:
import sys
import asyncio
from asyncio import Lock
+ from ._compat import to_thread
else:
from threading import Lock
In contrast with `cancel()`, it is not appropriate for use in a signal
handler.
- :raises ~psycopg.NotSupportedError: if the underlying libpq is older
- than version 17.
+ If the underlying libpq is older than version 17, fall back to
+ `cancel()`.
"""
if self._should_cancel():
- await waiting.wait_conn_async(self._cancel_gen(), interval=_WAIT_INTERVAL)
+ try:
+ await waiting.wait_conn_async(
+ self._cancel_gen(), interval=_WAIT_INTERVAL
+ )
+ except e.NotSupportedError:
+ if True: # ASYNC
+ await to_thread(self.cancel)
+ else:
+ self.cancel()
@asynccontextmanager
async def transaction(
if self.pgconn.transaction_status == ACTIVE:
# On Ctrl-C, try to cancel the query in the server, otherwise
# the connection will remain stuck in ACTIVE state.
- try:
- await self.cancel_safe()
- except e.NotSupportedError:
- self.cancel()
+ await self.cancel_safe()
try:
await waiting.wait_async(gen, self.pgconn.socket, interval=interval)
except e.QueryCanceled:
if self._pgconn.transaction_status == ACTIVE:
# Try to cancel the query, then consume the results
# already received.
- try:
- self._conn.cancel_safe()
- except e.NotSupportedError:
- self._conn.cancel()
+ self._conn.cancel_safe()
try:
while self._conn.wait(self._stream_fetchone_gen(first=False)):
pass
if self._pgconn.transaction_status == ACTIVE:
# Try to cancel the query, then consume the results
# already received.
- try:
- await self._conn.cancel_safe()
- except e.NotSupportedError:
- self._conn.cancel()
+ await self._conn.cancel_safe()
try:
while await self._conn.wait(
self._stream_fetchone_gen(first=False)
async def canceller(aconn, errors):
try:
await asyncio.sleep(0.5)
- try:
- await aconn.cancel_safe()
- except e.NotSupportedError:
- aconn.cancel()
+ await aconn.cancel_safe()
except Exception as exc:
errors.append(exc)