From: Daniele Varrazzo Date: Fri, 25 Mar 2022 15:57:28 +0000 (+0100) Subject: fix(copy): chunk large buffers before queuing, not after X-Git-Tag: 3.1~163^2 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=1a1395c448985f285ba2cf8924153df06bd23303;p=thirdparty%2Fpsycopg.git fix(copy): chunk large buffers before queuing, not after This is conceptually a better place to do it, because the queue has the function of applying backpressure on the data generator. Splitting large buffers later would flood the libpq without effectively slowing down the producer. Also, reduce the size of the chunks appended to the libpq from 1Mb to 128K. This makes an *immense* difference: the too large chunk probably triggers some quadraticity in the libpq. The test script found in #255, piped in `ts -s`, shows that pushing a block of data of about 1Gb size (which will fail in Postgres anyway), with the smaller size, will take about 9s. With the larger size, it takes 4.10m to get to waiting for PQputCopyEnd, and other almost 6 minutes to receive the error message from the server. 00:00:47 putting 1048576 (or less) bytes in queue size 1023 00:00:47 writing copy end 00:00:47 got 1048576 bytes from queue size 1023 ... 00:01:25 got 1048576 bytes from queue size 640 ... 00:01:54 got 1048576 bytes from queue size 512 ... 00:03:00 got 1048576 bytes from queue size 256 ... 00:04:12 got 0 bytes from queue size 0 00:04:12 wait for copy end 00:09:59 Traceback (most recent call last): ... Adding a few prints (see #255 for details) also shows that the time spent in PQputCopyData increases, going from ~15 entries/sec processed when the writer has just finished pushing data in the queue, down to ~4 items/sec towards the end. Considering that a reduction of 10-20% of the input size causes a decrease of the processing time of about 50%, there is definitely something quadratic going on there. It might be possible to improve the libpq, but for the moment it's better to try and coexist nicely with the current state. --- diff --git a/psycopg/psycopg/copy.py b/psycopg/psycopg/copy.py index 6a3da8e0d..b8c181896 100644 --- a/psycopg/psycopg/copy.py +++ b/psycopg/psycopg/copy.py @@ -51,9 +51,23 @@ class BaseCopy(Generic[ConnectionType]): """ # Max size of the write queue of buffers. More than that copy will block - # Each buffer around Formatter.BUFFER_SIZE size + # Each buffer should be around BUFFER_SIZE size. QUEUE_SIZE = 1024 + # Size of data to accumulate before sending it down the network. We fill a + # buffer this size field by field, and when it passes the threshold size + # wee ship it, so it may end up being bigger than this. + BUFFER_SIZE = 32 * 1024 + + # Maximum data size we want to queue to send to the libpq copy. Sending a + # buffer too big to be handled can cause an infinite loop in the libpq + # (#255) so we want to split it in more digestable chunks. + MAX_BUFFER_SIZE = 4 * BUFFER_SIZE + # Note: making this buffer too large, e.g. + # MAX_BUFFER_SIZE = 1024 * 1024 + # makes operations *way* slower! Probably triggering some quadraticity + # in the libpq memory management and data sending. + formatter: "Formatter" def __init__(self, cursor: "BaseCursor[ConnectionType, Any]"): @@ -300,7 +314,15 @@ class Copy(BaseCopy["Connection[Any]"]): if self._worker_error: raise self._worker_error - self._queue.put(data) + if len(data) <= self.MAX_BUFFER_SIZE: + # Most used path: we don't need to split the buffer in smaller + # bits, so don't make a copy. + self._queue.put(data) + else: + # Copy a buffer too large in chunks to avoid causing a memory + # error in the libpq, which may cause an infinite loop (#255). + for i in range(0, len(data), self.MAX_BUFFER_SIZE): + self._queue.put(data[i : i + self.MAX_BUFFER_SIZE]) def _write_end(self) -> None: data = self.formatter.end() @@ -395,7 +417,15 @@ class AsyncCopy(BaseCopy["AsyncConnection[Any]"]): if not self._worker: self._worker = create_task(self.worker()) - await self._queue.put(data) + if len(data) <= self.MAX_BUFFER_SIZE: + # Most used path: we don't need to split the buffer in smaller + # bits, so don't make a copy. + await self._queue.put(data) + else: + # Copy a buffer too large in chunks to avoid causing a memory + # error in the libpq, which may cause an infinite loop (#255). + for i in range(0, len(data), self.MAX_BUFFER_SIZE): + await self._queue.put(data[i : i + self.MAX_BUFFER_SIZE]) async def _write_end(self) -> None: data = self.formatter.end() @@ -413,9 +443,7 @@ class Formatter(ABC): """ format: pq.Format - - # Size of data to accumulate before sending it down the network - BUFFER_SIZE = 32 * 1024 + BUFFER_SIZE = BaseCopy.BUFFER_SIZE def __init__(self, transformer: Transformer): self.transformer = transformer diff --git a/psycopg/psycopg/generators.py b/psycopg/psycopg/generators.py index 902695b4b..dbee2b2d1 100644 --- a/psycopg/psycopg/generators.py +++ b/psycopg/psycopg/generators.py @@ -203,16 +203,8 @@ def copy_from(pgconn: PGconn) -> PQGen[Union[memoryview, PGresult]]: def copy_to(pgconn: PGconn, buffer: bytes) -> PQGen[None]: - # Split the data to send in chunks not larger than 1Mb. - # - # The libpq docs say to retry on 0. What they don't say is that they return - # 0 pretty much only on palloc error, not on socket EWOULDBLOCK. Passing a - # block too big will always fail and create an infinite loop (See #255). - COPY_BUFFER_SIZE = 2**20 - for i in range(0, len(buffer), COPY_BUFFER_SIZE): - # Retry enqueuing data until successful - while pgconn.put_copy_data(buffer[i : i + COPY_BUFFER_SIZE]) == 0: - yield Wait.W + while pgconn.put_copy_data(buffer) == 0: + yield Wait.W def copy_end(pgconn: PGconn, error: Optional[bytes]) -> PQGen[PGresult]: diff --git a/tests/test_copy.py b/tests/test_copy.py index 64037fd62..e506ad06e 100644 --- a/tests/test_copy.py +++ b/tests/test_copy.py @@ -2,6 +2,7 @@ import gc import string import hashlib from io import BytesIO, StringIO +from random import choice, randrange from itertools import cycle import pytest @@ -264,6 +265,30 @@ def test_copy_in_empty(conn, format): assert cur.rowcount == 0 +@pytest.mark.slow +def test_copy_big_size_record(conn): + cur = conn.cursor() + ensure_table(cur, sample_tabledef) + data = "".join(chr(randrange(1, 256)) for i in range(10 * 1024 * 1024)) + with cur.copy("copy copy_in (data) from stdin") as copy: + copy.write_row([data]) + + cur.execute("select data from copy_in limit 1") + assert cur.fetchone()[0] == data + + +@pytest.mark.slow +def test_copy_big_size_block(conn): + cur = conn.cursor() + ensure_table(cur, sample_tabledef) + data = "".join(choice(string.ascii_letters) for i in range(10 * 1024 * 1024)) + with cur.copy("copy copy_in (data) from stdin") as copy: + copy.write(data + "\n") + + cur.execute("select data from copy_in limit 1") + assert cur.fetchone()[0] == data + + @pytest.mark.parametrize("format", Format) def test_subclass_adapter(conn, format): if format == Format.TEXT: diff --git a/tests/test_copy_async.py b/tests/test_copy_async.py index ad7ab7f44..271e92105 100644 --- a/tests/test_copy_async.py +++ b/tests/test_copy_async.py @@ -2,6 +2,7 @@ import gc import string import hashlib from io import BytesIO, StringIO +from random import choice, randrange from itertools import cycle import pytest @@ -254,6 +255,30 @@ async def test_copy_in_empty(aconn, format): assert cur.rowcount == 0 +@pytest.mark.slow +async def test_copy_big_size_record(aconn): + cur = aconn.cursor() + await ensure_table(cur, sample_tabledef) + data = "".join(chr(randrange(1, 256)) for i in range(10 * 1024 * 1024)) + async with cur.copy("copy copy_in (data) from stdin") as copy: + await copy.write_row([data]) + + await cur.execute("select data from copy_in limit 1") + assert await cur.fetchone() == (data,) + + +@pytest.mark.slow +async def test_copy_big_size_block(aconn): + cur = aconn.cursor() + await ensure_table(cur, sample_tabledef) + data = "".join(choice(string.ascii_letters) for i in range(10 * 1024 * 1024)) + async with cur.copy("copy copy_in (data) from stdin") as copy: + await copy.write(data + "\n") + + await cur.execute("select data from copy_in limit 1") + assert await cur.fetchone() == (data,) + + @pytest.mark.parametrize("format", Format) async def test_subclass_adapter(aconn, format): if format == Format.TEXT: