self,
name: str = "",
binary: bool = False,
- row_factory: RowFactory = cursor.default_row_factory,
+ row_factory: Optional[RowFactory] = None,
) -> "Cursor":
"""
Return a new `Cursor` to send commands and queries to the connection.
raise NotImplementedError
format = Format.BINARY if binary else Format.TEXT
- return self.cursor_factory(self, row_factory, format=format)
+ return self.cursor_factory(
+ self, format=format, row_factory=row_factory
+ )
def execute(
self,
self,
name: str = "",
binary: bool = False,
- row_factory: RowFactory = cursor.default_row_factory,
+ row_factory: Optional[RowFactory] = None,
) -> "AsyncCursor":
"""
Return a new `AsyncCursor` to send commands and queries to the connection.
raise NotImplementedError
format = Format.BINARY if binary else Format.TEXT
- return self.cursor_factory(self, row_factory, format=format)
+ return self.cursor_factory(
+ self, format=format, row_factory=row_factory
+ )
async def execute(
self,
import sys
from types import TracebackType
from typing import Any, AsyncIterator, Callable, Generic, Iterator, List
-from typing import Optional, NoReturn, Sequence, Type, TYPE_CHECKING
+from typing import Optional, NoReturn, Sequence, Type, TYPE_CHECKING, cast
from contextlib import contextmanager
from . import pq
execute = generators.execute
-def default_row_factory(cursor: Any) -> RowMaker:
- return lambda values: values
-
-
class BaseCursor(Generic[ConnectionType]):
# Slots with __weakref__ and generic bases don't work on Py 3.6
# https://bugs.python.org/issue41451
def __init__(
self,
connection: ConnectionType,
- row_factory: RowFactory,
format: Format = Format.TEXT,
+ row_factory: Optional[RowFactory] = None,
):
self._conn = connection
self.format = format
return None
elif res.status == ExecStatus.SINGLE_TUPLE:
- self._make_row = self._row_factory(self)
+ if self._row_factory:
+ self._make_row = self._row_factory(self)
self.pgresult = res # will set it on the transformer too
# TODO: the transformer may do excessive work here: create a
# path that doesn't clear the loaders every time.
self._results = list(results)
self.pgresult = results[0]
- self._make_row = self._row_factory(self)
+ if self._row_factory:
+ self._make_row = self._row_factory(self)
nrows = self.pgresult.command_tuples
if nrows is not None:
if self._rowcount < 0:
while self._conn.wait(self._stream_fetchone_gen()):
rec = self._tx.load_row(0)
assert rec is not None
- assert self._make_row is not None
- yield self._make_row(rec)
+ yield self._make_row(rec) if self._make_row else cast(Row, rec)
def fetchone(self) -> Optional[Row]:
"""
record = self._tx.load_row(self._pos)
if record is not None:
self._pos += 1
- assert self._make_row is not None
- return self._make_row(record)
+ return (
+ self._make_row(record) if self._make_row else cast(Row, record)
+ )
return record
def fetchmany(self, size: int = 0) -> Sequence[Row]:
self._pos, min(self._pos + size, self.pgresult.ntuples)
)
self._pos += len(records)
- assert self._make_row is not None
- return [self._make_row(r) for r in records]
+ if self._make_row:
+ return list(map(self._make_row, records))
+ return cast(Sequence[Row], records)
def fetchall(self) -> Sequence[Row]:
"""
assert self.pgresult
records = self._tx.load_rows(self._pos, self.pgresult.ntuples)
self._pos += self.pgresult.ntuples
- assert self._make_row is not None
- return [self._make_row(r) for r in records]
+ if self._make_row:
+ return list(map(self._make_row, records))
+ return cast(Sequence[Row], records)
def __iter__(self) -> Iterator[Row]:
self._check_result()
if row is None:
break
self._pos += 1
- assert self._make_row is not None
- yield self._make_row(row)
+ yield self._make_row(row) if self._make_row else cast(Row, row)
@contextmanager
def copy(self, statement: Query) -> Iterator[Copy]:
while await self._conn.wait(self._stream_fetchone_gen()):
rec = self._tx.load_row(0)
assert rec is not None
- assert self._make_row is not None
- yield self._make_row(rec)
+ yield self._make_row(rec) if self._make_row else cast(Row, rec)
async def fetchone(self) -> Optional[Row]:
self._check_result()
rv = self._tx.load_row(self._pos)
if rv is not None:
self._pos += 1
- assert self._make_row is not None
- return self._make_row(rv)
+ return self._make_row(rv) if self._make_row else cast(Row, rv)
return rv
async def fetchmany(self, size: int = 0) -> List[Row]:
self._pos, min(self._pos + size, self.pgresult.ntuples)
)
self._pos += len(records)
- assert self._make_row is not None
- return [self._make_row(r) for r in records]
+ if self._make_row:
+ return list(map(self._make_row, records))
+ return cast(List[Row], records)
async def fetchall(self) -> List[Row]:
self._check_result()
assert self.pgresult
records = self._tx.load_rows(self._pos, self.pgresult.ntuples)
self._pos += self.pgresult.ntuples
- assert self._make_row is not None
- return [self._make_row(r) for r in records]
+ if self._make_row:
+ return list(map(self._make_row, records))
+ return cast(List[Row], records)
async def __aiter__(self) -> AsyncIterator[Row]:
self._check_result()
if row is None:
break
self._pos += 1
- assert self._make_row is not None
- yield self._make_row(row)
+ yield self._make_row(row) if self._make_row else cast(Row, row)
@asynccontextmanager
async def copy(self, statement: Query) -> AsyncIterator[AsyncCopy]: