]> git.ipfire.org Git - thirdparty/psycopg.git/commitdiff
perf(copy): use non-thread/task copy writers by default
authorDaniele Varrazzo <daniele.varrazzo@gmail.com>
Sat, 23 Jul 2022 21:44:55 +0000 (22:44 +0100)
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>
Tue, 26 Jul 2022 12:01:42 +0000 (13:01 +0100)
Further benchmarks show no performance improvement using a writer
thread, and a slight slowdown using a writer async task. If that's the
case we can keep it simpler.

psycopg/psycopg/copy.py
tests/test_copy.py
tests/test_copy_async.py

index d1204bffb6a5eb7104dc016e3032159e200109af..04632c9496f7feec19c03ec7ea3a081637a39d47 100644 (file)
@@ -199,7 +199,7 @@ class Copy(BaseCopy["Connection[Any]"]):
     def __init__(self, cursor: "Cursor[Any]", *, writer: Optional["Writer"] = None):
         super().__init__(cursor)
         if not writer:
-            writer = QueueWriter(cursor)
+            writer = LibpqWriter(cursor)
 
         self.writer = writer
         self._write = writer.write
@@ -429,7 +429,7 @@ class AsyncCopy(BaseCopy["AsyncConnection[Any]"]):
         super().__init__(cursor)
 
         if not writer:
-            writer = AsyncQueueWriter(cursor)
+            writer = AsyncLibpqWriter(cursor)
 
         self.writer = writer
         self._write = writer.write
index 094a7043d932cb9a936ca95725e7bb616d5e68d7..e6e3eb19990559c8796c45308d2d4df4978ff5e6 100644 (file)
@@ -12,6 +12,7 @@ from psycopg import pq
 from psycopg import sql
 from psycopg import errors as e
 from psycopg.pq import Format
+from psycopg.copy import Copy, Writer, LibpqWriter, QueueWriter
 from psycopg.adapt import PyFormat
 from psycopg.types import TypeInfo
 from psycopg.types.hstore import register_hstore
@@ -464,7 +465,7 @@ def test_copy_in_format(conn):
     writer = BytesWriter()
     conn.execute("set client_encoding to utf8")
     cur = conn.cursor()
-    with psycopg.copy.Copy(cur, writer=writer) as copy:
+    with Copy(cur, writer=writer) as copy:
         for i in range(1, 256):
             copy.write_row((i, chr(i)))
 
@@ -616,7 +617,9 @@ def test_description(conn):
 def test_worker_life(conn, format, buffer):
     cur = conn.cursor()
     ensure_table(cur, sample_tabledef)
-    with cur.copy(f"copy copy_in from stdin (format {format.name})") as copy:
+    with cur.copy(
+        f"copy copy_in from stdin (format {format.name})", writer=QueueWriter(cur)
+    ) as copy:
         assert not copy.writer._worker
         copy.write(globals()[buffer])
         assert copy.writer._worker
@@ -635,7 +638,7 @@ def test_worker_error_propagated(conn, monkeypatch):
     cur = conn.cursor()
     cur.execute("create temp table wat (a text, b text)")
     with pytest.raises(ZeroDivisionError):
-        with cur.copy("copy wat from stdin") as copy:
+        with cur.copy("copy wat from stdin", writer=QueueWriter(cur)) as copy:
             copy.write("a,b")
 
 
@@ -645,7 +648,7 @@ def test_worker_error_propagated(conn, monkeypatch):
 )
 def test_connection_writer(conn, format, buffer):
     cur = conn.cursor()
-    writer = psycopg.copy.LibpqWriter(cur)
+    writer = LibpqWriter(cur)
 
     ensure_table(cur, sample_tabledef)
     with cur.copy(
@@ -859,7 +862,7 @@ class DataGenerator:
         return m.hexdigest()
 
 
-class BytesWriter(psycopg.copy.Writer):
+class BytesWriter(Writer):
     def __init__(self):
         self.file = BytesIO()
 
index ed91c07919ae2b314cc91b5957ef7ebf01b3b1d9..cd450e54760ee7f13d90d42b88e30a7b839906ff 100644 (file)
@@ -12,6 +12,7 @@ from psycopg import pq
 from psycopg import sql
 from psycopg import errors as e
 from psycopg.pq import Format
+from psycopg.copy import AsyncCopy, AsyncWriter, AsyncLibpqWriter, AsyncQueueWriter
 from psycopg.types import TypeInfo
 from psycopg.adapt import PyFormat
 from psycopg.types.hstore import register_hstore
@@ -466,7 +467,7 @@ async def test_copy_in_format(aconn):
     writer = AsyncBytesWriter()
     await aconn.execute("set client_encoding to utf8")
     cur = aconn.cursor()
-    async with psycopg.copy.AsyncCopy(cur, writer=writer) as copy:
+    async with AsyncCopy(cur, writer=writer) as copy:
         for i in range(1, 256):
             await copy.write_row((i, chr(i)))
 
@@ -618,7 +619,9 @@ async def test_description(aconn):
 async def test_worker_life(aconn, format, buffer):
     cur = aconn.cursor()
     await ensure_table(cur, sample_tabledef)
-    async with cur.copy(f"copy copy_in from stdin (format {format.name})") as copy:
+    async with cur.copy(
+        f"copy copy_in from stdin (format {format.name})", writer=AsyncQueueWriter(cur)
+    ) as copy:
         assert not copy.writer._worker
         await copy.write(globals()[buffer])
         assert copy.writer._worker
@@ -638,7 +641,9 @@ async def test_worker_error_propagated(aconn, monkeypatch):
     cur = aconn.cursor()
     await cur.execute("create temp table wat (a text, b text)")
     with pytest.raises(ZeroDivisionError):
-        async with cur.copy("copy wat from stdin") as copy:
+        async with cur.copy(
+            "copy wat from stdin", writer=AsyncQueueWriter(cur)
+        ) as copy:
             await copy.write("a,b")
 
 
@@ -648,7 +653,7 @@ async def test_worker_error_propagated(aconn, monkeypatch):
 )
 async def test_connection_writer(aconn, format, buffer):
     cur = aconn.cursor()
-    writer = psycopg.copy.AsyncLibpqWriter(cur)
+    writer = AsyncLibpqWriter(cur)
 
     await ensure_table(cur, sample_tabledef)
     async with cur.copy(
@@ -852,7 +857,7 @@ class DataGenerator:
         return m.hexdigest()
 
 
-class AsyncBytesWriter(psycopg.copy.AsyncWriter):
+class AsyncBytesWriter(AsyncWriter):
     def __init__(self):
         self.file = BytesIO()