__module__ = "psycopg3.adapt"
_adapters: "AdaptersMap"
_pgresult: Optional["PGresult"] = None
- make_row: RowMaker = tuple
def __init__(self, context: Optional[AdaptContext] = None):
dumper = cache[key1] = dumper.upgrade(obj, format)
return dumper
- def load_rows(self, row0: int, row1: int) -> List[Row]:
+ def load_rows(self, row0: int, row1: int, make_row: RowMaker) -> List[Row]:
res = self._pgresult
if not res:
raise e.InterfaceError("result not set")
val = res.get_value(row, col)
if val is not None:
record[col] = self._row_loaders[col](val)
- records.append(self.make_row(record))
+ records.append(make_row(record))
return records
- def load_row(self, row: int) -> Optional[Row]:
+ def load_row(self, row: int, make_row: RowMaker) -> Optional[Row]:
res = self._pgresult
if not res:
return None
if val is not None:
record[col] = self._row_loaders[col](val)
- return self.make_row(record) # type: ignore[no-any-return]
+ return make_row(record) # type: ignore[no-any-return]
def load_sequence(
self, record: Sequence[Optional[bytes]]
if sys.version_info >= (3, 7):
__slots__ = """
_conn format _adapters arraysize _closed _results pgresult _pos
- _iresult _rowcount _pgq _tx _last_query _row_factory
+ _iresult _rowcount _pgq _tx _last_query _row_factory _make_row
__weakref__
""".split()
if self._iresult < len(self._results):
self.pgresult = self._results[self._iresult]
self._tx.set_pgresult(self._results[self._iresult])
- self._tx.make_row = self._row_factory(self)
+ self._make_row = self._row_factory(self)
self._pos = 0
nrows = self.pgresult.command_tuples
self._rowcount = nrows if nrows is not None else -1
def row_factory(self, row_factory: RowFactory) -> None:
self._row_factory = row_factory
if self.pgresult:
- self._tx.make_row = row_factory(self)
+ self._make_row = row_factory(self)
#
# Generators for the high level operations on the cursor
self.pgresult = res
self._tx.set_pgresult(res, set_loaders=first)
if first:
- self._tx.make_row = self._row_factory(self)
+ self._make_row = self._row_factory(self)
return res
elif res.status in (ExecStatus.TUPLES_OK, ExecStatus.COMMAND_OK):
self._results = list(results)
self.pgresult = results[0]
self._tx.set_pgresult(results[0])
- self._tx.make_row = self._row_factory(self)
+ self._make_row = self._row_factory(self)
nrows = self.pgresult.command_tuples
if nrows is not None:
if self._rowcount < 0:
self._conn.wait(self._stream_send_gen(query, params))
first = True
while self._conn.wait(self._stream_fetchone_gen(first)):
- rec = self._tx.load_row(0)
+ rec = self._tx.load_row(0, self._make_row)
assert rec is not None
yield rec
first = False
:rtype: Optional[Row], with Row defined by `row_factory`
"""
self._check_result()
- record = self._tx.load_row(self._pos)
+ record = self._tx.load_row(self._pos, self._make_row)
if record is not None:
self._pos += 1
return record
if not size:
size = self.arraysize
records: List[Row] = self._tx.load_rows(
- self._pos, min(self._pos + size, self.pgresult.ntuples)
+ self._pos,
+ min(self._pos + size, self.pgresult.ntuples),
+ self._make_row,
)
self._pos += len(records)
return records
self._check_result()
assert self.pgresult
records: List[Row] = self._tx.load_rows(
- self._pos, self.pgresult.ntuples
+ self._pos, self.pgresult.ntuples, self._make_row
)
self._pos = self.pgresult.ntuples
return records
def __iter__(self) -> Iterator[Row]:
self._check_result()
- load = self._tx.load_row
+ def load(pos: int) -> Optional[Row]:
+ return self._tx.load_row(pos, self._make_row)
while 1:
row = load(self._pos)
await self._conn.wait(self._stream_send_gen(query, params))
first = True
while await self._conn.wait(self._stream_fetchone_gen(first)):
- rec = self._tx.load_row(0)
+ rec = self._tx.load_row(0, self._make_row)
assert rec is not None
yield rec
first = False
async def fetchone(self) -> Optional[Row]:
self._check_result()
- rv = self._tx.load_row(self._pos)
+ rv = self._tx.load_row(self._pos, self._make_row)
if rv is not None:
self._pos += 1
return rv
if not size:
size = self.arraysize
records: List[Row] = self._tx.load_rows(
- self._pos, min(self._pos + size, self.pgresult.ntuples)
+ self._pos,
+ min(self._pos + size, self.pgresult.ntuples),
+ self._make_row,
)
self._pos += len(records)
return records
self._check_result()
assert self.pgresult
records: List[Row] = self._tx.load_rows(
- self._pos, self.pgresult.ntuples
+ self._pos, self.pgresult.ntuples, self._make_row
)
self._pos = self.pgresult.ntuples
return records
async def __aiter__(self) -> AsyncIterator[Row]:
self._check_result()
- load = self._tx.load_row
+ def load(pos: int) -> Optional[Row]:
+ return self._tx.load_row(pos, self._make_row)
while 1:
row = load(self._pos)
def __init__(self, context: Optional[AdaptContext] = None):
...
- make_row: RowMaker
-
@property
def connection(self) -> Optional["BaseConnection"]:
...
def get_dumper(self, obj: Any, format: Format) -> "Dumper":
...
- def load_rows(self, row0: int, row1: int) -> List[Row]:
+ def load_rows(self, row0: int, row1: int, make_row: RowMaker) -> List[Row]:
...
- def load_row(self, row: int) -> Optional[Row]:
+ def load_row(self, row: int, make_row: RowMaker) -> Optional[Row]:
...
def load_sequence(
cur.pgresult = res
cur._tx.set_pgresult(res, set_loaders=False)
- return cur._tx.load_rows(0, res.ntuples)
+ return cur._tx.load_rows(0, res.ntuples, cur._make_row)
def _scroll_gen(
self, cur: BaseCursor[ConnectionType], value: int, mode: str
class Transformer(proto.AdaptContext):
def __init__(self, context: Optional[proto.AdaptContext] = None): ...
- make_row: proto.RowMaker
@property
def connection(self) -> Optional[BaseConnection]: ...
@property
self, params: Sequence[Any], formats: Sequence[Format]
) -> Tuple[List[Any], Tuple[int, ...], Sequence[pq.Format]]: ...
def get_dumper(self, obj: Any, format: Format) -> Dumper: ...
- def load_rows(self, row0: int, row1: int) -> List[proto.Row]: ...
- def load_row(self, row: int) -> Optional[proto.Row]: ...
+ def load_rows(
+ self, row0: int, row1: int, make_row: proto.RowMaker
+ ) -> List[proto.Row]: ...
+ def load_row(
+ self, row: int, make_row: proto.RowMaker
+ ) -> Optional[proto.Row]: ...
def load_sequence(
self, record: Sequence[Optional[bytes]]
) -> Tuple[Any, ...]: ...
cdef int _nfields, _ntuples
cdef list _row_dumpers
cdef list _row_loaders
- cdef public object make_row
def __cinit__(self, context: Optional["AdaptContext"] = None):
if context is not None:
return ps, ts, fs
- def load_rows(self, int row0, int row1) -> List[Row]:
+ def load_rows(self, int row0, int row1, object make_row) -> List[Row]:
if self._pgresult is None:
raise e.InterfaceError("result not set")
Py_INCREF(pyval)
PyTuple_SET_ITEM(<object>brecord, col, pyval)
- cdef object make_row = self.make_row
if make_row is not tuple:
for i in range(row1 - row0):
brecord = PyList_GET_ITEM(records, i)
Py_DECREF(<object>brecord)
return records
- def load_row(self, int row) -> Optional[Row]:
+ def load_row(self, int row, object make_row) -> Optional[Row]:
if self._pgresult is None:
return None
Py_INCREF(pyval)
PyTuple_SET_ITEM(record, col, pyval)
- cdef object make_row = self.make_row
if make_row is not tuple:
record = PyObject_CallFunctionObjArgs(
make_row, <PyObject *>record, NULL)
row = cur.execute("select 1 as a").fetchone()
assert row == (1,)
assert type(row) is tuple
- assert cur._tx.make_row is tuple
+ assert cur._make_row is tuple
def test_dict_row(conn):