See :ref:`query-parameters` for all the details about executing
queries.
- .. automethod:: copy(statement: Query, vars: Optional[Args]=None) -> Copy
+ .. automethod:: copy(statement: Query) -> Copy
:param statement: The copy operation to execute
:type statement: `!str`, `!bytes`, or `sql.Composable`
- :param args: The parameters to pass to the query, if any
- :type args: Mapping, Sequence
.. note:: it must be called as ``with cur.copy() as copy: ...``
.. automethod:: execute(query: Query, vars: Optional[Args]=None) -> AsyncCursor
.. automethod:: executemany(query: Query, vars_seq: Sequence[Args]) -> AsyncCursor
- .. automethod:: copy(statement: Query, vars: Optional[Args]=None) -> AsyncCopy
+ .. automethod:: copy(statement: Query) -> AsyncCopy
.. note:: it must be called as ``async with cur.copy() as copy: ...``
yield row
@contextmanager
- def copy(
- self, statement: Query, vars: Optional[Params] = None
- ) -> Iterator[Copy]:
+ def copy(self, statement: Query) -> Iterator[Copy]:
"""
Initiate a :sql:`COPY` operation and return an object to manage it.
"""
- with self._start_copy(statement, vars) as copy:
+ with self._start_copy(statement) as copy:
yield copy
- def _start_copy(
- self, statement: Query, vars: Optional[Params] = None
- ) -> Copy:
+ def _start_copy(self, statement: Query) -> Copy:
with self._conn.lock:
self._start_query()
self._conn._start_query()
# Make sure to avoid PQexec to avoid receiving a mix of COPY and
# other operations.
- self._execute_send(statement, vars, no_pqexec=True)
+ self._execute_send(statement, None, no_pqexec=True)
gen = execute(self._conn.pgconn)
results = self._conn.wait(gen)
self._check_copy_results(results)
yield row
@asynccontextmanager
- async def copy(
- self, statement: Query, vars: Optional[Params] = None
- ) -> AsyncIterator[AsyncCopy]:
- copy = await self._start_copy(statement, vars)
+ async def copy(self, statement: Query) -> AsyncIterator[AsyncCopy]:
+ copy = await self._start_copy(statement)
async with copy:
yield copy
- async def _start_copy(
- self, statement: Query, vars: Optional[Params] = None
- ) -> AsyncCopy:
+ async def _start_copy(self, statement: Query) -> AsyncCopy:
async with self._conn.lock:
self._start_query()
await self._conn._start_query()
# Make sure to avoid PQexec to avoid receiving a mix of COPY and
# other operations.
- self._execute_send(statement, vars, no_pqexec=True)
+ self._execute_send(statement, None, no_pqexec=True)
gen = execute(self._conn.pgconn)
results = await self._conn.wait(gen)
self._check_copy_results(results)