connection: ConnectionType,
format: pq.Format = pq.Format.TEXT,
):
- self.connection = connection
+ self._conn = connection
self.format = format
self.dumpers: DumpersMap = {}
self.loaders: LoadersMap = {}
self._iresult = 0
self._rowcount = -1
+ @property
+ def connection(self) -> ConnectionType:
+ """The connection this cursor is using."""
+ return self._conn
+
@property
def closed(self) -> bool:
"""`True` if the cursor is closed."""
res = self.pgresult
if not res or res.status != self.ExecStatus.TUPLES_OK:
return None
- encoding = self.connection.client_encoding
+ encoding = self._conn.client_encoding
return [Column(res, i, encoding) for i in range(res.nfields)]
@property
if self.closed:
raise e.InterfaceError("the cursor is closed")
- if self.connection.closed:
+ if self._conn.closed:
raise e.InterfaceError("the connection is closed")
- if self.connection.pgconn.status != self.connection.ConnStatus.OK:
+ if self._conn.pgconn.status != pq.ConnStatus.OK:
raise e.InterfaceError(
f"cannot execute operations: the connection is"
- f" in status {self.connection.pgconn.status}"
+ f" in status {self._conn.pgconn.status}"
)
self._reset()
pgq.convert(query, vars)
if pgq.params or no_pqexec or self.format == pq.Format.BINARY:
- self.connection.pgconn.send_query_params(
+ self._conn.pgconn.send_query_params(
pgq.query,
pgq.params,
param_formats=pgq.formats,
else:
# if we don't have to, let's use exec_ as it can run more than
# one query in one go
- self.connection.pgconn.send_query(pgq.query)
+ self._conn.pgconn.send_query(pgq.query)
def _execute_results(self, results: Sequence["PGresult"]) -> None:
"""
if results[-1].status == S.FATAL_ERROR:
raise e.error_from_result(
- results[-1], encoding=self.connection.client_encoding
+ results[-1], encoding=self._conn.client_encoding
)
elif badstats & {S.COPY_IN, S.COPY_OUT, S.COPY_BOTH}:
pgq = PostgresQuery(self._transformer)
pgq.convert(query, vars)
- self.connection.pgconn.send_prepare(
- name, pgq.query, param_types=pgq.types
- )
+ self._conn.pgconn.send_prepare(name, pgq.query, param_types=pgq.types)
return pgq
def _send_query_prepared(self, name: bytes, pgq: PostgresQuery) -> None:
- self.connection.pgconn.send_query_prepared(
+ self._conn.pgconn.send_query_prepared(
name,
pgq.params,
param_formats=pgq.formats,
return
elif status == pq.ExecStatus.FATAL_ERROR:
raise e.error_from_result(
- result, encoding=self.connection.client_encoding
+ result, encoding=self._conn.client_encoding
)
else:
raise e.ProgrammingError(
"""
Execute a query or command to the database.
"""
- with self.connection.lock:
+ with self._conn.lock:
self._start_query()
- self.connection._start_query()
+ self._conn._start_query()
self._execute_send(query, vars)
- gen = execute(self.connection.pgconn)
- results = self.connection.wait(gen)
+ gen = execute(self._conn.pgconn)
+ results = self._conn.wait(gen)
self._execute_results(results)
return self
"""
Execute the same command with a sequence of input data.
"""
- with self.connection.lock:
+ with self._conn.lock:
self._start_query()
- self.connection._start_query()
+ self._conn._start_query()
first = True
for vars in vars_seq:
if first:
pgq = self._send_prepare(b"", query, vars)
- gen = execute(self.connection.pgconn)
- (result,) = self.connection.wait(gen)
+ gen = execute(self._conn.pgconn)
+ (result,) = self._conn.wait(gen)
if result.status == self.ExecStatus.FATAL_ERROR:
raise e.error_from_result(
- result, encoding=self.connection.client_encoding
+ result, encoding=self._conn.client_encoding
)
else:
pgq.dump(vars)
self._send_query_prepared(b"", pgq)
- gen = execute(self.connection.pgconn)
- (result,) = self.connection.wait(gen)
+ gen = execute(self._conn.pgconn)
+ (result,) = self._conn.wait(gen)
self._execute_results((result,))
return self
def _start_copy(
self, statement: Query, vars: Optional[Params] = None
) -> Copy:
- with self.connection.lock:
+ with self._conn.lock:
self._start_query()
- self.connection._start_query()
+ self._conn._start_query()
# Make sure to avoid PQexec to avoid receiving a mix of COPY and
# other operations.
self._execute_send(statement, vars, no_pqexec=True)
- gen = execute(self.connection.pgconn)
- results = self.connection.wait(gen)
+ gen = execute(self._conn.pgconn)
+ results = self._conn.wait(gen)
self._check_copy_results(results)
self.pgresult = results[0] # will set it on the transformer too
async def execute(
self, query: Query, vars: Optional[Params] = None
) -> "AsyncCursor":
- async with self.connection.lock:
+ async with self._conn.lock:
self._start_query()
- await self.connection._start_query()
+ await self._conn._start_query()
self._execute_send(query, vars)
- gen = execute(self.connection.pgconn)
- results = await self.connection.wait(gen)
+ gen = execute(self._conn.pgconn)
+ results = await self._conn.wait(gen)
self._execute_results(results)
return self
async def executemany(
self, query: Query, vars_seq: Sequence[Params]
) -> "AsyncCursor":
- async with self.connection.lock:
+ async with self._conn.lock:
self._start_query()
- await self.connection._start_query()
+ await self._conn._start_query()
first = True
for vars in vars_seq:
if first:
pgq = self._send_prepare(b"", query, vars)
- gen = execute(self.connection.pgconn)
- (result,) = await self.connection.wait(gen)
+ gen = execute(self._conn.pgconn)
+ (result,) = await self._conn.wait(gen)
if result.status == self.ExecStatus.FATAL_ERROR:
raise e.error_from_result(
- result, encoding=self.connection.client_encoding
+ result, encoding=self._conn.client_encoding
)
else:
pgq.dump(vars)
self._send_query_prepared(b"", pgq)
- gen = execute(self.connection.pgconn)
- (result,) = await self.connection.wait(gen)
+ gen = execute(self._conn.pgconn)
+ (result,) = await self._conn.wait(gen)
self._execute_results((result,))
return self
async def _start_copy(
self, statement: Query, vars: Optional[Params] = None
) -> AsyncCopy:
- async with self.connection.lock:
+ async with self._conn.lock:
self._start_query()
- await self.connection._start_query()
+ await self._conn._start_query()
# Make sure to avoid PQexec to avoid receiving a mix of COPY and
# other operations.
self._execute_send(statement, vars, no_pqexec=True)
- gen = execute(self.connection.pgconn)
- results = await self.connection.wait(gen)
+ gen = execute(self._conn.pgconn)
+ results = await self._conn.wait(gen)
self._check_copy_results(results)
self.pgresult = results[0] # will set it on the transformer too