_transformer: "Transformer"
- def __init__(self, conn: "BaseConnection", binary: bool = False):
- self.conn = conn
+ def __init__(self, connection: "BaseConnection", binary: bool = False):
+ self.connection = connection
self.binary = binary
self.dumpers: DumpersMap = {}
self.loaders: LoadersMap = {}
res = self.pgresult
if res is None or res.status != self.ExecStatus.TUPLES_OK:
return None
- return [Column(res, i, self.conn.codec) for i in range(res.nfields)]
+ return [
+ Column(res, i, self.connection.codec) for i in range(res.nfields)
+ ]
@property
def rowcount(self) -> int:
if self.closed:
raise e.OperationalError("the cursor is closed")
- if self.conn.closed:
+ if self.connection.closed:
raise e.OperationalError("the connection is closed")
- if self.conn.status != self.conn.ConnStatus.OK:
+ if self.connection.status != self.connection.ConnStatus.OK:
raise e.InterfaceError(
f"cannot execute operations: the connection is"
- f" in status {self.conn.status}"
+ f" in status {self.connection.status}"
)
self._reset()
self._transformer = Transformer(self)
- codec = self.conn.codec
+ codec = self.connection.codec
if isinstance(query, str):
query = codec.encode(query)[0]
vars = reorder_params(vars, order)
assert isinstance(vars, Sequence)
params, types = self._transformer.dump_sequence(vars, formats)
- self.conn.pgconn.send_query_params(
+ self.connection.pgconn.send_query_params(
query,
params,
param_formats=formats,
# if we don't have to, let's use exec_ as it can run more than
# one query in one go
if self.binary:
- self.conn.pgconn.send_query_params(
+ self.connection.pgconn.send_query_params(
query, (), result_format=pq.Format(self.binary)
)
else:
- self.conn.pgconn.send_query(query)
+ self.connection.pgconn.send_query(query)
- return generators.execute(self.conn.pgconn)
+ return generators.execute(self.connection.pgconn)
def _execute_results(self, results: List[pq.PGresult]) -> None:
"""
class Cursor(BaseCursor):
- conn: "Connection"
+ connection: "Connection"
- def __init__(self, conn: "Connection", binary: bool = False):
- super().__init__(conn, binary)
+ def __init__(self, connection: "Connection", binary: bool = False):
+ super().__init__(connection, binary)
def execute(self, query: Query, vars: Optional[Params] = None) -> "Cursor":
- with self.conn.lock:
+ with self.connection.lock:
gen = self._execute_send(query, vars)
- results = self.conn.wait(gen)
+ results = self.connection.wait(gen)
self._execute_results(results)
return self
def executemany(
self, query: Query, vars_seq: Sequence[Params]
) -> "Cursor":
- with self.conn.lock:
- # TODO: trivial implementation; use prepare
+ with self.connection.lock:
for vars in vars_seq:
gen = self._execute_send(query, vars)
- results = self.conn.wait(gen)
+ results = self.connection.wait(gen)
self._execute_results(results)
return self
class AsyncCursor(BaseCursor):
- conn: "AsyncConnection"
+ connection: "AsyncConnection"
- def __init__(self, conn: "AsyncConnection", binary: bool = False):
- super().__init__(conn, binary)
+ def __init__(self, connection: "AsyncConnection", binary: bool = False):
+ super().__init__(connection, binary)
async def execute(
self, query: Query, vars: Optional[Params] = None
) -> "AsyncCursor":
- async with self.conn.lock:
+ async with self.connection.lock:
gen = self._execute_send(query, vars)
- results = await self.conn.wait(gen)
+ results = await self.connection.wait(gen)
self._execute_results(results)
return self