# WARNING: don't store the context, or you'll create a reference loop with the Cursor
if context:
self._adapters = context.adapters
- self._connection = context.connection
+ self._conn = context.connection
else:
from .adapt import global_adapters
self._adapters = global_adapters
- self._connection = None
+ self._conn = None
# mapping class, fmt -> Dumper instance
self._dumpers_cache: DefaultDict[Format, DumperCache] = defaultdict(
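# A minimal, hypothetical sketch (not part of this module) of the reference
# loop the warning above is about: if the transformer kept the whole context
# (the Cursor) instead of only its adapters/connection, cursor and transformer
# would point at each other and only the cycle collector could free them.
import gc
import weakref

class _Cursor:                       # stand-in for the real Cursor
    def __init__(self):
        self._tx = _Transformer(self)

class _Transformer:                  # stores the whole context: creates a loop
    def __init__(self, context):
        self._context = context

cur = _Cursor()
ref = weakref.ref(cur)
del cur
print(ref() is None)    # False: the loop keeps the cursor alive
gc.collect()
print(ref() is None)    # True: only the cycle collector reclaims the pair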
@property
def connection(self) -> Optional["BaseConnection"]:
- return self._connection
+ return self._conn
@property
def adapters(self) -> "AdaptersMap":
if sys.version_info >= (3, 7):
__slots__ = """
_conn format _adapters arraysize _closed _results _pgresult _pos
- _iresult _rowcount _pgq _transformer _last_query
+ _iresult _rowcount _pgq _tx _last_query
__weakref__
""".split()
ExecStatus = pq.ExecStatus
- _transformer: "Transformer"
+ _tx: "Transformer"
def __init__(
self,
@pgresult.setter
def pgresult(self, result: Optional["PGresult"]) -> None:
self._pgresult = result
- if result and self._transformer:
- self._transformer.pgresult = result
+ if result and self._tx:
+ self._tx.pgresult = result
@property
def description(self) -> Optional[List[Column]]:
self._reset()
if not self._last_query or (self._last_query is not query):
self._last_query = None
- self._transformer = adapt.Transformer(self)
+ self._tx = adapt.Transformer(self)
yield from self._conn._start_query()
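# A tiny standalone note on the `self._last_query is not query` check above:
# it compares object identity, so the cached conversion is reused only when the
# caller passes the very same query object again, not merely an equal string.
a = "select " + str(1)
b = "select " + str(1)
print(a == b, a is b)   # True False: equal strings, distinct objects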
def _start_copy_gen(self, statement: Query) -> PQGen[None]:
def _convert_query(
self, query: Query, params: Optional[Params] = None
) -> PostgresQuery:
- pgq = PostgresQuery(self._transformer)
+ pgq = PostgresQuery(self._tx)
pgq.convert(query, params)
return pgq
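# A rough, hypothetical sketch of the kind of work _convert_query delegates to
# PostgresQuery: rewrite client-side "%s" placeholders as server-side "$1",
# "$2", ... and adapt each Python parameter for the wire.  The .encode() dump
# below is only a stand-in for the Transformer/Dumper machinery.
import re

def _convert(query: str, params: tuple) -> tuple:
    counter = iter(range(1, len(params) + 1))
    pg_query = re.sub(r"%s", lambda m: f"${next(counter)}", query)
    pg_params = [None if p is None else str(p).encode() for p in params]
    return pg_query, pg_params

print(_convert("select %s + %s", (10, 32)))
# ('select $1 + $2', [b'10', b'32'])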
Return `!None` if the recordset is finished.
"""
self._check_result()
- record = self._transformer.load_row(self._pos)
+ record = self._tx.load_row(self._pos)
if record is not None:
self._pos += 1
return record
if not size:
size = self.arraysize
- records = self._transformer.load_rows(
+ records = self._tx.load_rows(
self._pos, min(self._pos + size, self.pgresult.ntuples)
)
self._pos += len(records)
"""
self._check_result()
assert self.pgresult
- records = self._transformer.load_rows(self._pos, self.pgresult.ntuples)
+ records = self._tx.load_rows(self._pos, self.pgresult.ntuples)
self._pos += self.pgresult.ntuples
return records
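# A standalone sketch of the position-based fetch protocol implemented above,
# mimicked over a plain list so it runs without a server: load_row returns a
# row or None past the end, and _pos only advances past rows actually returned.
class _FakeCursor:
    def __init__(self, rows):
        self._rows = rows
        self._pos = 0

    def _load_row(self, pos):
        return self._rows[pos] if pos < len(self._rows) else None

    def fetchone(self):
        record = self._load_row(self._pos)
        if record is not None:
            self._pos += 1
        return record

    def fetchmany(self, size):
        records = self._rows[self._pos:self._pos + size]
        self._pos += len(records)
        return records

    def fetchall(self):
        records = self._rows[self._pos:]
        self._pos = len(self._rows)
        return records

cur = _FakeCursor([(1,), (2,), (3,)])
print(cur.fetchone())    # (1,)
print(cur.fetchmany(5))  # [(2,), (3,)] -- clamped to what is left
print(cur.fetchall())    # []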
def __iter__(self) -> Iterator[Sequence[Any]]:
self._check_result()
- load = self._transformer.load_row
+ load = self._tx.load_row
while 1:
row = load(self._pos)
async def fetchone(self) -> Optional[Sequence[Any]]:
self._check_result()
- rv = self._transformer.load_row(self._pos)
+ rv = self._tx.load_row(self._pos)
if rv is not None:
self._pos += 1
return rv
if not size:
size = self.arraysize
- records = self._transformer.load_rows(
+ records = self._tx.load_rows(
self._pos, min(self._pos + size, self.pgresult.ntuples)
)
self._pos += len(records)
async def fetchall(self) -> Sequence[Sequence[Any]]:
self._check_result()
assert self.pgresult
- records = self._transformer.load_rows(self._pos, self.pgresult.ntuples)
+ records = self._tx.load_rows(self._pos, self.pgresult.ntuples)
self._pos += self.pgresult.ntuples
return records
async def __aiter__(self) -> AsyncIterator[Sequence[Any]]:
self._check_result()
- load = self._transformer.load_row
+ load = self._tx.load_row
while 1:
row = load(self._pos)
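# The sync __iter__ and async __aiter__ above share the same shape: pull rows
# by position until load_row returns None.  A hypothetical in-memory mimic of
# the async side, runnable without a server:
import asyncio

class _FakeAsyncCursor:
    def __init__(self, rows):
        self._rows = rows
        self._pos = 0

    def _load_row(self, pos):
        return self._rows[pos] if pos < len(self._rows) else None

    async def __aiter__(self):
        while 1:
            row = self._load_row(self._pos)
            if row is None:
                break
            self._pos += 1
            yield row

async def main():
    async for row in _FakeAsyncCursor([(1,), (2,), (3,)]):
        print(row)

asyncio.run(main())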