self._closed = False
self._last_query: Optional[Query] = None
self._reset()
- # None if executemany() not executing, True/False according to returning state
- self._execmany_returning: Optional[bool] = None
def _reset(self, reset_query: bool = True) -> None:
self._results: List["PGresult"] = []
self._rowcount = -1
self._query: Optional[PostgresQuery]
self._encoding = "utf-8"
+ # None if executemany() not executing, True/False according to returning state
+ self._execmany_returning: Optional[bool] = None
if reset_query:
self._query = None
yield from self._conn._exec_command(cmd)
def _executemany_gen_pipeline(
- self, query: Query, params_seq: Iterable[Params]
+ self, query: Query, params_seq: Iterable[Params], returning: bool
) -> PQGen[None]:
"""
Generator implementing `Cursor.executemany()` with pipelines available.
yield from self._start_query(query)
self._rowcount = 0
+ assert self._execmany_returning is None
+ self._execmany_returning = returning
+
first = True
for params in params_seq:
if first:
try:
if Pipeline.is_supported():
with self._conn.pipeline(), self._conn.lock:
- assert self._execmany_returning is None
- self._execmany_returning = returning
- self._conn.wait(self._executemany_gen_pipeline(query, params_seq))
+ self._conn.wait(
+ self._executemany_gen_pipeline(query, params_seq, returning)
+ )
else:
with self._conn.lock:
self._conn.wait(
)
except e.Error as ex:
raise ex.with_traceback(None)
- finally:
- self._execmany_returning = None
def stream(
self,
yield copy
def _fetch_pipeline(self) -> None:
- if not self.pgresult and self._conn._pipeline:
+ if (
+ self._execmany_returning is not False
+ and not self.pgresult
+ and self._conn._pipeline
+ ):
with self._conn.lock:
self._conn.wait(self._conn._pipeline._fetch_gen(flush=True))
assert self.pgresult
try:
if Pipeline.is_supported():
async with self._conn.pipeline(), self._conn.lock:
- assert self._execmany_returning is None
- self._execmany_returning = returning
await self._conn.wait(
- self._executemany_gen_pipeline(query, params_seq)
+ self._executemany_gen_pipeline(query, params_seq, returning)
)
else:
await self._conn.wait(
)
except e.Error as ex:
raise ex.with_traceback(None)
- finally:
- self._execmany_returning = None
async def stream(
self,
yield copy
async def _fetch_pipeline(self) -> None:
- if not self.pgresult and self._conn._pipeline:
+ if (
+ self._execmany_returning is not False
+ and not self.pgresult
+ and self._conn._pipeline
+ ):
async with self._conn.lock:
await self._conn.wait(self._conn._pipeline._fetch_gen(flush=True))
assert self.pgresult
assert cur.nextset() is None
+def test_executemany_no_returning(conn):
+ conn.autocommit = True
+ conn.execute("drop table if exists execmanypipelinenoreturning")
+ conn.execute(
+ "create unlogged table execmanypipelinenoreturning ("
+ " id serial primary key, num integer)"
+ )
+ with conn.pipeline(), conn.cursor() as cur:
+ cur.executemany(
+ "insert into execmanypipelinenoreturning(num) values (%s)",
+ [(10,), (20,)],
+ returning=False,
+ )
+ assert cur.rowcount == 2
+ with pytest.raises(e.ProgrammingError, match="no result available"):
+ cur.fetchone()
+ assert cur.nextset() is None
+ with pytest.raises(e.ProgrammingError, match="no result available"):
+ cur.fetchone()
+ assert cur.nextset() is None
+
+
def test_prepared(conn):
conn.autocommit = True
with conn.pipeline():
assert cur.nextset() is None
+async def test_executemany_no_returning(aconn):
+ await aconn.set_autocommit(True)
+ await aconn.execute("drop table if exists execmanypipelinenoreturning")
+ await aconn.execute(
+ "create unlogged table execmanypipelinenoreturning ("
+ " id serial primary key, num integer)"
+ )
+ async with aconn.pipeline(), aconn.cursor() as cur:
+ await cur.executemany(
+ "insert into execmanypipelinenoreturning(num) values (%s)",
+ [(10,), (20,)],
+ returning=False,
+ )
+ assert cur.rowcount == 2
+ with pytest.raises(e.ProgrammingError, match="no result available"):
+ await cur.fetchone()
+ assert cur.nextset() is None
+ with pytest.raises(e.ProgrammingError, match="no result available"):
+ await cur.fetchone()
+ assert cur.nextset() is None
+
+
async def test_prepared(aconn):
await aconn.set_autocommit(True)
async with aconn.pipeline():