__slots__ = """
_conn format _adapters arraysize _closed _results pgresult _pos
_iresult _rowcount _query _tx _last_query _row_factory _make_row
- _pgconn _encoding
+ _pgconn _encoding _execmany_returning
__weakref__
""".split()
self._closed = False
self._last_query: Optional[Query] = None
self._reset()
+ # None if executemany() not executing, True/False according to returning state
+ self._execmany_returning: Optional[bool] = None
def _reset(self, reset_query: bool = True) -> None:
self._results: List["PGresult"] = []
self, query: Query, params_seq: Iterable[Params], returning: bool
) -> PQGen[None]:
"""Generator implementing `Cursor.executemany()`."""
+ pipeline = self._conn._pipeline
+ assert pipeline
+
yield from self._start_query(query)
+ self._rowcount = 0
+
first = True
- nrows = 0
for params in params_seq:
if first:
pgq = self._convert_query(query, params)
else:
pgq.dump(params)
- results = yield from self._maybe_prepare_gen(pgq, prepare=True)
-
- if self._conn._pipeline:
- yield from self._conn._pipeline._communicate_gen()
- else:
- assert results is not None
- self._check_results(results)
- if returning and results[0].status == ExecStatus.TUPLES_OK:
- self._results.extend(results)
-
- for res in results:
- nrows += res.command_tuples or 0
-
- if not self._conn._pipeline:
- if self._results:
- self._set_current_result(0)
-
- # Override rowcount for the first result. Calls to nextset() will
- # change it to the value of that result only, but we hope nobody
- # will notice.
- # You haven't read this comment.
- self._rowcount = nrows
+ yield from self._maybe_prepare_gen(pgq, prepare=True)
+ yield from pipeline._communicate_gen()
self._last_query = query
for cmd in self._conn._prepared.get_maintenance_commands():
yield from self._conn._exec_command(cmd)
+ yield from pipeline._flush_gen()
+
def _maybe_prepare_gen(
self,
pgq: PostgresQuery,
def _set_results_from_pipeline(self, results: List["PGresult"]) -> None:
self._check_results(results)
- if not self._results:
- self._results = results
- self._set_current_result(0)
- else:
+ first_batch = not self._results
+
+ if self._execmany_returning is None:
+ # Received from execute()
+ # TODO: bug we also end up here on executemany() if run from inside
+ # a pipeline block. This causes a wrong rowcount. As it isn't so
+ # serious, currently leaving it this way.
+ first_batch = not self._results
self._results.extend(results)
+ if first_batch:
+ self._set_current_result(0)
+
+ else:
+ # Received from executemany()
+ if self._execmany_returning and results[0].status == ExecStatus.TUPLES_OK:
+ self._results.extend(results)
+ if first_batch:
+ self._set_current_result(0)
+ self._rowcount = 0
+
+ # Override rowcount for the first result. Calls to nextset() will
+ # change it to the value of that result only, but we hope nobody
+ # will notice.
+ # You haven't read this comment.
+ if self._rowcount < 0:
+ self._rowcount = 0
+ for res in results:
+ self._rowcount += res.command_tuples or 0
def _send_prepare(self, name: bytes, query: PostgresQuery) -> None:
if self._conn._pipeline:
Execute the same command with a sequence of input data.
"""
try:
- with self._conn.lock:
- self._conn.wait(self._executemany_gen(query, params_seq, returning))
+ with self._conn.pipeline():
+ with self._conn.lock:
+ assert self._execmany_returning is None
+ self._execmany_returning = returning
+ self._conn.wait(self._executemany_gen(query, params_seq, returning))
except e.Error as ex:
raise ex.with_traceback(None)
+ finally:
+ self._execmany_returning = None
def stream(
self,