From: Denis Laxalde Date: Fri, 24 Mar 2023 13:55:18 +0000 (+0100) Subject: feat: use non-blocking cancellation upon Copy termination X-Git-Tag: 3.2.0~54^2~3 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=39319ff481c8d66996a2d4c0877843fb9fe50d0a;p=thirdparty%2Fpsycopg.git feat: use non-blocking cancellation upon Copy termination The logic of Copy termination, in finish(), is reworked so that connection cancellation is invoked from there directly instead of from _end_copy_out_gen() as we cannot call async code from the generator. --- diff --git a/docs/news.rst b/docs/news.rst index 62687abc0..4c130ad0a 100644 --- a/docs/news.rst +++ b/docs/news.rst @@ -39,6 +39,8 @@ Psycopg 3.2 (unreleased) (:ticket:`#754`). - Possibly use non-blocking cancellation upon `KeyboardInterrupt` (:ticket:`#754`). +- Possibly use non-blocking cancellation upon `Copy` termination + (:ticket:`#754`). .. __: https://numpy.org/doc/stable/reference/arrays.scalars.html#built-in-scalar-types diff --git a/psycopg/psycopg/_copy.py b/psycopg/psycopg/_copy.py index d7db77e21..844b74de5 100644 --- a/psycopg/psycopg/_copy.py +++ b/psycopg/psycopg/_copy.py @@ -29,6 +29,8 @@ if TYPE_CHECKING: COPY_IN = pq.ExecStatus.COPY_IN COPY_OUT = pq.ExecStatus.COPY_OUT +ACTIVE = pq.TransactionStatus.ACTIVE + class Copy(BaseCopy["Connection[Any]"]): """Manage an asynchronous :sql:`COPY` operation. @@ -148,7 +150,18 @@ class Copy(BaseCopy["Connection[Any]"]): self.writer.finish(exc) self._finished = True else: - self.connection.wait(self._end_copy_out_gen(exc)) + if not exc: + return + if self._pgconn.transaction_status != ACTIVE: + # The server has already finished to send copy data. The connection + # is already in a good state. + return + # Throw a cancel to the server, then consume the rest of the copy data + # (which might or might not have been already transferred entirely to + # the client, so we won't necessary see the exception associated with + # canceling). + self.connection.cancel_safe() + self.connection.wait(self._end_copy_out_gen()) class Writer(ABC): diff --git a/psycopg/psycopg/_copy_async.py b/psycopg/psycopg/_copy_async.py index 7008fbcad..16bc714be 100644 --- a/psycopg/psycopg/_copy_async.py +++ b/psycopg/psycopg/_copy_async.py @@ -26,6 +26,8 @@ if TYPE_CHECKING: COPY_IN = pq.ExecStatus.COPY_IN COPY_OUT = pq.ExecStatus.COPY_OUT +ACTIVE = pq.TransactionStatus.ACTIVE + class AsyncCopy(BaseCopy["AsyncConnection[Any]"]): """Manage an asynchronous :sql:`COPY` operation. @@ -145,7 +147,20 @@ class AsyncCopy(BaseCopy["AsyncConnection[Any]"]): await self.writer.finish(exc) self._finished = True else: - await self.connection.wait(self._end_copy_out_gen(exc)) + if not exc: + return + + if self._pgconn.transaction_status != ACTIVE: + # The server has already finished to send copy data. The connection + # is already in a good state. + return + + # Throw a cancel to the server, then consume the rest of the copy data + # (which might or might not have been already transferred entirely to + # the client, so we won't necessary see the exception associated with + # canceling). + await self.connection.cancel_safe() + await self.connection.wait(self._end_copy_out_gen()) class AsyncWriter(ABC): diff --git a/psycopg/psycopg/_copy_base.py b/psycopg/psycopg/_copy_base.py index 9194b266b..59ae71fef 100644 --- a/psycopg/psycopg/_copy_base.py +++ b/psycopg/psycopg/_copy_base.py @@ -35,8 +35,6 @@ BINARY = pq.Format.BINARY COPY_IN = pq.ExecStatus.COPY_IN COPY_OUT = pq.ExecStatus.COPY_OUT -ACTIVE = pq.TransactionStatus.ACTIVE - # Size of data to accumulate before sending it down the network. We fill a # buffer this size field by field, and when it passes the threshold size # we ship it, so it may end up being bigger than this. @@ -176,20 +174,7 @@ class BaseCopy(Generic[ConnectionType]): return row - def _end_copy_out_gen(self, exc: Optional[BaseException]) -> PQGen[None]: - if not exc: - return - - if self._pgconn.transaction_status != ACTIVE: - # The server has already finished to send copy data. The connection - # is already in a good state. - return - - # Throw a cancel to the server, then consume the rest of the copy data - # (which might or might not have been already transferred entirely to - # the client, so we won't necessary see the exception associated with - # canceling). - self.connection.cancel() + def _end_copy_out_gen(self) -> PQGen[None]: try: while (yield from self._read_gen()): pass