From: Daniele Varrazzo Date: Wed, 25 May 2022 17:33:18 +0000 (+0100) Subject: perf(copy): avoid to call _write with empty buffer X-Git-Tag: 3.1~67^2~1 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=47c81af6733f4f5b1481916dfa5b1ac145bd6541;p=thirdparty%2Fpsycopg.git perf(copy): avoid to call _write with empty buffer There is a call per row we can avoid if the formatter is buffering the data produced. --- diff --git a/psycopg/psycopg/copy.py b/psycopg/psycopg/copy.py index 778a850be..b1fe6dc7d 100644 --- a/psycopg/psycopg/copy.py +++ b/psycopg/psycopg/copy.py @@ -266,12 +266,14 @@ class Copy(BaseCopy["Connection[Any]"]): text mode it can be either `!bytes` or `!str`. """ data = self.formatter.write(buffer) - self._write(data) + if data: + self._write(data) def write_row(self, row: Sequence[Any]) -> None: """Write a record to a table after a :sql:`COPY FROM` operation.""" data = self.formatter.write_row(row) - self._write(data) + if data: + self._write(data) def finish(self, exc: Optional[BaseException]) -> None: """Terminate the copy operation and free the resources allocated. @@ -307,9 +309,6 @@ class Copy(BaseCopy["Connection[Any]"]): self._worker_error = ex def _write(self, data: Buffer) -> None: - if not data: - return - if not self._worker: # warning: reference loop, broken by _write_end self._worker = threading.Thread(target=self.worker) @@ -332,7 +331,8 @@ class Copy(BaseCopy["Connection[Any]"]): def _write_end(self) -> None: data = self.formatter.end() - self._write(data) + if data: + self._write(data) self._queue.put(b"") if self._worker: @@ -388,11 +388,13 @@ class AsyncCopy(BaseCopy["AsyncConnection[Any]"]): async def write(self, buffer: Union[Buffer, str]) -> None: data = self.formatter.write(buffer) - await self._write(data) + if data: + await self._write(data) async def write_row(self, row: Sequence[Any]) -> None: data = self.formatter.write_row(row) - await self._write(data) + if data: + await self._write(data) async def finish(self, exc: Optional[BaseException]) -> None: if self._pgresult.status == COPY_IN: @@ -417,9 +419,6 @@ class AsyncCopy(BaseCopy["AsyncConnection[Any]"]): await self.connection.wait(copy_to(self._pgconn, data)) async def _write(self, data: Buffer) -> None: - if not data: - return - if not self._worker: self._worker = create_task(self.worker()) @@ -435,7 +434,8 @@ class AsyncCopy(BaseCopy["AsyncConnection[Any]"]): async def _write_end(self) -> None: data = self.formatter.end() - await self._write(data) + if data: + await self._write(data) await self._queue.put(b"") if self._worker: