From: Daniele Varrazzo
Date: Sun, 27 Mar 2022 03:55:51 +0000 (+0200)
Subject: perf: base executemany on pipeline
X-Git-Tag: 3.1~145^2~18
X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=7acf241ffee0df08692a8d11547504002dbafba6;p=thirdparty%2Fpsycopg.git

perf: base executemany on pipeline

This changeset also fixes several glitches of executemany() run in
pipeline mode, around the management of returned values and rowcount.
These glitches still appear if executemany() is run in an explicit
pipeline() block, because certain events only happen at (outermost)
pipeline block exit.

Related to: #145.
---

diff --git a/psycopg/psycopg/_pipeline.py b/psycopg/psycopg/_pipeline.py
index 26923682b..99a010339 100644
--- a/psycopg/psycopg/_pipeline.py
+++ b/psycopg/psycopg/_pipeline.py
@@ -107,8 +107,7 @@ class BasePipeline:
             return

         if flush:
-            self.pgconn.send_flush_request()
-            yield from send(self.pgconn)
+            yield from self._flush_gen()

         to_process = []
         while self.result_queue:
@@ -123,6 +122,10 @@ class BasePipeline:
         for queued, results in to_process:
             self._process_results(queued, results)

+    def _flush_gen(self) -> PQGen[None]:
+        self.pgconn.send_flush_request()
+        yield from send(self.pgconn)
+
     def _process_results(
         self, queued: PendingResult, results: List["PGresult"]
     ) -> None:
diff --git a/psycopg/psycopg/cursor.py b/psycopg/psycopg/cursor.py
index d96019d75..87dbcb847 100644
--- a/psycopg/psycopg/cursor.py
+++ b/psycopg/psycopg/cursor.py
@@ -48,7 +48,7 @@ class BaseCursor(Generic[ConnectionType, Row]):
     __slots__ = """
        _conn format _adapters arraysize _closed _results pgresult _pos
        _iresult _rowcount _query _tx _last_query _row_factory _make_row
-        _pgconn _encoding
+        _pgconn _encoding _execmany_returning
        __weakref__
        """.split()

@@ -67,6 +67,8 @@ class BaseCursor(Generic[ConnectionType, Row]):
         self._closed = False
         self._last_query: Optional[Query] = None
         self._reset()
+        # None if executemany() not executing, True/False according to returning state
+        self._execmany_returning: Optional[bool] = None

     def _reset(self, reset_query: bool = True) -> None:
         self._results: List["PGresult"] = []
@@ -212,9 +214,13 @@ class BaseCursor(Generic[ConnectionType, Row]):
         self, query: Query, params_seq: Iterable[Params], returning: bool
     ) -> PQGen[None]:
         """Generator implementing `Cursor.executemany()`."""
+        pipeline = self._conn._pipeline
+        assert pipeline
+
         yield from self._start_query(query)
+        self._rowcount = 0
+
         first = True
-        nrows = 0
         for params in params_seq:
             if first:
                 pgq = self._convert_query(query, params)
@@ -223,34 +229,16 @@ class BaseCursor(Generic[ConnectionType, Row]):
             else:
                 pgq.dump(params)

-            results = yield from self._maybe_prepare_gen(pgq, prepare=True)
-
-            if self._conn._pipeline:
-                yield from self._conn._pipeline._communicate_gen()
-            else:
-                assert results is not None
-                self._check_results(results)
-                if returning and results[0].status == ExecStatus.TUPLES_OK:
-                    self._results.extend(results)
-
-            for res in results:
-                nrows += res.command_tuples or 0
-
-        if not self._conn._pipeline:
-            if self._results:
-                self._set_current_result(0)
-
-            # Override rowcount for the first result. Calls to nextset() will
-            # change it to the value of that result only, but we hope nobody
-            # will notice.
-            # You haven't read this comment.
-            self._rowcount = nrows
+            yield from self._maybe_prepare_gen(pgq, prepare=True)
+            yield from pipeline._communicate_gen()

         self._last_query = query

         for cmd in self._conn._prepared.get_maintenance_commands():
             yield from self._conn._exec_command(cmd)

+        yield from pipeline._flush_gen()
+
     def _maybe_prepare_gen(
         self,
         pgq: PostgresQuery,
@@ -483,11 +471,34 @@ class BaseCursor(Generic[ConnectionType, Row]):

     def _set_results_from_pipeline(self, results: List["PGresult"]) -> None:
         self._check_results(results)
-        if not self._results:
-            self._results = results
-            self._set_current_result(0)
-        else:
+        first_batch = not self._results
+
+        if self._execmany_returning is None:
+            # Received from execute()
+            # TODO: bug we also end up here on executemany() if run from inside
+            # a pipeline block. This causes a wrong rowcount. As it isn't so
+            # serious, currently leaving it this way.
+            first_batch = not self._results
             self._results.extend(results)
+            if first_batch:
+                self._set_current_result(0)
+
+        else:
+            # Received from executemany()
+            if self._execmany_returning and results[0].status == ExecStatus.TUPLES_OK:
+                self._results.extend(results)
+                if first_batch:
+                    self._set_current_result(0)
+                    self._rowcount = 0
+
+            # Override rowcount for the first result. Calls to nextset() will
+            # change it to the value of that result only, but we hope nobody
+            # will notice.
+            # You haven't read this comment.
+            if self._rowcount < 0:
+                self._rowcount = 0
+            for res in results:
+                self._rowcount += res.command_tuples or 0

     def _send_prepare(self, name: bytes, query: PostgresQuery) -> None:
         if self._conn._pipeline:
@@ -645,10 +656,15 @@ class Cursor(BaseCursor["Connection[Any]", Row]):
         Execute the same command with a sequence of input data.
         """
         try:
-            with self._conn.lock:
-                self._conn.wait(self._executemany_gen(query, params_seq, returning))
+            with self._conn.pipeline():
+                with self._conn.lock:
+                    assert self._execmany_returning is None
+                    self._execmany_returning = returning
+                    self._conn.wait(self._executemany_gen(query, params_seq, returning))
         except e.Error as ex:
             raise ex.with_traceback(None)
+        finally:
+            self._execmany_returning = None

     def stream(
         self,
diff --git a/psycopg/psycopg/cursor_async.py b/psycopg/psycopg/cursor_async.py
index 75131430d..61bdc1f35 100644
--- a/psycopg/psycopg/cursor_async.py
+++ b/psycopg/psycopg/cursor_async.py
@@ -86,12 +86,17 @@ class AsyncCursor(BaseCursor["AsyncConnection[Any]", Row]):
         returning: bool = False,
     ) -> None:
         try:
-            async with self._conn.lock:
-                await self._conn.wait(
-                    self._executemany_gen(query, params_seq, returning)
-                )
+            async with self._conn.pipeline():
+                async with self._conn.lock:
+                    assert self._execmany_returning is None
+                    self._execmany_returning = returning
+                    await self._conn.wait(
+                        self._executemany_gen(query, params_seq, returning)
+                    )
         except e.Error as ex:
             raise ex.with_traceback(None)
+        finally:
+            self._execmany_returning = None

     async def stream(
         self,
diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py
index fde3c99f1..090fb11f1 100644
--- a/tests/test_pipeline.py
+++ b/tests/test_pipeline.py
@@ -168,6 +168,12 @@ def test_executemany(conn):
         "insert into execmanypipeline(num) values (%s) returning id",
         [(10,), (20,)],
     )
+
+    # TODO: this is a bug, it should be 2. It is caused by reentering the
+    # pipeline mode in executemany(). Leaving it here to monitor how it
+    # changes. The snag is in Cursor._set_results_from_pipeline()
+    assert cur.rowcount == 0
+
     assert cur.fetchone() == (1,)
     assert cur.nextset()
     assert cur.fetchone() == (2,)
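
Usage sketch (not part of this commit): with this change executemany()
enters pipeline mode internally, so every parameter set is queued and
flushed to the server in a single batch instead of one round trip per
statement. The snippet below only illustrates the behaviour described
above; the connection string and table name are made up, and the
returning flag mirrors the parameter threaded through the diff.

    import psycopg

    # Hypothetical DSN and table name, for illustration only.
    with psycopg.connect("dbname=test") as conn:
        with conn.cursor() as cur:
            cur.execute(
                "create temp table execmany_demo (id serial primary key, num int)"
            )
            # All parameter sets are sent through a single pipeline batch.
            cur.executemany(
                "insert into execmany_demo (num) values (%s) returning id",
                [(10,), (20,), (30,)],
                returning=True,
            )
            # With returning enabled each statement yields its own result set;
            # walk them with fetchone()/nextset(), as test_executemany() does.
            while True:
                print(cur.fetchone())
                if not cur.nextset():
                    break

Note the caveat tracked by the assertion added to the test above: when
executemany() runs inside an explicit pipeline() block, rowcount may not
yet reflect the total number of affected rows.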