# Copyright (C) 2020 The Psycopg Team
import codecs
-from typing import Any, Callable, Dict, Generator, List, Optional, Sequence
+from typing import Any, Callable, Dict, Generator, Iterable, List, Optional
from typing import Tuple, Type, Union
from . import errors as e
-from .pq import Format, PGresult
+from .pq import Format
from .cursor import BaseCursor
from .types.oids import builtins, INVALID_OID
from .connection import BaseConnection
# mapping oid, fmt -> cast function
self._cast_funcs: Dict[Tuple[int, Format], TypeCasterFunc] = {}
- # The result to return values from
- self._result: Optional[PGresult] = None
-
# sequence of cast function from value to python
# the length of the result columns
self._row_casters: List[TypeCasterFunc] = []
- @property
- def result(self) -> Optional[PGresult]:
- return self._result
-
- @result.setter
- def result(self, result: PGresult) -> None:
- if self._result is result:
- return
-
- rc = self._row_casters = []
- for c in range(result.nfields):
- oid = result.ftype(c)
- fmt = result.fformat(c)
- func = self.get_cast_function(oid, fmt)
- rc.append(func)
-
def adapt_sequence(
- self, objs: Sequence[Any], formats: Sequence[Format]
+ self, objs: Iterable[Any], formats: Iterable[Format]
) -> Tuple[List[Optional[bytes]], List[int]]:
out = []
types = []
f"cannot adapt type {src} to format {Format(format).name}"
)
- def cast_row(self, result: PGresult, n: int) -> Generator[Any, None, None]:
- self.result = result
-
- for col, func in enumerate(self._row_casters):
- v = result.get_value(n, col)
- if v is not None:
- v = func(v)
- yield v
+ def set_row_types(self, types: Iterable[Tuple[int, Format]]) -> None:
+ rc = self._row_casters = []
+ for oid, fmt in types:
+ rc.append(self.get_cast_function(oid, fmt))
+
+ def cast_sequence(
+ self, record: Iterable[Optional[bytes]]
+ ) -> Generator[Any, None, None]:
+ for val, caster in zip(record, self._row_casters):
+ if val is not None:
+ yield caster(val)
+ else:
+ yield None
def cast(self, data: bytes, oid: int, format: Format = Format.TEXT) -> Any:
if data is not None:
def _reset(self) -> None:
from .adapt import Transformer
+ self._transformer = Transformer(self) # TODO: circular reference
self._results: List[PGresult] = []
self.pgresult: Optional[PGresult] = None
self._pos = 0
self._iresult = 0
- self._transformer = Transformer(self)
+
+ @property
+ def pgresult(self) -> Optional[PGresult]:
+ return self._pgresult
+
+ @pgresult.setter
+ def pgresult(self, result: Optional[PGresult]) -> None:
+ self._pgresult = result
+ if result is not None and self._transformer is not None:
+ self._transformer.set_row_types(
+ (result.ftype(i), result.fformat(i))
+ for i in range(result.nfields)
+ )
def _execute_send(
self, query: Query, vars: Optional[Params]
return rv
def _cast_row(self, n: int) -> Optional[Tuple[Any, ...]]:
- if self.pgresult is None:
+ res = self.pgresult
+ if res is None:
return None
- if n >= self.pgresult.ntuples:
+ if n >= res.ntuples:
return None
- return tuple(self._transformer.cast_row(self.pgresult, n))
+ return tuple(
+ self._transformer.cast_sequence(
+ res.get_value(n, i) for i in range(res.nfields)
+ )
+ )
class Cursor(BaseCursor):