]> git.ipfire.org Git - thirdparty/psycopg.git/commitdiff
fix(copy): chunk large buffers before queuing, not after 256/head
authorDaniele Varrazzo <daniele.varrazzo@gmail.com>
Fri, 25 Mar 2022 15:57:28 +0000 (16:57 +0100)
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>
Sat, 26 Mar 2022 00:17:25 +0000 (01:17 +0100)
This is conceptually a better place to do it, because the queue has the
function of applying backpressure on the data generator. Splitting large
buffers later would flood the libpq without effectively slowing down the
producer.

Also, reduce the size of the chunks appended to the libpq from 1Mb to
128K. This makes an *immense* difference: the too large chunk probably
triggers some quadraticity in the libpq. The test script found in #255,
piped in `ts -s`, shows that pushing a block of data of about 1Gb size
(which will fail in Postgres anyway), with the smaller size, will take
about 9s. With the larger size, it takes 4.10m to get to waiting for
PQputCopyEnd, and other almost 6 minutes to receive the error message
from the server.

    00:00:47 putting 1048576 (or less) bytes in queue size 1023
    00:00:47 writing copy end
    00:00:47 got 1048576 bytes from queue size 1023
    ...
    00:01:25 got 1048576 bytes from queue size 640
    ...
    00:01:54 got 1048576 bytes from queue size 512
    ...
    00:03:00 got 1048576 bytes from queue size 256
    ...
    00:04:12 got 0 bytes from queue size 0
    00:04:12 wait for copy end
    00:09:59 Traceback (most recent call last):
    ...

