]> git.ipfire.org Git - thirdparty/psycopg.git/commitdiff
feat: use non-blocking cancellation upon Copy termination
authorDenis Laxalde <denis.laxalde@dalibo.com>
Fri, 24 Mar 2023 13:55:18 +0000 (14:55 +0100)
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>
Tue, 9 Apr 2024 10:07:43 +0000 (12:07 +0200)
The logic of Copy termination, in finish(), is reworked so that
connection cancellation is invoked from there directly instead of from
_end_copy_out_gen() as we cannot call async code from the generator.

docs/news.rst
psycopg/psycopg/_copy.py
psycopg/psycopg/_copy_async.py
psycopg/psycopg/_copy_base.py

index 62687abc069f750645c3f8db440d7518b40f23f1..4c130ad0ab6c33ce614ade94fb90dfca7b2d05a5 100644 (file)
@@ -39,6 +39,8 @@ Psycopg 3.2 (unreleased)
   (:ticket:`#754`).
 - Possibly use non-blocking cancellation upon `KeyboardInterrupt`
   (:ticket:`#754`).
+- Possibly use non-blocking cancellation upon `Copy` termination
+  (:ticket:`#754`).
 
 .. __: https://numpy.org/doc/stable/reference/arrays.scalars.html#built-in-scalar-types
 
index d7db77e21c2f339d79cf0c9f60521f99bca6170c..844b74de56763dc3eadfd2d93d9c02c6f260405b 100644 (file)
@@ -29,6 +29,8 @@ if TYPE_CHECKING:
 COPY_IN = pq.ExecStatus.COPY_IN
 COPY_OUT = pq.ExecStatus.COPY_OUT
 
+ACTIVE = pq.TransactionStatus.ACTIVE
+
 
 class Copy(BaseCopy["Connection[Any]"]):
     """Manage an asynchronous :sql:`COPY` operation.
@@ -148,7 +150,18 @@ class Copy(BaseCopy["Connection[Any]"]):
             self.writer.finish(exc)
             self._finished = True
         else:
-            self.connection.wait(self._end_copy_out_gen(exc))
+            if not exc:
+                return
+            if self._pgconn.transaction_status != ACTIVE:
+                # The server has already finished to send copy data. The connection
+                # is already in a good state.
+                return
+            # Throw a cancel to the server, then consume the rest of the copy data
+            # (which might or might not have been already transferred entirely to
+            # the client, so we won't necessary see the exception associated with
+            # canceling).
+            self.connection.cancel_safe()
+            self.connection.wait(self._end_copy_out_gen())
 
 
 class Writer(ABC):
index 7008fbcad8e6b7a6b652784da5b72a7fa6525aff..16bc714be6522ac4232bc22cf76e8633604aa37c 100644 (file)
@@ -26,6 +26,8 @@ if TYPE_CHECKING:
 COPY_IN = pq.ExecStatus.COPY_IN
 COPY_OUT = pq.ExecStatus.COPY_OUT
 
+ACTIVE = pq.TransactionStatus.ACTIVE
+
 
 class AsyncCopy(BaseCopy["AsyncConnection[Any]"]):
     """Manage an asynchronous :sql:`COPY` operation.
@@ -145,7 +147,20 @@ class AsyncCopy(BaseCopy["AsyncConnection[Any]"]):
             await self.writer.finish(exc)
             self._finished = True
         else:
-            await self.connection.wait(self._end_copy_out_gen(exc))
+            if not exc:
+                return
+
+            if self._pgconn.transaction_status != ACTIVE:
+                # The server has already finished to send copy data. The connection
+                # is already in a good state.
+                return
+
+            # Throw a cancel to the server, then consume the rest of the copy data
+            # (which might or might not have been already transferred entirely to
+            # the client, so we won't necessary see the exception associated with
+            # canceling).
+            await self.connection.cancel_safe()
+            await self.connection.wait(self._end_copy_out_gen())
 
 
 class AsyncWriter(ABC):
index 9194b266bca5c66434561a3027ca2b4da5507494..59ae71fef61f69159b8fb8f270f0d8bdfa4abe27 100644 (file)
@@ -35,8 +35,6 @@ BINARY = pq.Format.BINARY
 COPY_IN = pq.ExecStatus.COPY_IN
 COPY_OUT = pq.ExecStatus.COPY_OUT
 
-ACTIVE = pq.TransactionStatus.ACTIVE
-
 # Size of data to accumulate before sending it down the network. We fill a
 # buffer this size field by field, and when it passes the threshold size
 # we ship it, so it may end up being bigger than this.
@@ -176,20 +174,7 @@ class BaseCopy(Generic[ConnectionType]):
 
         return row
 
-    def _end_copy_out_gen(self, exc: Optional[BaseException]) -> PQGen[None]:
-        if not exc:
-            return
-
-        if self._pgconn.transaction_status != ACTIVE:
-            # The server has already finished to send copy data. The connection
-            # is already in a good state.
-            return
-
-        # Throw a cancel to the server, then consume the rest of the copy data
-        # (which might or might not have been already transferred entirely to
-        # the client, so we won't necessary see the exception associated with
-        # canceling).
-        self.connection.cancel()
+    def _end_copy_out_gen(self) -> PQGen[None]:
         try:
             while (yield from self._read_gen()):
                 pass