From: Daniele Varrazzo
Date: Mon, 6 May 2024 16:43:10 +0000 (+0200)
Subject: perf(copy): only flush at every row on copy on macOS
X-Git-Tag: 3.1.19~3^2
X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=28fc73e5730d594d7340dbd2a686ce01894ba8e5;p=thirdparty%2Fpsycopg.git

perf(copy): only flush at every row on copy on macOS

According to some benchmarks, flushing at every row makes performance
worse on Linux, so the behaviour is only enabled on macOS. We still
have to look into how Windows behaves.
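For reference, a rough sketch of the kind of benchmark that can show the
difference (not the actual script used for this change: the connection
string, table name, and row contents here are made up):

    # Time COPY ... FROM STDIN for a fixed number of rows, using the
    # public psycopg 3 copy API. Run on each platform/branch to compare.
    import time

    import psycopg

    NROWS = 1_000_000

    with psycopg.connect("") as conn, conn.cursor() as cur:
        cur.execute("create temp table copy_perf (id int, data text)")
        t0 = time.monotonic()
        with cur.copy("copy copy_perf (id, data) from stdin") as copy:
            for i in range(NROWS):
                copy.write_row((i, "some moderately long row of text"))
        print(f"{NROWS} rows in {time.monotonic() - t0:.2f}s")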
---

diff --git a/docs/news.rst b/docs/news.rst
index 7c0160897..f7ceced69 100644
--- a/docs/news.rst
+++ b/docs/news.rst
@@ -17,6 +17,7 @@ Psycopg 3.1.19 (unreleased)
 - Fix excessive stripping of error message prefixes (:ticket:`#752`).
 - Allow to specify the ``connect_timeout`` connection parameter as float
   (:ticket:`#796`).
+- Improve COPY performance on macOS (:ticket:`#745`).


 Current release
diff --git a/psycopg/psycopg/copy.py b/psycopg/psycopg/copy.py
index f20c86a24..3ae8edbc9 100644
--- a/psycopg/psycopg/copy.py
+++ b/psycopg/psycopg/copy.py
@@ -5,6 +5,7 @@ psycopg copy support
 # Copyright (C) 2020 The Psycopg Team

 import re
+import sys
 import queue
 import struct
 import asyncio
@@ -59,6 +60,10 @@ MAX_BUFFER_SIZE = 4 * BUFFER_SIZE
 # Each buffer should be around BUFFER_SIZE size.
 QUEUE_SIZE = 1024

+# On certain systems, memmove seems particularly slow and flushing often
+# performs better than accumulating a larger buffer. See #746 for details.
+PREFER_FLUSH = sys.platform == "darwin"
+

 class BaseCopy(Generic[ConnectionType]):
     """
@@ -358,13 +363,15 @@ class LibpqWriter(Writer):
         if len(data) <= MAX_BUFFER_SIZE:
             # Most used path: we don't need to split the buffer in smaller
             # bits, so don't make a copy.
-            self.connection.wait(copy_to(self._pgconn, data))
+            self.connection.wait(copy_to(self._pgconn, data, flush=PREFER_FLUSH))
         else:
             # Copy a buffer too large in chunks to avoid causing a memory
             # error in the libpq, which may cause an infinite loop (#255).
             for i in range(0, len(data), MAX_BUFFER_SIZE):
                 self.connection.wait(
-                    copy_to(self._pgconn, data[i : i + MAX_BUFFER_SIZE])
+                    copy_to(
+                        self._pgconn, data[i : i + MAX_BUFFER_SIZE], flush=PREFER_FLUSH
+                    )
                 )

     def finish(self, exc: Optional[BaseException] = None) -> None:
@@ -416,7 +423,7 @@ class QueuedLibpqWriter(LibpqWriter):
                 data = self._queue.get(block=True, timeout=24 * 60 * 60)
                 if not data:
                     break
-                self.connection.wait(copy_to(self._pgconn, data))
+                self.connection.wait(copy_to(self._pgconn, data, flush=PREFER_FLUSH))
             except BaseException as ex:
                 # Propagate the error to the main thread.
                 self._worker_error = ex
@@ -572,13 +579,15 @@ class AsyncLibpqWriter(AsyncWriter):
         if len(data) <= MAX_BUFFER_SIZE:
             # Most used path: we don't need to split the buffer in smaller
             # bits, so don't make a copy.
-            await self.connection.wait(copy_to(self._pgconn, data))
+            await self.connection.wait(copy_to(self._pgconn, data, flush=PREFER_FLUSH))
         else:
             # Copy a buffer too large in chunks to avoid causing a memory
             # error in the libpq, which may cause an infinite loop (#255).
             for i in range(0, len(data), MAX_BUFFER_SIZE):
                 await self.connection.wait(
-                    copy_to(self._pgconn, data[i : i + MAX_BUFFER_SIZE])
+                    copy_to(
+                        self._pgconn, data[i : i + MAX_BUFFER_SIZE], flush=PREFER_FLUSH
+                    )
                 )

     async def finish(self, exc: Optional[BaseException] = None) -> None:
diff --git a/psycopg/psycopg/generators.py b/psycopg/psycopg/generators.py
index aa01f1d62..8e90cb3f5 100644
--- a/psycopg/psycopg/generators.py
+++ b/psycopg/psycopg/generators.py
@@ -283,7 +283,7 @@ def copy_from(pgconn: PGconn) -> PQGen[Union[memoryview, PGresult]]:
     return result


-def copy_to(pgconn: PGconn, buffer: Buffer) -> PQGen[None]:
+def copy_to(pgconn: PGconn, buffer: Buffer, flush: bool = True) -> PQGen[None]:
     # Retry enqueuing data until successful.
     #
     # WARNING! This can cause an infinite loop if the buffer is too large. (see
@@ -293,15 +293,19 @@ def copy_to(pgconn: PGconn, buffer: Buffer, flush: bool = True) -> PQGen[None]:
     while pgconn.put_copy_data(buffer) == 0:
         yield WAIT_W

-    # Repeat until it the message is flushed to the server
-    while True:
+    # Flushing often has a good effect on macOS because memcpy operations
+    # seem expensive on this platform, so accumulating a large buffer has a
+    # bad effect (see #745).
+    if flush:
+        # Repeat until the message is flushed to the server
         while True:
-            ready = yield WAIT_W
-            if ready:
+            while True:
+                ready = yield WAIT_W
+                if ready:
+                    break
+            f = pgconn.flush()
+            if f == 0:
                 break
-        f = pgconn.flush()
-        if f == 0:
-            break


 def copy_end(pgconn: PGconn, error: Optional[bytes]) -> PQGen[PGresult]:
diff --git a/tests/test_copy.py b/tests/test_copy.py
index bc3628c6a..0f0a1a78d 100644
--- a/tests/test_copy.py
+++ b/tests/test_copy.py
@@ -655,7 +655,7 @@ def test_worker_life(conn, format, buffer):


 def test_worker_error_propagated(conn, monkeypatch):
-    def copy_to_broken(pgconn, buffer):
+    def copy_to_broken(pgconn, buffer, flush=True):
         raise ZeroDivisionError
         yield
diff --git a/tests/test_copy_async.py b/tests/test_copy_async.py
index ea8f23028..558965659 100644
--- a/tests/test_copy_async.py
+++ b/tests/test_copy_async.py
@@ -658,7 +658,7 @@ async def test_worker_life(aconn, format, buffer):


 async def test_worker_error_propagated(aconn, monkeypatch):
-    def copy_to_broken(pgconn, buffer):
+    def copy_to_broken(pgconn, buffer, flush=True):
         raise ZeroDivisionError
         yield
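Note: copy_to() is a generator following psycopg's PQGen protocol: it
yields a wait condition and is resumed with the socket's readiness by
connection.wait(). A simplified driver loop, with made-up names standing
in for psycopg's actual waiting machinery, gives an idea of how the flush
loop above gets executed:

    # Drive a copy_to()-style generator: whenever it yields WAIT_W, wait
    # for the socket to become writable, then resume it with the readiness
    # flag. A sketch for illustration, not psycopg's real implementation.
    import select

    WAIT_W = "W"

    def wait(gen, fileno, timeout=1.0):
        try:
            cond = next(gen)  # run the generator up to its first yield
            while True:
                assert cond == WAIT_W
                _, wlist, _ = select.select([], [fileno], [], timeout)
                cond = gen.send(1 if wlist else 0)  # resume with readiness
        except StopIteration as ex:
            return ex.value  # the generator's return value, if any

With flush=False the generator returns as soon as put_copy_data() has
queued the buffer, so the extra flush round-trips are only paid when
PREFER_FLUSH is true.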