]> git.ipfire.org Git - thirdparty/psycopg.git/commitdiff
perf: base executemany on pipeline
authorDaniele Varrazzo <daniele.varrazzo@gmail.com>
Sun, 27 Mar 2022 03:55:51 +0000 (05:55 +0200)
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>
Sat, 2 Apr 2022 23:23:22 +0000 (01:23 +0200)
This changeset also fix several glitches of executemany() run in
pipeline mode, around the management of returned value and rowcount.
These glitches still appear if executemany() is run in an explicit
pipeline() block, because certain events only happen at (outermost)
pipeline block exit.

Related to: #145.

psycopg/psycopg/_pipeline.py
psycopg/psycopg/cursor.py
psycopg/psycopg/cursor_async.py
tests/test_pipeline.py

index 26923682b47aa271bcb3b9d0115678db706c9648..99a0103398773809a253925962b80959cc92ab6b 100644 (file)
@@ -107,8 +107,7 @@ class BasePipeline:
             return
 
         if flush:
-            self.pgconn.send_flush_request()
-            yield from send(self.pgconn)
+            yield from self._flush_gen()
 
         to_process = []
         while self.result_queue:
@@ -123,6 +122,10 @@ class BasePipeline:
         for queued, results in to_process:
             self._process_results(queued, results)
 
+    def _flush_gen(self) -> PQGen[None]:
+        self.pgconn.send_flush_request()
+        yield from send(self.pgconn)
+
     def _process_results(
         self, queued: PendingResult, results: List["PGresult"]
     ) -> None:
index d96019d75212e19753940859a224636ea2d47583..87dbcb847f00422782fb014a61c104aa30086732 100644 (file)
@@ -48,7 +48,7 @@ class BaseCursor(Generic[ConnectionType, Row]):
     __slots__ = """
         _conn format _adapters arraysize _closed _results pgresult _pos
         _iresult _rowcount _query _tx _last_query _row_factory _make_row
-        _pgconn _encoding
+        _pgconn _encoding _execmany_returning
         __weakref__
         """.split()
 
@@ -67,6 +67,8 @@ class BaseCursor(Generic[ConnectionType, Row]):
         self._closed = False
         self._last_query: Optional[Query] = None
         self._reset()
+        # None if executemany() not executing, True/False according to returning state
+        self._execmany_returning: Optional[bool] = None
 
     def _reset(self, reset_query: bool = True) -> None:
         self._results: List["PGresult"] = []
@@ -212,9 +214,13 @@ class BaseCursor(Generic[ConnectionType, Row]):
         self, query: Query, params_seq: Iterable[Params], returning: bool
     ) -> PQGen[None]:
         """Generator implementing `Cursor.executemany()`."""
+        pipeline = self._conn._pipeline
+        assert pipeline
+
         yield from self._start_query(query)
+        self._rowcount = 0
+
         first = True
-        nrows = 0
         for params in params_seq:
             if first:
                 pgq = self._convert_query(query, params)
@@ -223,34 +229,16 @@ class BaseCursor(Generic[ConnectionType, Row]):
             else:
                 pgq.dump(params)
 
-            results = yield from self._maybe_prepare_gen(pgq, prepare=True)
-
-            if self._conn._pipeline:
-                yield from self._conn._pipeline._communicate_gen()
-            else:
-                assert results is not None
-                self._check_results(results)
-                if returning and results[0].status == ExecStatus.TUPLES_OK:
-                    self._results.extend(results)
-
-                for res in results:
-                    nrows += res.command_tuples or 0
-
-        if not self._conn._pipeline:
-            if self._results:
-                self._set_current_result(0)
-
-            # Override rowcount for the first result. Calls to nextset() will
-            # change it to the value of that result only, but we hope nobody
-            # will notice.
-            # You haven't read this comment.
-            self._rowcount = nrows
+            yield from self._maybe_prepare_gen(pgq, prepare=True)
+            yield from pipeline._communicate_gen()
 
         self._last_query = query
 
         for cmd in self._conn._prepared.get_maintenance_commands():
             yield from self._conn._exec_command(cmd)
 
