methods `!fetch*()` will operate on.
"""
if self._iresult < len(self._results) - 1:
- self._set_result(self._iresult + 1)
+ self._set_current_result(self._iresult + 1)
return True
else:
return None
)
self._check_results(results)
self._results = results
- self._set_result(0)
+ self._set_current_result(0)
self._last_query = query
for cmd in self._conn._prepared.get_maintenance_commands():
nrows += res.command_tuples or 0
if self._results:
- self._set_result(0)
+ self._set_current_result(0)
# Override rowcount for the first result. Calls to nextset() will change
# it to the value of that result only, but we hope nobody will notice.
f" {ExecStatus(result.status).name}"
)
- def _set_result(self, i: int, format: Optional[Format] = None) -> None:
+ def _set_current_result(
+ self, i: int, format: Optional[Format] = None
+ ) -> None:
"""
Select one of the results in the cursor as the active one.
"""
name, pgq.params, param_formats=pgq.formats, result_format=fmt
)
- def _check_result(self) -> None:
+ def _check_result_for_fetch(self) -> None:
if self.closed:
raise e.InterfaceError("the cursor is closed")
res = self.pgresult
)
def _scroll(self, value: int, mode: str) -> None:
- self._check_result()
+ self._check_result_for_fetch()
assert self.pgresult
if mode == "relative":
newpos = self._pos + value
:rtype: Optional[Row], with Row defined by `row_factory`
"""
- self._check_result()
+ self._check_result_for_fetch()
record = self._tx.load_row(self._pos, self._make_row)
if record is not None:
self._pos += 1
:rtype: Sequence[Row], with Row defined by `row_factory`
"""
- self._check_result()
+ self._check_result_for_fetch()
assert self.pgresult
if not size:
:rtype: Sequence[Row], with Row defined by `row_factory`
"""
- self._check_result()
+ self._check_result_for_fetch()
assert self.pgresult
records = self._tx.load_rows(
self._pos, self.pgresult.ntuples, self._make_row
return records
def __iter__(self) -> Iterator[Row]:
- self._check_result()
+ self._check_result_for_fetch()
def load(pos: int) -> Optional[Row]:
return self._tx.load_row(pos, self._make_row)
first = False
async def fetchone(self) -> Optional[Row]:
- self._check_result()
+ self._check_result_for_fetch()
rv = self._tx.load_row(self._pos, self._make_row)
if rv is not None:
self._pos += 1
return rv
async def fetchmany(self, size: int = 0) -> List[Row]:
- self._check_result()
+ self._check_result_for_fetch()
assert self.pgresult
if not size:
return records
async def fetchall(self) -> List[Row]:
- self._check_result()
+ self._check_result_for_fetch()
assert self.pgresult
records = self._tx.load_rows(
self._pos, self.pgresult.ntuples, self._make_row
return records
async def __aiter__(self) -> AsyncIterator[Row]:
- self._check_result()
+ self._check_result_for_fetch()
def load(pos: int) -> Optional[Row]:
return self._tx.load_row(pos, self._make_row)