From 2f5ea2bff16fbeb8266024e2a731e0659cc8a433 Mon Sep 17 00:00:00 2001 From: Daniele Varrazzo Date: Mon, 6 May 2024 18:43:10 +0200 Subject: [PATCH] perf(copy): only flush at every row on copy on macOS With some benchmark, it seems that on Linux and Windows performances are worse. See #746 for details. --- docs/news.rst | 1 + psycopg/psycopg/_copy.py | 10 ++++++---- psycopg/psycopg/_copy_async.py | 12 ++++++++---- psycopg/psycopg/_copy_base.py | 5 +++++ psycopg/psycopg/generators.py | 20 ++++++++++++-------- tests/test_copy.py | 2 +- tests/test_copy_async.py | 2 +- 7 files changed, 34 insertions(+), 18 deletions(-) diff --git a/docs/news.rst b/docs/news.rst index 9a62899bb..dc8bf2a35 100644 --- a/docs/news.rst +++ b/docs/news.rst @@ -53,6 +53,7 @@ Psycopg 3.1.19 (unreleased) - Fix excessive stripping of error message prefixes (:ticket:`#752`). - Allow to specify the ``connect_timeout`` connection parameter as float (:ticket:`#796`). +- Improve COPY performance on macOS (:ticket:`#745`). Current release diff --git a/psycopg/psycopg/_copy.py b/psycopg/psycopg/_copy.py index 2b0e77ca7..8d779cc59 100644 --- a/psycopg/psycopg/_copy.py +++ b/psycopg/psycopg/_copy.py @@ -16,7 +16,7 @@ from typing import Any, Iterator, Type, Tuple, Sequence, TYPE_CHECKING from . import pq from . import errors as e from ._compat import Self -from ._copy_base import BaseCopy, MAX_BUFFER_SIZE, QUEUE_SIZE +from ._copy_base import BaseCopy, MAX_BUFFER_SIZE, QUEUE_SIZE, PREFER_FLUSH from .generators import copy_to, copy_end from ._encodings import pgconn_encoding from ._acompat import spawn, gather, Queue, Worker @@ -199,13 +199,15 @@ class LibpqWriter(Writer): if len(data) <= MAX_BUFFER_SIZE: # Most used path: we don't need to split the buffer in smaller # bits, so don't make a copy. 
- self.connection.wait(copy_to(self._pgconn, data)) + self.connection.wait(copy_to(self._pgconn, data, flush=PREFER_FLUSH)) else: # Copy a buffer too large in chunks to avoid causing a memory # error in the libpq, which may cause an infinite loop (#255). for i in range(0, len(data), MAX_BUFFER_SIZE): self.connection.wait( - copy_to(self._pgconn, data[i : i + MAX_BUFFER_SIZE]) + copy_to( + self._pgconn, data[i : i + MAX_BUFFER_SIZE], flush=PREFER_FLUSH + ) ) def finish(self, exc: BaseException | None = None) -> None: @@ -259,7 +261,7 @@ class QueuedLibpqWriter(LibpqWriter): data = self._queue.get() if not data: break - self.connection.wait(copy_to(self._pgconn, data)) + self.connection.wait(copy_to(self._pgconn, data, flush=PREFER_FLUSH)) except BaseException as ex: # Propagate the error to the main thread. self._worker_error = ex diff --git a/psycopg/psycopg/_copy_async.py b/psycopg/psycopg/_copy_async.py index c94f05db9..4be61bf82 100644 --- a/psycopg/psycopg/_copy_async.py +++ b/psycopg/psycopg/_copy_async.py @@ -13,7 +13,7 @@ from typing import Any, AsyncIterator, Type, Tuple, Sequence, TYPE_CHECKING from . import pq from . import errors as e from ._compat import Self -from ._copy_base import BaseCopy, MAX_BUFFER_SIZE, QUEUE_SIZE +from ._copy_base import BaseCopy, MAX_BUFFER_SIZE, QUEUE_SIZE, PREFER_FLUSH from .generators import copy_to, copy_end from ._encodings import pgconn_encoding from ._acompat import aspawn, agather, AQueue, AWorker @@ -198,13 +198,15 @@ class AsyncLibpqWriter(AsyncWriter): if len(data) <= MAX_BUFFER_SIZE: # Most used path: we don't need to split the buffer in smaller # bits, so don't make a copy. - await self.connection.wait(copy_to(self._pgconn, data)) + await self.connection.wait(copy_to(self._pgconn, data, flush=PREFER_FLUSH)) else: # Copy a buffer too large in chunks to avoid causing a memory # error in the libpq, which may cause an infinite loop (#255). 
for i in range(0, len(data), MAX_BUFFER_SIZE): await self.connection.wait( - copy_to(self._pgconn, data[i : i + MAX_BUFFER_SIZE]) + copy_to( + self._pgconn, data[i : i + MAX_BUFFER_SIZE], flush=PREFER_FLUSH + ) ) async def finish(self, exc: BaseException | None = None) -> None: @@ -258,7 +260,9 @@ class AsyncQueuedLibpqWriter(AsyncLibpqWriter): data = await self._queue.get() if not data: break - await self.connection.wait(copy_to(self._pgconn, data)) + await self.connection.wait( + copy_to(self._pgconn, data, flush=PREFER_FLUSH) + ) except BaseException as ex: # Propagate the error to the main thread. self._worker_error = ex diff --git a/psycopg/psycopg/_copy_base.py b/psycopg/psycopg/_copy_base.py index 59ae71fef..47a028bf0 100644 --- a/psycopg/psycopg/_copy_base.py +++ b/psycopg/psycopg/_copy_base.py @@ -7,6 +7,7 @@ psycopg copy support from __future__ import annotations import re +import sys import struct from abc import ABC, abstractmethod from typing import Any, Dict, Generic, List, Match @@ -53,6 +54,10 @@ MAX_BUFFER_SIZE = 4 * BUFFER_SIZE # Each buffer should be around BUFFER_SIZE size. QUEUE_SIZE = 1024 +# On certain systems, memmove seems particularly slow and flushing often is +# better performing than accumulating a larger buffer. See #746 for details. +PREFER_FLUSH = sys.platform == "darwin" + class BaseCopy(Generic[ConnectionType]): """ diff --git a/psycopg/psycopg/generators.py b/psycopg/psycopg/generators.py index f25e6a168..96cc3dea3 100644 --- a/psycopg/psycopg/generators.py +++ b/psycopg/psycopg/generators.py @@ -333,7 +333,7 @@ def copy_from(pgconn: PGconn) -> PQGen[Union[memoryview, PGresult]]: return result -def copy_to(pgconn: PGconn, buffer: Buffer) -> PQGen[None]: +def copy_to(pgconn: PGconn, buffer: Buffer, flush: bool = True) -> PQGen[None]: # Retry enqueuing data until successful. # # WARNING! This can cause an infinite loop if the buffer is too large.
(see @@ -346,15 +346,19 @@ def copy_to(pgconn: PGconn, buffer: Buffer) -> PQGen[None]: if ready: break - # Repeat until it the message is flushed to the server - while True: + # Flushing often has a good effect on macOS because memcpy operations + # seem expensive on this platform so accumulating a large buffer has a + # bad effect (see #745). + if flush: + # Repeat until the message is flushed to the server while True: - ready = yield WAIT_W - if ready: + while True: + ready = yield WAIT_W + if ready: + break + f = pgconn.flush() + if f == 0: break - f = pgconn.flush() - if f == 0: - break def copy_end(pgconn: PGconn, error: Optional[bytes]) -> PQGen[PGresult]: diff --git a/tests/test_copy.py b/tests/test_copy.py index 55f9e3b77..596b837f1 100644 --- a/tests/test_copy.py +++ b/tests/test_copy.py @@ -643,7 +643,7 @@ def test_worker_life(conn, format, buffer): def test_worker_error_propagated(conn, monkeypatch): - def copy_to_broken(pgconn, buffer): + def copy_to_broken(pgconn, buffer, flush=True): raise ZeroDivisionError yield diff --git a/tests/test_copy_async.py b/tests/test_copy_async.py index f52cc6144..6d1d49566 100644 --- a/tests/test_copy_async.py +++ b/tests/test_copy_async.py @@ -656,7 +656,7 @@ async def test_worker_life(aconn, format, buffer): async def test_worker_error_propagated(aconn, monkeypatch): - def copy_to_broken(pgconn, buffer): + def copy_to_broken(pgconn, buffer, flush=True): raise ZeroDivisionError yield -- 2.47.2