+        yield from pipeline._flush_gen()
+
     def _maybe_prepare_gen(
         self,
         pgq: PostgresQuery,
@@ -483,11 +471,34 @@ class BaseCursor(Generic[ConnectionType, Row]):
 
     def _set_results_from_pipeline(self, results: List["PGresult"]) -> None:
         self._check_results(results)
-        if not self._results:
-            self._results = results
-            self._set_current_result(0)
-        else:
+        first_batch = not self._results
+
+        if self._execmany_returning is None:
+            # Received from execute()
+            # TODO: bug we also end up here on executemany() if run from inside
+            # a pipeline block. This causes a wrong rowcount. As it isn't so
+            # serious, currently leaving it this way.
+            first_batch = not self._results
             self._results.extend(results)
+            if first_batch:
+                self._set_current_result(0)
+
+        else:
+            # Received from executemany()
+            if self._execmany_returning and results[0].status == ExecStatus.TUPLES_OK:
+                self._results.extend(results)
+                if first_batch:
+                    self._set_current_result(0)
+                    self._rowcount = 0
+
+            # Override rowcount for the first result. Calls to nextset() will
+            # change it to the value of that result only, but we hope nobody
+            # will notice.
+            # You haven't read this comment.
+            if self._rowcount < 0:
+                self._rowcount = 0
+            for res in results:
+                self._rowcount += res.command_tuples or 0
 
     def _send_prepare(self, name: bytes, query: PostgresQuery) -> None:
         if self._conn._pipeline:
@@ -645,10 +656,15 @@ class Cursor(BaseCursor["Connection[Any]", Row]):
         Execute the same command with a sequence of input data.
         """
         try:
-            with self._conn.lock:
-                self._conn.wait(self._executemany_gen(query, params_seq, returning))
+            with self._conn.pipeline():
+                with self._conn.lock:
+                    assert self._execmany_returning is None
+                    self._execmany_returning = returning
+                    self._conn.wait(self._executemany_gen(query, params_seq, returning))
         except e.Error as ex:
             raise ex.with_traceback(None)
+        finally:
+            self._execmany_returning = None
 
     def stream(
         self,
index 75131430de20073ea78f54c7251911ac597ffb8c..61bdc1f35eb4bbf794733bb9281df92ae1b02414 100644 (file)
@@ -86,12 +86,17 @@ class AsyncCursor(BaseCursor["AsyncConnection[Any]", Row]):
         returning: bool = False,
     ) -> None:
         try:
-            async with self._conn.lock:
-                await self._conn.wait(
-                    self._executemany_gen(query, params_seq, returning)
-                )
+            async with self._conn.pipeline():
+                async with self._conn.lock:
+                    assert self._execmany_returning is None
+                    self._execmany_returning = returning
+                    await self._conn.wait(
+                        self._executemany_gen(query, params_seq, returning)
+                    )
         except e.Error as ex:
             raise ex.with_traceback(None)
+        finally:
+            self._execmany_returning = None
 
     async def stream(
         self,
index fde3c99f10be9bfb2fb77202fd263e4eeda19b9d..090fb11f1680946f226b6a4b42c35b33ddd4239c 100644 (file)
@@ -168,6 +168,12 @@ def test_executemany(conn):
             "insert into execmanypipeline(num) values (%s) returning id",
             [(10,), (20,)],
         )
+
+        # TODO: this is a bug, it should be 2. It is caused by reentering the
+        # pipeline mode in executemany(). Leaving it here to monitor how it
+        # changes. The snag is in Cursor._set_results_from_pipeline()
+        assert cur.rowcount == 0
+
         assert cur.fetchone() == (1,)
         assert cur.nextset()
         assert cur.fetchone() == (2,)