(:ticket:`#754`).
- Possibly use non-blocking cancellation upon `KeyboardInterrupt`
(:ticket:`#754`).
+- Possibly use non-blocking cancellation upon `Copy` termination
+ (:ticket:`#754`).
.. __: https://numpy.org/doc/stable/reference/arrays.scalars.html#built-in-scalar-types
COPY_IN = pq.ExecStatus.COPY_IN
COPY_OUT = pq.ExecStatus.COPY_OUT
+ACTIVE = pq.TransactionStatus.ACTIVE
+
class Copy(BaseCopy["Connection[Any]"]):
"""Manage an asynchronous :sql:`COPY` operation.
self.writer.finish(exc)
self._finished = True
else:
- self.connection.wait(self._end_copy_out_gen(exc))
+ if not exc:
+ return
+ if self._pgconn.transaction_status != ACTIVE:
+ # The server has already finished to send copy data. The connection
+ # is already in a good state.
+ return
+ # Throw a cancel to the server, then consume the rest of the copy data
+ # (which might or might not have been already transferred entirely to
+ # the client, so we won't necessary see the exception associated with
+ # canceling).
+ self.connection.cancel_safe()
+ self.connection.wait(self._end_copy_out_gen())
class Writer(ABC):
COPY_IN = pq.ExecStatus.COPY_IN
COPY_OUT = pq.ExecStatus.COPY_OUT
+ACTIVE = pq.TransactionStatus.ACTIVE
+
class AsyncCopy(BaseCopy["AsyncConnection[Any]"]):
"""Manage an asynchronous :sql:`COPY` operation.
await self.writer.finish(exc)
self._finished = True
else:
- await self.connection.wait(self._end_copy_out_gen(exc))
+ if not exc:
+ return
+
+ if self._pgconn.transaction_status != ACTIVE:
+ # The server has already finished to send copy data. The connection
+ # is already in a good state.
+ return
+
+ # Throw a cancel to the server, then consume the rest of the copy data
+ # (which might or might not have been already transferred entirely to
+ # the client, so we won't necessary see the exception associated with
+ # canceling).
+ await self.connection.cancel_safe()
+ await self.connection.wait(self._end_copy_out_gen())
class AsyncWriter(ABC):
COPY_IN = pq.ExecStatus.COPY_IN
COPY_OUT = pq.ExecStatus.COPY_OUT
-ACTIVE = pq.TransactionStatus.ACTIVE
-
# Size of data to accumulate before sending it down the network. We fill a
# buffer this size field by field, and when it passes the threshold size
# we ship it, so it may end up being bigger than this.
return row
- def _end_copy_out_gen(self, exc: Optional[BaseException]) -> PQGen[None]:
- if not exc:
- return
-
- if self._pgconn.transaction_status != ACTIVE:
- # The server has already finished to send copy data. The connection
- # is already in a good state.
- return
-
- # Throw a cancel to the server, then consume the rest of the copy data
- # (which might or might not have been already transferred entirely to
- # the client, so we won't necessary see the exception associated with
- # canceling).
- self.connection.cancel()
+ def _end_copy_out_gen(self) -> PQGen[None]:
try:
while (yield from self._read_gen()):
pass