# Copyright (C) 2020 The Psycopg Team
import codecs
-from typing import Any, Callable, Dict, Generator, Iterable, List, Optional
+from typing import Any, Callable, Dict, Iterable, List, Optional
from typing import Tuple, Type, Union
from . import errors as e
def load_sequence(
self, record: Iterable[Optional[bytes]]
- ) -> Generator[Any, None, None]:
- for val, loader in zip(record, self._row_loaders):
- if val is not None:
- yield loader(val)
- else:
- yield None
+ ) -> Tuple[Any, ...]:
+ return tuple(
+ (self._row_loaders[i](val) if val is not None else None)
+ for i, val in enumerate(record)
+ )
def load(self, data: bytes, oid: int, format: Format = Format.TEXT) -> Any:
if data is not None:
@pgresult.setter
def pgresult(self, result: Optional[pq.PGresult]) -> None:
self._pgresult = result
- if result is not None and self._transformer is not None:
- self._transformer.set_row_types(
- (result.ftype(i), result.fformat(i))
- for i in range(result.nfields)
- )
+ if result is not None:
+ self._ntuples = result.ntuples
+ self._nfields = result.nfields
+ if self._transformer is not None:
+ self._transformer.set_row_types(
+ (result.ftype(i), result.fformat(i))
+ for i in range(self._nfields)
+ )
@property
def description(self) -> Optional[List[Column]]:
else:
return None
- def _load_row(self, n: int) -> Optional[Tuple[Any, ...]]:
+ def _check_result(self) -> None:
res = self.pgresult
if res is None:
raise e.ProgrammingError("no result available")
"the last operation didn't produce a result"
)
- if n >= res.ntuples:
+ def _load_row(self, n: int) -> Optional[Tuple[Any, ...]]:
+ if n >= self._ntuples:
return None
- return tuple(
- self._transformer.load_sequence(
- res.get_value(n, i) for i in range(res.nfields)
- )
+ get_value = self.pgresult.get_value # type: ignore
+ return self._transformer.load_sequence(
+ get_value(n, i) for i in range(self._nfields)
)
return self
def fetchone(self) -> Optional[Sequence[Any]]:
+ self._check_result()
rv = self._load_row(self._pos)
if rv is not None:
self._pos += 1
return rv
def fetchmany(self, size: Optional[int] = None) -> List[Sequence[Any]]:
+ self._check_result()
if size is None:
size = self.arraysize
return rv
def fetchall(self) -> List[Sequence[Any]]:
+ self._check_result()
rv: List[Sequence[Any]] = []
while 1:
row = self._load_row(self._pos)
return self
async def fetchone(self) -> Optional[Sequence[Any]]:
+ self._check_result()
rv = self._load_row(self._pos)
if rv is not None:
self._pos += 1
async def fetchmany(
self, size: Optional[int] = None
) -> List[Sequence[Any]]:
+ self._check_result()
if size is None:
size = self.arraysize
return rv
async def fetchall(self) -> List[Sequence[Any]]:
+ self._check_result()
rv: List[Sequence[Any]] = []
while 1:
row = self._load_row(self._pos)
self._config_types(data)
self._types_set = True
- return tuple(
- self._tx.load_sequence(
- data[offset : offset + length] if length != -1 else None
- for _, offset, length in self._walk_record(data)
- )
+ return self._tx.load_sequence(
+ data[offset : offset + length] if length != -1 else None
+ for _, offset, length in self._walk_record(data)
)
def _walk_record(