@pgresult.setter
def pgresult(self, result: Optional["PGresult"]) -> None:
self._pgresult = result
- if result and self._tx:
- self._tx.set_pgresult(result)
@property
def description(self) -> Optional[List[Column]]:
self._iresult += 1
if self._iresult < len(self._results):
self.pgresult = self._results[self._iresult]
+ self._tx.set_pgresult(self._results[self._iresult])
self._pos = 0
nrows = self.pgresult.command_tuples
self._rowcount = nrows if nrows is not None else -1
self._conn.pgconn.set_single_row_mode()
self._last_query = query
- def _stream_fetchone_gen(self) -> PQGen[Optional["PGresult"]]:
+ def _stream_fetchone_gen(self, first: bool) -> PQGen[Optional["PGresult"]]:
yield from generators.send(self._conn.pgconn)
res = yield from generators.fetch(self._conn.pgconn)
if res is None:
return None
elif res.status == ExecStatus.SINGLE_TUPLE:
- self.pgresult = res # will set it on the transformer too
- # TODO: the transformer may do excessive work here: create a
- # path that doesn't clear the loaders every time.
+ self.pgresult = res
+ self._tx.set_pgresult(res, set_loaders=first)
return res
elif res.status in (ExecStatus.TUPLES_OK, ExecStatus.COMMAND_OK):
self._execute_send(query, no_pqexec=True)
(result,) = yield from execute(self._conn.pgconn)
self._check_copy_result(result)
- self.pgresult = result # will set it on the transformer too
+ self.pgresult = result
+ self._tx.set_pgresult(result)
def _execute_send(
self, query: PostgresQuery, no_pqexec: bool = False
self._results = list(results)
self.pgresult = results[0]
+ self._tx.set_pgresult(results[0])
nrows = self.pgresult.command_tuples
if nrows is not None:
if self._rowcount < 0:
"""
with self._conn.lock:
self._conn.wait(self._stream_send_gen(query, params))
- while self._conn.wait(self._stream_fetchone_gen()):
+ first = True
+ while self._conn.wait(self._stream_fetchone_gen(first)):
rec = self._tx.load_row(0)
assert rec is not None
yield rec
+ first = False
def fetchone(self) -> Optional[Sequence[Any]]:
"""
) -> AsyncIterator[Sequence[Any]]:
async with self._conn.lock:
await self._conn.wait(self._stream_send_gen(query, params))
- while await self._conn.wait(self._stream_fetchone_gen()):
+ first = True
+ while await self._conn.wait(self._stream_fetchone_gen(first)):
rec = self._tx.load_row(0)
assert rec is not None
yield rec
+ first = False
async def fetchone(self) -> Optional[Sequence[Any]]:
self._check_result()