from ._column import Column
from ._cmodule import _psycopg
from ._queries import PostgresQuery
+from ._pipeline import Pipeline
from ._encodings import pgconn_encoding
from ._preparing import Prepare
for cmd in self._conn._prepared.get_maintenance_commands():
yield from self._conn._exec_command(cmd)
- def _executemany_gen(
- self, query: Query, params_seq: Iterable[Params], returning: bool
+ def _executemany_gen_pipeline(
+ self, query: Query, params_seq: Iterable[Params]
) -> PQGen[None]:
- """Generator implementing `Cursor.executemany()`."""
+ """
+ Generator implementing `Cursor.executemany()` with pipelines available.
+ """
pipeline = self._conn._pipeline
assert pipeline
yield from pipeline._flush_gen()
+ def _executemany_gen_no_pipeline(
+ self, query: Query, params_seq: Iterable[Params], returning: bool
+ ) -> PQGen[None]:
+ """
+ Generator implementing `Cursor.executemany()` with pipelines not available.
+
+ Execute *query* once per parameter set in *params_seq*, sequentially
+ (no libpq pipeline). If *returning* is true, keep the tuple-bearing
+ results so the caller can fetch rows from each execution.
+ """
+ yield from self._start_query(query)
+ first = True
+ nrows = 0
+ for params in params_seq:
+ if first:
+ # Convert the query only once; every later parameter set just
+ # re-dumps its values into the same PostgresQuery object.
+ pgq = self._convert_query(query, params)
+ self._query = pgq
+ first = False
+ else:
+ pgq.dump(params)
+
+ # prepare=True: the query is executed repeatedly, so it is a natural
+ # candidate for server-side preparation (subject to the manager's
+ # own thresholds -- see _preparing.Prepare).
+ results = yield from self._maybe_prepare_gen(pgq, prepare=True)
+ assert results is not None
+ self._check_results(results)
+ # Only keep results that carry rows, and only if the caller asked
+ # for them; otherwise they are dropped after counting.
+ if returning and results[0].status == ExecStatus.TUPLES_OK:
+ self._results.extend(results)
+
+ # Accumulate the affected-row count across all parameter sets.
+ for res in results:
+ nrows += res.command_tuples or 0
+
+ if self._results:
+ self._set_current_result(0)
+
+ # Override rowcount for the first result. Calls to nextset() will change
+ # it to the value of that result only, but we hope nobody will notice.
+ # You haven't read this comment.
+ self._rowcount = nrows
+ self._last_query = query
+
+ # Flush any maintenance commands queued by the prepared-statements
+ # manager (presumably DEALLOCATE of evicted statements -- confirm
+ # against _preparing's get_maintenance_commands()).
+ for cmd in self._conn._prepared.get_maintenance_commands():
+ yield from self._conn._exec_command(cmd)
+
def _maybe_prepare_gen(
self,
pgq: PostgresQuery,
# TODO: bug we also end up here on executemany() if run from inside
# a pipeline block. This causes a wrong rowcount. As it isn't so
# serious, currently leaving it this way.
- first_batch = not self._results
self._results.extend(results)
if first_batch:
self._set_current_result(0)
Execute the same command with a sequence of input data.
"""
try:
- with self._conn.pipeline():
- with self._conn.lock:
+ if Pipeline.is_supported():
+ with self._conn.pipeline(), self._conn.lock:
assert self._execmany_returning is None
self._execmany_returning = returning
- self._conn.wait(self._executemany_gen(query, params_seq, returning))
+ self._conn.wait(self._executemany_gen_pipeline(query, params_seq))
+ else:
+ with self._conn.lock:
+ self._conn.wait(
+ self._executemany_gen_no_pipeline(query, params_seq, returning)
+ )
except e.Error as ex:
raise ex.with_traceback(None)
finally:
from .copy import AsyncCopy
from .rows import Row, RowMaker, AsyncRowFactory
from .cursor import BaseCursor
+from ._pipeline import Pipeline
if TYPE_CHECKING:
from .connection_async import AsyncConnection
returning: bool = False,
) -> None:
try:
- async with self._conn.pipeline():
- async with self._conn.lock:
+ if Pipeline.is_supported():
+ async with self._conn.pipeline(), self._conn.lock:
assert self._execmany_returning is None
self._execmany_returning = returning
await self._conn.wait(
- self._executemany_gen(query, params_seq, returning)
+ self._executemany_gen_pipeline(query, params_seq)
)
+ else:
+ await self._conn.wait(
+ self._executemany_gen_no_pipeline(query, params_seq, returning)
+ )
except e.Error as ex:
raise ex.with_traceback(None)
finally: