--- /dev/null
+.. _adaptation:
+
+Adaptation of data between Python and PostgreSQL
+================================================
+
+TODO
.. autoclass:: Copy
+ The object is normally returned by `Cursor.copy()`. It can be used as a
+ context manager (useful to load data into a database using :sql:`COPY FROM`)
+ and can be iterated (useful to read data after a :sql:`COPY TO`).
+
+ See :ref:`copy` for details.
+
.. automethod:: read
+
+ Alternatively, you can iterate on the `Copy` object to read its data
+ row by row.
+
.. automethod:: write
.. automethod:: write_row
+
+ The data in the tuple will be converted as configured on the cursor;
+ see :ref:`adaptation` for details.
+
.. automethod:: finish
+ If an *error* is specified, the :sql:`COPY` operation is cancelled.
+
+ The method is called automatically at the end of a `!with` block.
+
.. autoclass:: AsyncCopy
+ The object is normally returned by `AsyncCursor.copy()`. Its methods are
+ the same of the `Copy` object but offering an `asyncio` interface
+ (`await`, `async for`, `async with`).
+
.. automethod:: read
.. automethod:: write
.. automethod:: write_row
you should use distinct `execute()` calls; otherwise you may consider merging
the query client-side, using `psycopg3.sql` module.
+Certain commands cannot be used with server-side binding, for instance
+:sql:`SET` or :sql:`NOTIFY`::
+
+ >>> cur.execute("SET timezone TO %s", ["utc"])
+ ...
+ psycopg3.errors.SyntaxError: syntax error at or near "$1"
+
+Sometimes PostgreSQL offers an alternative (e.g. :sql:`SELECT set_config()`,
+:sql:`SELECT pg_notify()`). If no alternative exist you can use `psycopg3.sql`
+to compose the query client-side.
+
Different adaptation system
---------------------------
flexibility, ease of customization.
Builtin data types should work as expected; if you have wrapped a custom data
-type you should check the `<ref> Adaptation` topic.
+type you should check the :ref:`Adaptation` topic.
Other differences
install
usage
- from_pg2
+ adaptation
connection
cursor
+ from_pg2
Indices and tables
- send commands to the database using methods such as `~Cursor.execute()`
and `~Cursor.executemany()`,
- - retrieve data from the database :ref:`by iteration <cursor-iterable>` or
- using methods such as `~Cursor.fetchone()`, `~Cursor.fetchmany()`,
- `~Cursor.fetchall()`.
+ - retrieve data from the database, iterating on the cursor or using methods
+ such as `~Cursor.fetchone()`, `~Cursor.fetchmany()`, `~Cursor.fetchall()`.
+.. index::
+ pair: Query; Parameters
+
+.. _binary-data:
+
+Binary parameters and results
+-----------------------------
+
+TODO: lift from psycopg2 docs
+
+
+
.. _transactions:
Transaction management
Using COPY TO and COPY FROM
---------------------------
-TODO
+`psycopg3` allows to operate with `PostgreSQL COPY protocol`__. :sql:`COPY` is
+one of the most efficient ways to load data into the database (and to modify
+it, with some SQL creativity).
+
+.. __: https://www.postgresql.org/docs/current/sql-copy.html
+
+Using `!psycopg3` you can do three things:
+
+- loading data into the database row-by-row, from a stream of Python objects;
+- loading data into the database block-by-block, with data already formatted in
+ a way suitable for :sql:`COPY FROM`;
+- reading data from the database block-by-block, with data emitted by a
+ :sql:`COPY TO` statement.
+
+The missing quadrant, copying data from database row-by-row, is not covered by
+COPY because that's pretty much normal querying, and :sql:`COPY TO` doesn't
+offer enough metadata to decode the data to Python objects.
+
+The first option is the most powerful, because it allows to load data into the
+database from any Python iterable (a list of tuple, or any iterable of
+sequences): the Python values are adapted as they would be in normal querying.
+To perform such operation use a :sql:`COPY [table] FROM STDIN` with
+`Cursor.copy()` and use `~Copy.write_row()` on the resulting object in a
+`!with` block. On exiting the block the operation will be concluded:
+
+.. code:: python
+
+ with cursor.copy("COPY table_name (col1, col2) FROM STDIN") as copy:
+ for row in source:
+ copy.write_row(row)
+
+If an exception is raised inside the block, the operation is interrupted and
+the records inserted so far discarded.
+
+If data is already formatted in a way suitable for copy (for instance because
+it is coming from a file resulting from a previous `COPY TO` operation) it can
+be loaded using `Copy.write()` instead.
+
+In order to read data in :sql:`COPY` format you can use a :sql:`COPY TO
+STDOUT` statement and iterate over the resulting `Copy` object, which will
+produce `!bytes`:
+
+.. code:: python
+
+ with open("data.out", "wb") as f:
+ for data in cursor.copy("COPY table_name TO STDOUT") as copy:
+ f.write(data)
+
+Asynchronous operations are supported using the same patterns on an
+`AsyncConnection`.
+
+Binary data can be produced and consumed using :sql:`FORMAT BINARY` in the
+:sql:`COPY` command: see :ref:`binary-data` for details and limitations.
.. index:: async
ones: in order to use them you will only have to scatter the ``async`` keyword
here and there.
-
.. code:: python
async with await psycopg3.AsyncConnection.connect(
class Copy(BaseCopy["Connection"]):
+ """Manage a :sql:`COPY` operation."""
+
def read(self) -> Optional[bytes]:
+ """Read a row after a :sql:`COPY TO` operation.
+
+ Return `None` when the data is finished.
+ """
if self._finished:
return None
return rv
def write(self, buffer: Union[str, bytes]) -> None:
+ """Write a block of data after a :sql:`COPY FROM` operation."""
conn = self.connection
conn.wait(copy_to(conn.pgconn, self._ensure_bytes(buffer)))
def write_row(self, row: Sequence[Any]) -> None:
+ """Write a record after a :sql:`COPY FROM` operation."""
data = self.format_row(row)
self.write(data)
def finish(self, error: str = "") -> None:
+ """Terminate a :sql:`COPY FROM` operation."""
conn = self.connection
berr = error.encode(conn.client_encoding, "replace") if error else None
conn.wait(copy_end(conn.pgconn, berr))
if self.pgresult.status == ExecStatus.COPY_OUT:
return
- if exc_val is None:
+ if not exc_type:
if self.format == Format.BINARY and not self._first_row:
# send EOF only if we copied binary rows (_first_row is False)
self.write(b"\xff\xff")
self.finish()
else:
- self.finish(str(exc_val) or type(exc_val).__qualname__)
+ self.finish(
+ f"error from Python: {exc_type.__qualname__} - {exc_val}"
+ )
def __iter__(self) -> Iterator[bytes]:
while 1:
class AsyncCopy(BaseCopy["AsyncConnection"]):
+ """Manage an asynchronous :sql:`COPY` operation."""
+
async def read(self) -> Optional[bytes]:
if self._finished:
return None
if self.pgresult.status == ExecStatus.COPY_OUT:
return
- if exc_val is None:
+ if not exc_type:
if self.format == Format.BINARY and not self._first_row:
# send EOF only if we copied binary rows (_first_row is False)
await self.write(b"\xff\xff")
await self.finish()
else:
- await self.finish(str(exc_val))
+ await self.finish(
+ f"error from Python: {exc_type.__qualname__} - {exc_val}"
+ )
async def __aiter__(self) -> AsyncIterator[bytes]:
while 1: