]> git.ipfire.org Git - thirdparty/psycopg.git/commitdiff
perf(copy): only flush at every row on copy on macOS 746/head
authorDaniele Varrazzo <daniele.varrazzo@gmail.com>
Mon, 6 May 2024 16:43:10 +0000 (18:43 +0200)
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>
Wed, 8 May 2024 10:06:22 +0000 (12:06 +0200)
With some benchmark, it seems that on Linux and Windows performances are worse.
See #746 for details.

docs/news.rst
psycopg/psycopg/_copy.py
psycopg/psycopg/_copy_async.py
psycopg/psycopg/_copy_base.py
psycopg/psycopg/generators.py
tests/test_copy.py
tests/test_copy_async.py

index 9a62899bb438a71382fe1ba0b725b28c45cca3ad..dc8bf2a353766913a92bcfb84bf5f9e0c98aac96 100644 (file)
@@ -53,6 +53,7 @@ Psycopg 3.1.19 (unreleased)
 - Fix excessive stripping of error message prefixes (:ticket:`#752`).
 - Allow to specify the ``connect_timeout`` connection parameter as float
   (:ticket:`#796`).
+- Improve COPY performance on macOS (:ticket:`#745`).
 
 
 Current release
index 2b0e77ca7e0887a39d42a75d8c686414cb81c652..8d779cc591cb016d7476c1d09dfe85531934452a 100644 (file)
@@ -16,7 +16,7 @@ from typing import Any, Iterator, Type, Tuple, Sequence, TYPE_CHECKING
 from . import pq
 from . import errors as e
 from ._compat import Self
-from ._copy_base import BaseCopy, MAX_BUFFER_SIZE, QUEUE_SIZE
+from ._copy_base import BaseCopy, MAX_BUFFER_SIZE, QUEUE_SIZE, PREFER_FLUSH
 from .generators import copy_to, copy_end
 from ._encodings import pgconn_encoding
 from ._acompat import spawn, gather, Queue, Worker
@@ -199,13 +199,15 @@ class LibpqWriter(Writer):
         if len(data) <= MAX_BUFFER_SIZE:
             # Most used path: we don't need to split the buffer in smaller
             # bits, so don't make a copy.
-            self.connection.wait(copy_to(self._pgconn, data))
+            self.connection.wait(copy_to(self._pgconn, data, flush=PREFER_FLUSH))
         else:
             # Copy a buffer too large in chunks to avoid causing a memory
             # error in the libpq, which may cause an infinite loop (#255).
             for i in range(0, len(data), MAX_BUFFER_SIZE):
                 self.connection.wait(
-                    copy_to(self._pgconn, data[i : i + MAX_BUFFER_SIZE])
+                    copy_to(
+                        self._pgconn, data[i : i + MAX_BUFFER_SIZE], flush=PREFER_FLUSH
+                    )
                 )
 
     def finish(self, exc: BaseException | None = None) -> None:
@@ -259,7 +261,7 @@ class QueuedLibpqWriter(LibpqWriter):
                 data = self._queue.get()
                 if not data:
                     break
-                self.connection.wait(copy_to(self._pgconn, data))
+                self.connection.wait(copy_to(self._pgconn, data, flush=PREFER_FLUSH))
         except BaseException as ex:
             # Propagate the error to the main thread.
             self._worker_error = ex
index c94f05db920ffeeef12c2982b5e3556e34d07427..4be61bf82934dedcbe152a105fd0afd0c5ed3e3b 100644 (file)
@@ -13,7 +13,7 @@ from typing import Any, AsyncIterator, Type, Tuple, Sequence, TYPE_CHECKING
 from . import pq
 from . import errors as e
 from ._compat import Self
-from ._copy_base import BaseCopy, MAX_BUFFER_SIZE, QUEUE_SIZE
+from ._copy_base import BaseCopy, MAX_BUFFER_SIZE, QUEUE_SIZE, PREFER_FLUSH
 from .generators import copy_to, copy_end
 from ._encodings import pgconn_encoding
 from ._acompat import aspawn, agather, AQueue, AWorker
@@ -198,13 +198,15 @@ class AsyncLibpqWriter(AsyncWriter):
         if len(data) <= MAX_BUFFER_SIZE:
             # Most used path: we don't need to split the buffer in smaller
             # bits, so don't make a copy.
-            await self.connection.wait(copy_to(self._pgconn, data))
+            await self.connection.wait(copy_to(self._pgconn, data, flush=PREFER_FLUSH))
         else:
             # Copy a buffer too large in chunks to avoid causing a memory
             # error in the libpq, which may cause an infinite loop (#255).
             for i in range(0, len(data), MAX_BUFFER_SIZE):
                 await self.connection.wait(
-                    copy_to(self._pgconn, data[i : i + MAX_BUFFER_SIZE])
+                    copy_to(
+                        self._pgconn, data[i : i + MAX_BUFFER_SIZE], flush=PREFER_FLUSH
+                    )
                 )
 
     async def finish(self, exc: BaseException | None = None) -> None:
@@ -258,7 +260,9 @@ class AsyncQueuedLibpqWriter(AsyncLibpqWriter):
                 data = await self._queue.get()
                 if not data:
                     break
-                await self.connection.wait(copy_to(self._pgconn, data))
+                await self.connection.wait(
+                    copy_to(self._pgconn, data, flush=PREFER_FLUSH)
+                )
         except BaseException as ex:
             # Propagate the error to the main thread.
             self._worker_error = ex
index 59ae71fef61f69159b8fb8f270f0d8bdfa4abe27..47a028bf069ad0137e3bef91c79bd6b98a4d1cb0 100644 (file)
@@ -7,6 +7,7 @@ psycopg copy support
 from __future__ import annotations
 
 import re
+import sys
 import struct
 from abc import ABC, abstractmethod
 from typing import Any, Dict, Generic, List, Match
@@ -53,6 +54,10 @@ MAX_BUFFER_SIZE = 4 * BUFFER_SIZE
 # Each buffer should be around BUFFER_SIZE size.
 QUEUE_SIZE = 1024
 
+# On certain systems, memmove seems particularly slow and flushing often is
+# more performing than accumulating a larger buffer. See #746 for details.
+PREFER_FLUSH = sys.platform == "darwin"
+
 
 class BaseCopy(Generic[ConnectionType]):
     """
index f25e6a1682921c32b8c96ee722f8089e3aec5a2b..96cc3dea35dff8df581f91682b1545783a17e3fb 100644 (file)
@@ -333,7 +333,7 @@ def copy_from(pgconn: PGconn) -> PQGen[Union[memoryview, PGresult]]:
     return result
 
 
-def copy_to(pgconn: PGconn, buffer: Buffer) -> PQGen[None]:
+def copy_to(pgconn: PGconn, buffer: Buffer, flush: bool = True) -> PQGen[None]:
     # Retry enqueuing data until successful.
     #
     # WARNING! This can cause an infinite loop if the buffer is too large. (see
@@ -346,15 +346,19 @@ def copy_to(pgconn: PGconn, buffer: Buffer) -> PQGen[None]:
             if ready:
                 break
 
-    # Repeat until it the message is flushed to the server
-    while True:
+    # Flushing often has a good effect on macOS because memcpy operations
+    # seem expensive on this platform so accumulating a large buffer has a
+    # bad effect (see #745).
+    if flush:
+        # Repeat until it the message is flushed to the server
         while True:
-            ready = yield WAIT_W
-            if ready:
+            while True:
+                ready = yield WAIT_W
+                if ready:
+                    break
+            f = pgconn.flush()
+            if f == 0:
                 break
-        f = pgconn.flush()
-        if f == 0:
-            break
 
 
 def copy_end(pgconn: PGconn, error: Optional[bytes]) -> PQGen[PGresult]:
index 55f9e3b77499bf44555ce30caae87ae41ec1c3d7..596b837f177f9dd303bbf3c05203a6a96112cb21 100644 (file)
@@ -643,7 +643,7 @@ def test_worker_life(conn, format, buffer):
 
 def test_worker_error_propagated(conn, monkeypatch):
 
-    def copy_to_broken(pgconn, buffer):
+    def copy_to_broken(pgconn, buffer, flush=True):
         raise ZeroDivisionError
         yield
 
index f52cc614431dbe5cc93b3f39dfd2ca8dbe568fce..6d1d4956614ee0b1b048ae20b85760354b578eac 100644 (file)
@@ -656,7 +656,7 @@ async def test_worker_life(aconn, format, buffer):
 
 
 async def test_worker_error_propagated(aconn, monkeypatch):
-    def copy_to_broken(pgconn, buffer):
+    def copy_to_broken(pgconn, buffer, flush=True):
         raise ZeroDivisionError
         yield