Adding a few prints (see #255 for details) also shows that the time
spent in PQputCopyData increases, going from ~15 entries/sec processed
when the writer has just finished pushing data in the queue, down to ~4
items/sec towards the end.

Considering that a reduction of 10-20% of the input size causes a
decrease of the processing time of about 50%, there is definitely
something quadratic going on there. It might be possible to improve the
libpq, but for the moment it's better to try and coexist nicely with the
current state.

psycopg/psycopg/copy.py
psycopg/psycopg/generators.py
tests/test_copy.py
tests/test_copy_async.py

index 6a3da8e0d81ffb74b49c5f7c03d4e65710e59699..b8c1818962d7f44a91a83b04f8a53add76c779b5 100644 (file)
@@ -51,9 +51,23 @@ class BaseCopy(Generic[ConnectionType]):
     """
 
     # Max size of the write queue of buffers. More than that copy will block
-    # Each buffer around Formatter.BUFFER_SIZE size
+    # Each buffer should be around BUFFER_SIZE size.
     QUEUE_SIZE = 1024
 
+    # Size of data to accumulate before sending it down the network. We fill a
+    # buffer this size field by field, and when it passes the threshold size
+    # wee ship it, so it may end up being bigger than this.
+    BUFFER_SIZE = 32 * 1024
+
+    # Maximum data size we want to queue to send to the libpq copy. Sending a
+    # buffer too big to be handled can cause an infinite loop in the libpq
+    # (#255) so we want to split it in more digestable chunks.
+    MAX_BUFFER_SIZE = 4 * BUFFER_SIZE
+    # Note: making this buffer too large, e.g.
+    # MAX_BUFFER_SIZE = 1024 * 1024
+    # makes operations *way* slower! Probably triggering some quadraticity
+    # in the libpq memory management and data sending.
+
     formatter: "Formatter"
 
     def __init__(self, cursor: "BaseCursor[ConnectionType, Any]"):
@@ -300,7 +314,15 @@ class Copy(BaseCopy["Connection[Any]"]):
         if self._worker_error:
             raise self._worker_error
 
-        self._queue.put(data)
+        if len(data) <= self.MAX_BUFFER_SIZE:
+            # Most used path: we don't need to split the buffer in smaller
+            # bits, so don't make a copy.
+            self._queue.put(data)
+        else:
+            # Copy a buffer too large in chunks to avoid causing a memory
+            # error in the libpq, which may cause an infinite loop (#255).
+            for i in range(0, len(data), self.MAX_BUFFER_SIZE):
+                self._queue.put(data[i : i + self.MAX_BUFFER_SIZE])
 
     def _write_end(self) -> None:
         data = self.formatter.end()
@@ -395,7 +417,15 @@ class AsyncCopy(BaseCopy["AsyncConnection[Any]"]):
         if not self._worker:
             self._worker = create_task(self.worker())
 
-        await self._queue.put(data)
+        if len(data) <= self.MAX_BUFFER_SIZE:
+            # Most used path: we don't need to split the buffer in smaller
+            # bits, so don't make a copy.
+            await self._queue.put(data)
+        else:
+            # Copy a buffer too large in chunks to avoid causing a memory
+            # error in the libpq, which may cause an infinite loop (#255).
+            for i in range(0, len(data), self.MAX_BUFFER_SIZE):
+                await self._queue.put(data[i : i + self.MAX_BUFFER_SIZE])
 
     async def _write_end(self) -> None:
         data = self.formatter.end()
@@ -413,9 +443,7 @@ class Formatter(ABC):
     """
 
     format: pq.Format
-
-    # Size of data to accumulate before sending it down the network
-    BUFFER_SIZE = 32 * 1024
+    BUFFER_SIZE = BaseCopy.BUFFER_SIZE
 
     def __init__(self, transformer: Transformer):
         self.transformer = transformer
index 902695b4bfd55595f79de207ccf2559bb86ee630..dbee2b2d1501b282aacb7d7e7f2fdc34d0ef9177 100644 (file)
@@ -203,16 +203,8 @@ def copy_from(pgconn: PGconn) -> PQGen[Union[memoryview, PGresult]]:
 
 
 def copy_to(pgconn: PGconn, buffer: bytes) -> PQGen[None]:
-    # Split the data to send in chunks not larger than 1Mb.
-    #
-    # The libpq docs say to retry on 0. What they don't say is that they return
-    # 0 pretty much only on palloc error, not on socket EWOULDBLOCK. Passing a
-    # block too big will always fail and create an infinite loop (See #255).
-    COPY_BUFFER_SIZE = 2**20
-    for i in range(0, len(buffer), COPY_BUFFER_SIZE):
-        # Retry enqueuing data until successful
-        while pgconn.put_copy_data(buffer[i : i + COPY_BUFFER_SIZE]) == 0:
-            yield Wait.W
+    while pgconn.put_copy_data(buffer) == 0:
+        yield Wait.W
 
 
 def copy_end(pgconn: PGconn, error: Optional[bytes]) -> PQGen[PGresult]:
index 64037fd628d30772ac1d747e9db3a3f6298d08b7..e506ad06e5f70d6457615500c49d3af97927588c 100644 (file)
@@ -2,6 +2,7 @@ import gc
 import string
 import hashlib
 from io import BytesIO, StringIO
+from random import choice, randrange
 from itertools import cycle
 
 import pytest
@@ -264,6 +265,30 @@ def test_copy_in_empty(conn, format):
     assert cur.rowcount == 0
 
 
+@pytest.mark.slow
+def test_copy_big_size_record(conn):
+    cur = conn.cursor()
+    ensure_table(cur, sample_tabledef)
+    data = "".join(chr(randrange(1, 256)) for i in range(10 * 1024 * 1024))
+    with cur.copy("copy copy_in (data) from stdin") as copy:
+        copy.write_row([data])
+
+    cur.execute("select data from copy_in limit 1")
+    assert cur.fetchone()[0] == data
+
+
+@pytest.mark.slow
+def test_copy_big_size_block(conn):
+    cur = conn.cursor()
+    ensure_table(cur, sample_tabledef)
+    data = "".join(choice(string.ascii_letters) for i in range(10 * 1024 * 1024))
+    with cur.copy("copy copy_in (data) from stdin") as copy:
+        copy.write(data + "\n")
+
+    cur.execute("select data from copy_in limit 1")
+    assert cur.fetchone()[0] == data
+
+
 @pytest.mark.parametrize("format", Format)
 def test_subclass_adapter(conn, format):
     if format == Format.TEXT:
index ad7ab7f44a7f43b910b40c4b0fd497459804892d..271e92105a0021c2725c82cc4448544741a2c061 100644 (file)
@@ -2,6 +2,7 @@ import gc
 import string
 import hashlib
 from io import BytesIO, StringIO
+from random import choice, randrange
 from itertools import cycle
 
 import pytest
@@ -254,6 +255,30 @@ async def test_copy_in_empty(aconn, format):
     assert cur.rowcount == 0
 
 
+@pytest.mark.slow
+async def test_copy_big_size_record(aconn):
+    cur = aconn.cursor()
+    await ensure_table(cur, sample_tabledef)
+    data = "".join(chr(randrange(1, 256)) for i in range(10 * 1024 * 1024))
+    async with cur.copy("copy copy_in (data) from stdin") as copy:
+        await copy.write_row([data])
+
+    await cur.execute("select data from copy_in limit 1")
+    assert await cur.fetchone() == (data,)
+
+
+@pytest.mark.slow
+async def test_copy_big_size_block(aconn):
+    cur = aconn.cursor()
+    await ensure_table(cur, sample_tabledef)
+    data = "".join(choice(string.ascii_letters) for i in range(10 * 1024 * 1024))
+    async with cur.copy("copy copy_in (data) from stdin") as copy:
+        await copy.write(data + "\n")
+
+    await cur.execute("select data from copy_in limit 1")
+    assert await cur.fetchone() == (data,)
+
+
 @pytest.mark.parametrize("format", Format)
 async def test_subclass_adapter(aconn, format):
     if format == Format.TEXT: