text mode it can be either `!bytes` or `!str`.
"""
data = self.formatter.write(buffer)
- self._write(data)
+ if data:
+ self._write(data)
def write_row(self, row: Sequence[Any]) -> None:
"""Write a record to a table after a :sql:`COPY FROM` operation."""
data = self.formatter.write_row(row)
- self._write(data)
+ if data:
+ self._write(data)
def finish(self, exc: Optional[BaseException]) -> None:
"""Terminate the copy operation and free the resources allocated.
self._worker_error = ex
def _write(self, data: Buffer) -> None:
- if not data:
- return
-
if not self._worker:
# warning: reference loop, broken by _write_end
self._worker = threading.Thread(target=self.worker)
def _write_end(self) -> None:
data = self.formatter.end()
- self._write(data)
+ if data:
+ self._write(data)
self._queue.put(b"")
if self._worker:
async def write(self, buffer: Union[Buffer, str]) -> None:
data = self.formatter.write(buffer)
- await self._write(data)
+ if data:
+ await self._write(data)
async def write_row(self, row: Sequence[Any]) -> None:
data = self.formatter.write_row(row)
- await self._write(data)
+ if data:
+ await self._write(data)
async def finish(self, exc: Optional[BaseException]) -> None:
if self._pgresult.status == COPY_IN:
await self.connection.wait(copy_to(self._pgconn, data))
async def _write(self, data: Buffer) -> None:
- if not data:
- return
-
if not self._worker:
self._worker = create_task(self.worker())
async def _write_end(self) -> None:
data = self.formatter.end()
- await self._write(data)
+ if data:
+ await self._write(data)
await self._queue.put(b"")
if self._worker: