From: Daniele Varrazzo Date: Sat, 23 Jul 2022 21:44:55 +0000 (+0100) Subject: perf(copy): use non-thread/task copy writers by default X-Git-Tag: 3.1~44^2~6 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=9b66e0bcbd25e97aec2c7956e90eb8137c3dc001;p=thirdparty%2Fpsycopg.git perf(copy): use non-thread/task copy writers by default Further benchmarks show no performance improvement using a writer thread, and a slight slowdown using a writer async task. If that's the case we can keep it simpler. --- diff --git a/psycopg/psycopg/copy.py b/psycopg/psycopg/copy.py index d1204bffb..04632c949 100644 --- a/psycopg/psycopg/copy.py +++ b/psycopg/psycopg/copy.py @@ -199,7 +199,7 @@ class Copy(BaseCopy["Connection[Any]"]): def __init__(self, cursor: "Cursor[Any]", *, writer: Optional["Writer"] = None): super().__init__(cursor) if not writer: - writer = QueueWriter(cursor) + writer = LibpqWriter(cursor) self.writer = writer self._write = writer.write @@ -429,7 +429,7 @@ class AsyncCopy(BaseCopy["AsyncConnection[Any]"]): super().__init__(cursor) if not writer: - writer = AsyncQueueWriter(cursor) + writer = AsyncLibpqWriter(cursor) self.writer = writer self._write = writer.write diff --git a/tests/test_copy.py b/tests/test_copy.py index 094a7043d..e6e3eb199 100644 --- a/tests/test_copy.py +++ b/tests/test_copy.py @@ -12,6 +12,7 @@ from psycopg import pq from psycopg import sql from psycopg import errors as e from psycopg.pq import Format +from psycopg.copy import Copy, Writer, LibpqWriter, QueueWriter from psycopg.adapt import PyFormat from psycopg.types import TypeInfo from psycopg.types.hstore import register_hstore @@ -464,7 +465,7 @@ def test_copy_in_format(conn): writer = BytesWriter() conn.execute("set client_encoding to utf8") cur = conn.cursor() - with psycopg.copy.Copy(cur, writer=writer) as copy: + with Copy(cur, writer=writer) as copy: for i in range(1, 256): copy.write_row((i, chr(i))) @@ -616,7 +617,9 @@ def test_description(conn): def test_worker_life(conn, format, buffer): cur = conn.cursor() ensure_table(cur, sample_tabledef) - with cur.copy(f"copy copy_in from stdin (format {format.name})") as copy: + with cur.copy( + f"copy copy_in from stdin (format {format.name})", writer=QueueWriter(cur) + ) as copy: assert not copy.writer._worker copy.write(globals()[buffer]) assert copy.writer._worker @@ -635,7 +638,7 @@ def test_worker_error_propagated(conn, monkeypatch): cur = conn.cursor() cur.execute("create temp table wat (a text, b text)") with pytest.raises(ZeroDivisionError): - with cur.copy("copy wat from stdin") as copy: + with cur.copy("copy wat from stdin", writer=QueueWriter(cur)) as copy: copy.write("a,b") @@ -645,7 +648,7 @@ def test_worker_error_propagated(conn, monkeypatch): ) def test_connection_writer(conn, format, buffer): cur = conn.cursor() - writer = psycopg.copy.LibpqWriter(cur) + writer = LibpqWriter(cur) ensure_table(cur, sample_tabledef) with cur.copy( @@ -859,7 +862,7 @@ class DataGenerator: return m.hexdigest() -class BytesWriter(psycopg.copy.Writer): +class BytesWriter(Writer): def __init__(self): self.file = BytesIO() diff --git a/tests/test_copy_async.py b/tests/test_copy_async.py index ed91c0791..cd450e547 100644 --- a/tests/test_copy_async.py +++ b/tests/test_copy_async.py @@ -12,6 +12,7 @@ from psycopg import pq from psycopg import sql from psycopg import errors as e from psycopg.pq import Format +from psycopg.copy import AsyncCopy, AsyncWriter, AsyncLibpqWriter, AsyncQueueWriter from psycopg.types import TypeInfo from psycopg.adapt import PyFormat from psycopg.types.hstore import register_hstore @@ -466,7 +467,7 @@ async def test_copy_in_format(aconn): writer = AsyncBytesWriter() await aconn.execute("set client_encoding to utf8") cur = aconn.cursor() - async with psycopg.copy.AsyncCopy(cur, writer=writer) as copy: + async with AsyncCopy(cur, writer=writer) as copy: for i in range(1, 256): await copy.write_row((i, chr(i))) @@ -618,7 +619,9 @@ async def test_description(aconn): async def test_worker_life(aconn, format, buffer): cur = aconn.cursor() await ensure_table(cur, sample_tabledef) - async with cur.copy(f"copy copy_in from stdin (format {format.name})") as copy: + async with cur.copy( + f"copy copy_in from stdin (format {format.name})", writer=AsyncQueueWriter(cur) + ) as copy: assert not copy.writer._worker await copy.write(globals()[buffer]) assert copy.writer._worker @@ -638,7 +641,9 @@ async def test_worker_error_propagated(aconn, monkeypatch): cur = aconn.cursor() await cur.execute("create temp table wat (a text, b text)") with pytest.raises(ZeroDivisionError): - async with cur.copy("copy wat from stdin") as copy: + async with cur.copy( + "copy wat from stdin", writer=AsyncQueueWriter(cur) + ) as copy: await copy.write("a,b") @@ -648,7 +653,7 @@ async def test_worker_error_propagated(aconn, monkeypatch): ) async def test_connection_writer(aconn, format, buffer): cur = aconn.cursor() - writer = psycopg.copy.AsyncLibpqWriter(cur) + writer = AsyncLibpqWriter(cur) await ensure_table(cur, sample_tabledef) async with cur.copy( @@ -852,7 +857,7 @@ class DataGenerator: return m.hexdigest() -class AsyncBytesWriter(psycopg.copy.AsyncWriter): +class AsyncBytesWriter(AsyncWriter): def __init__(self): self.file = BytesIO()