perf(copy): flush at every row during COPY on macOS only
author    Daniele Varrazzo <daniele.varrazzo@gmail.com>
          Mon, 6 May 2024 16:43:10 +0000 (18:43 +0200)
committer Daniele Varrazzo <daniele.varrazzo@gmail.com>
          Wed, 8 May 2024 09:54:02 +0000 (11:54 +0200)
According to some benchmarks, performance on Linux is worse with frequent flushing, so this behaviour is only enabled on macOS.

Must look into how Windows behaves.
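
The benchmark used is not shown here; a quick way to take this kind of measurement yourself (a sketch only: the DSN and the testcopy temp table below are placeholders) is to time a row-by-row COPY through the public psycopg 3 API:

import time

import psycopg

# Hedged sketch: time a row-by-row COPY with psycopg 3. The DSN and the
# "testcopy" temp table are placeholders; adjust them to your environment.
with psycopg.connect("dbname=test") as conn:
    with conn.cursor() as cur:
        cur.execute("CREATE TEMP TABLE testcopy (id int, data text)")
        t0 = time.perf_counter()
        with cur.copy("COPY testcopy (id, data) FROM STDIN") as copy:
            for i in range(100_000):
                copy.write_row((i, "some data"))
        print(f"COPY of 100,000 rows took {time.perf_counter() - t0:.3f}s")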

docs/news.rst
psycopg/psycopg/copy.py
psycopg/psycopg/generators.py
tests/test_copy.py
tests/test_copy_async.py

diff --git a/docs/news.rst b/docs/news.rst
index 7c0160897d978b15e46cf1e7d53f430f5e1a6e2d..f7ceced69f58bf3962caec39fb0033733e227be1 100644
@@ -17,6 +17,7 @@ Psycopg 3.1.19 (unreleased)
 - Fix excessive stripping of error message prefixes (:ticket:`#752`).
 - Allow to specify the ``connect_timeout`` connection parameter as float
   (:ticket:`#796`).
+- Improve COPY performance on macOS (:ticket:`#745`).
 
 
 Current release
diff --git a/psycopg/psycopg/copy.py b/psycopg/psycopg/copy.py
index f20c86a2415f9679a9d8ba9544befe31c8538b53..3ae8edbc915973c2db909fbda25def4a2ea2b3e6 100644
@@ -5,6 +5,7 @@ psycopg copy support
 # Copyright (C) 2020 The Psycopg Team
 
 import re
+import sys
 import queue
 import struct
 import asyncio
@@ -59,6 +60,10 @@ MAX_BUFFER_SIZE = 4 * BUFFER_SIZE
 # Each buffer should be around BUFFER_SIZE size.
 QUEUE_SIZE = 1024
 
+# On certain systems, memmove seems particularly slow, and flushing often
+# performs better than accumulating a larger buffer. See #746 for details.
+PREFER_FLUSH = sys.platform == "darwin"
+
 
 class BaseCopy(Generic[ConnectionType]):
     """
@@ -358,13 +363,15 @@ class LibpqWriter(Writer):
         if len(data) <= MAX_BUFFER_SIZE:
             # Most used path: we don't need to split the buffer in smaller
             # bits, so don't make a copy.
-            self.connection.wait(copy_to(self._pgconn, data))
+            self.connection.wait(copy_to(self._pgconn, data, flush=PREFER_FLUSH))
         else:
             # Copy a buffer too large in chunks to avoid causing a memory
             # error in the libpq, which may cause an infinite loop (#255).
             for i in range(0, len(data), MAX_BUFFER_SIZE):
                 self.connection.wait(
-                    copy_to(self._pgconn, data[i : i + MAX_BUFFER_SIZE])
+                    copy_to(
+                        self._pgconn, data[i : i + MAX_BUFFER_SIZE], flush=PREFER_FLUSH
+                    )
                 )
 
     def finish(self, exc: Optional[BaseException] = None) -> None:
@@ -416,7 +423,7 @@ class QueuedLibpqWriter(LibpqWriter):
                 data = self._queue.get(block=True, timeout=24 * 60 * 60)
                 if not data:
                     break
-                self.connection.wait(copy_to(self._pgconn, data))
+                self.connection.wait(copy_to(self._pgconn, data, flush=PREFER_FLUSH))
         except BaseException as ex:
             # Propagate the error to the main thread.
             self._worker_error = ex
@@ -572,13 +579,15 @@ class AsyncLibpqWriter(AsyncWriter):
         if len(data) <= MAX_BUFFER_SIZE:
             # Most used path: we don't need to split the buffer in smaller
             # bits, so don't make a copy.
-            await self.connection.wait(copy_to(self._pgconn, data))
+            await self.connection.wait(copy_to(self._pgconn, data, flush=PREFER_FLUSH))
         else:
             # Copy a buffer too large in chunks to avoid causing a memory
             # error in the libpq, which may cause an infinite loop (#255).
             for i in range(0, len(data), MAX_BUFFER_SIZE):
                 await self.connection.wait(
-                    copy_to(self._pgconn, data[i : i + MAX_BUFFER_SIZE])
+                    copy_to(
+                        self._pgconn, data[i : i + MAX_BUFFER_SIZE], flush=PREFER_FLUSH
+                    )
                 )
 
     async def finish(self, exc: Optional[BaseException] = None) -> None:
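
Both the blocking writer and its async twin share the same fallback: a buffer larger than MAX_BUFFER_SIZE is sent in slices rather than whole. A standalone sketch of that slicing follows (the 32 KiB BUFFER_SIZE value is an assumption for illustration; the diff only shows the 4 * BUFFER_SIZE relation):

BUFFER_SIZE = 32 * 1024  # assumed value, for illustration only
MAX_BUFFER_SIZE = 4 * BUFFER_SIZE

def chunked(data: bytes):
    # A buffer within MAX_BUFFER_SIZE is yielded whole (no copy is made);
    # larger buffers are sliced to avoid overloading the libpq (see #255).
    if len(data) <= MAX_BUFFER_SIZE:
        yield data
    else:
        for i in range(0, len(data), MAX_BUFFER_SIZE):
            yield data[i : i + MAX_BUFFER_SIZE]

assert len(list(chunked(b"x" * (2 * MAX_BUFFER_SIZE + 1)))) == 3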
diff --git a/psycopg/psycopg/generators.py b/psycopg/psycopg/generators.py
index aa01f1d62d952d5f1eead33054d9ae6775bc8a3f..8e90cb3f520822a3a17e6cd3b100cb773dca215d 100644
@@ -283,7 +283,7 @@ def copy_from(pgconn: PGconn) -> PQGen[Union[memoryview, PGresult]]:
     return result
 
 
-def copy_to(pgconn: PGconn, buffer: Buffer) -> PQGen[None]:
+def copy_to(pgconn: PGconn, buffer: Buffer, flush: bool = True) -> PQGen[None]:
     # Retry enqueuing data until successful.
     #
     # WARNING! This can cause an infinite loop if the buffer is too large. (see
@@ -293,15 +293,19 @@ def copy_to(pgconn: PGconn, buffer: Buffer) -> PQGen[None]:
     while pgconn.put_copy_data(buffer) == 0:
         yield WAIT_W
 
-    # Repeat until the message is flushed to the server
-    while True:
+    # Flushing often has a good effect on macOS, because memcpy operations
+    # seem expensive on this platform, so accumulating a large buffer is
+    # counterproductive (see #745).
+    if flush:
+        # Repeat until the message is flushed to the server
         while True:
-            ready = yield WAIT_W
-            if ready:
+            while True:
+                ready = yield WAIT_W
+                if ready:
+                    break
+            f = pgconn.flush()
+            if f == 0:
                 break
-        f = pgconn.flush()
-        if f == 0:
-            break
 
 
 def copy_end(pgconn: PGconn, error: Optional[bytes]) -> PQGen[PGresult]:
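
To make the nested loop above easier to follow, here is a self-contained toy (copy_to_sketch, drive and the fake callables are hypothetical stand-ins, not psycopg's machinery). The generator yields WAIT_W whenever it needs the socket to become writable and is resumed by a waiting loop; put_copy_data returns 0 while the libpq queue is full, and flush() returns 0 once everything has been handed to the socket:

WAIT_W = "W"  # stand-in for psycopg's "wait until writable" marker

def copy_to_sketch(put_copy_data, flush_fn, buffer, flush=True):
    # PQputCopyData semantics: 1 = queued, 0 = queue full, retry later.
    while put_copy_data(buffer) == 0:
        yield WAIT_W
    if flush:
        # PQflush semantics: 0 = fully flushed, 1 = more data pending.
        while True:
            while True:
                ready = yield WAIT_W
                if ready:
                    break
            if flush_fn() == 0:
                break

def drive(gen):
    # Trivial driver: pretend the socket becomes writable immediately.
    try:
        gen.send(None)      # start the generator
        while True:
            gen.send(True)  # resume with ready=True after each "wait"
    except StopIteration:
        pass

queue_results = iter([0, 1])   # first put: queue full; second: accepted
flush_results = iter([1, 0])   # first flush: incomplete; second: done
drive(copy_to_sketch(lambda b: next(queue_results),
                     lambda: next(flush_results), b"row\n"))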
diff --git a/tests/test_copy.py b/tests/test_copy.py
index bc3628c6abbe55689b0d4d9159e90940ad9e2071..0f0a1a78d56fb5a7bc1af15c1834d24f1b228b18 100644
@@ -655,7 +655,7 @@ def test_worker_life(conn, format, buffer):
 
 
 def test_worker_error_propagated(conn, monkeypatch):
-    def copy_to_broken(pgconn, buffer):
+    def copy_to_broken(pgconn, buffer, flush=True):
         raise ZeroDivisionError
         yield
 
diff --git a/tests/test_copy_async.py b/tests/test_copy_async.py
index ea8f23028066e2fb733f68e37ca7ab854fa71273..5589656598fcddeb9ce1cfe710ba5d6a00269718 100644
@@ -658,7 +658,7 @@ async def test_worker_life(aconn, format, buffer):
 
 
 async def test_worker_error_propagated(aconn, monkeypatch):
-    def copy_to_broken(pgconn, buffer):
+    def copy_to_broken(pgconn, buffer, flush=True):
         raise ZeroDivisionError
         yield
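
The signature updates in both test files are required rather than cosmetic: the writers now call copy_to(pgconn, data, flush=PREFER_FLUSH), so a monkeypatched stub keeping the old two-argument signature would fail with a TypeError before the test could observe the intended ZeroDivisionError. A minimal standalone illustration:

def old_copy_to_broken(pgconn, buffer):  # the pre-change stub signature
    raise ZeroDivisionError
    yield

try:
    # How the writers now invoke the (monkeypatched) generator:
    old_copy_to_broken(None, b"data", flush=True)
except TypeError as ex:
    print(ex)  # ... got an unexpected keyword argument 'flush'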