From: Daniele Varrazzo Date: Sat, 7 May 2022 17:57:50 +0000 (+0200) Subject: docs: add examples and messages details in pipeline mode X-Git-Tag: 3.1~120^2~1 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=b34d11bf3b6230ea096959e2b2cc93ed03d750df;p=thirdparty%2Fpsycopg.git docs: add examples and messages details in pipeline mode --- diff --git a/docs/advanced/pipeline.rst b/docs/advanced/pipeline.rst index ef9b73e34..37c38a629 100644 --- a/docs/advanced/pipeline.rst +++ b/docs/advanced/pipeline.rst @@ -42,6 +42,123 @@ buffered on the server side; the server flushes that buffer when a .. __: https://www.postgresql.org/docs/current/protocol-flow.html#PROTOCOL-FLOW-EXT-QUERY +Client-server messages flow +--------------------------- + +During normal querying, each statement consists in a stream of request +messages sent by the client to the server, followed by a stream of response +messages in the opposite direction. + +The client sends commands to parse a query, bind it to parameters, execute it, +and terminates with a ``Sync`` to communicate the server that it should +process the commands sent so far. The server will process the query and send +back the results, terminating the stream of message with a ``ReadyForQuery`` +message, telling the client that it can now send a new query. The messages are +described in the `PostgreSQL message flow documentation`__. + +For example the statement: + +.. code:: python + + conn.execute("INSERT INTO mytable (data) VALUES (%s)", ["hello"]) + +results in a complete back-and-forth of the messages: + +.. table:: + :align: left + + +---------------+-----------------------------------------------------------+ + | Direction | Message | + +===============+===========================================================+ + | Python | - Parse ``INSERT INTO ... (VALUE $1)`` (skipped if | + | | :ref:`the statement is prepared `) | + | |>| | - Bind ``'hello'`` | + | | - Describe | + | PostgreSQL | - Execute | + | | - Sync | + +---------------+-----------------------------------------------------------+ + | PostgreSQL | - ParseComplete | + | | - BindComplete | + | |<| | - NoData | + | | - CommandComplete ``INSERT 0 1`` | + | Python | - ReadyForQuery | + +---------------+-----------------------------------------------------------+ + +and the query: + +.. code:: python + + conn.execute("SELECT data FROM mytable WHERE id = %s", [1]) + +results in the exchange: + +.. table:: + :align: left + + +---------------+-----------------------------------------------------------+ + | Direction | Message | + +===============+===========================================================+ + | Python | - Parse ``SELECT data FROM mytable WHERE id = $1`` | + | | - Bind ``1`` | + | |>| | - Describe | + | | - Execute | + | PostgreSQL | - Sync | + +---------------+-----------------------------------------------------------+ + | PostgreSQL | - ParseComplete | + | | - BindComplete | + | |<| | - RowDescription ``data`` | + | | - DataRow ``hello`` | + | Python | - CommandComplete ``SELECT 1`` | + | | - ReadyForQuery | + +---------------+-----------------------------------------------------------+ + +The pipeline mode allows to combine several operation in a longer stream of +messages sent to the server, then to receive more than one response in a +single batch. If we combine the two operations above in a pipeline. +If we execute the two operations above in a pipeline: + +.. code:: python + + with conn.pipeline(): + conn.execute("INSERT INTO mytable (data) VALUES (%s)", ["hello"]) + conn.execute("SELECT data FROM mytable WHERE id = %s", [1]) + +they will result in a single roundtrip between the client and the server: + +.. table:: + :align: left + + +---------------+-----------------------------------------------------------+ + | Direction | Message | + +===============+===========================================================+ + | Python | - Parse ``INSERT INTO ... (VALUE $1)`` | + | | - Bind ``'hello'`` | + | |>| | - Describe | + | | - Execute | + | PostgreSQL | - Parse ``SELECT data FROM mytable WHERE id = $1`` | + | | - Bind ``1`` | + | | - Describe | + | | - Execute | + | | - Sync | + +---------------+-----------------------------------------------------------+ + | PostgreSQL | - ParseComplete | + | | - BindComplete | + | |<| | - NoData | + | | - CommandComplete ``INSERT 0 1`` | + | Python | - ParseComplete | + | | - BindComplete | + | | - RowDescription ``data`` | + | | - DataRow ``hello`` | + | | - CommandComplete ``SELECT 1`` | + | | - ReadyForQuery | + +---------------+-----------------------------------------------------------+ + +.. |<| unicode:: U+25C0 +.. |>| unicode:: U+25B6 + +.. __: https://www.postgresql.org/docs/current/protocol-flow.html + + .. _pipeline-usage: Pipeline mode usage @@ -57,10 +174,34 @@ normal mode, Psycopg will not wait for the server to receive the result of each query, which will be received in batches when the server flushes it output buffer. +.. code:: python + + >>> with conn.pipeline(): + ... cnn.execute("INSERT INTO mytable VALUES (%s)", ["hello"]) + ... with cnn.cursor() as cur: + ... cur.execute("INSERT INTO othertable VALUES (%s)", ["world"]) + ... cur.executemany( + ... "INSERT INTO elsewhere VALUES (%s)", + ... [("one",), ("two",), ("four",)]) + When a flush (or a sync) is performed, all pending results are sent back to the cursors which executed them. If a cursor had run more than one query, it will receive more than one result; results after the first will be available, -in their execution order, using `~Cursor.nextset()`. +in their execution order, using `~Cursor.nextset()`: + +.. code:: python + + >>> with conn.pipeline(): + ... with conn.cursor() as cur: + ... cur.execute("INSERT INTO mytable (data) VALUES (%s) RETURNING *", ["hello"]) + ... cur.execute("INSERT INTO mytable (data) VALUES (%s) RETURNING *", ["world"]) + ... while True: + ... print(cur.fetchall()) + ... if not cur.nextset(): + ... break + + [(1, 'hello')] + [(2, 'world')] If any statement encounters an error, the server aborts the current transaction and does not execute any subsequent command in the queue until the @@ -68,12 +209,6 @@ next :ref:`synchronization point `; a `~errors.PipelineAborted` exception is raised for each such command. Query processing resumes after the synchronization point. -Note that, even in :ref:`autocommit `, the server wraps the -statements sent in pipeline mode in an implicit transaction, which will be -only committed when the sync is received. As such, a failure in a group of -statement will probably invalidate the effect of statements executed after the -previous sync. - .. warning:: Certain features are not available in pipeline mode, including: @@ -110,6 +245,43 @@ output buffer is full. In contrast with a synchronization point, a flush request (i.e. calling a fetch method) will not reset the pipeline error state. +Note that, even in :ref:`autocommit `, the server wraps the +statements sent in pipeline mode in an implicit transaction, which will be +only committed when the sync is received. As such, a failure in a group of +statements will probably invalidate the effect of statements executed after +the previous Sync, and will propagate to the following Sync. + +For example, the following block: + +.. code:: python + + >>> with psycopg.connect("", autocommit=True) as conn: + ... with conn.pipeline() as p, conn.cursor() as cur: + ... cur.execute("INSERT INTO mytable (data) VALUES (%s)", ["one"]) + ... cur.execute("INSERT INTO no_such_table (data) VALUES (%s)", ["two"]) + ... conn.execute("INSERT INTO mytable (data) VALUES (%s)", ["three"]) + ... p.sync() + ... cur.execute("INSERT INTO mytable (data) VALUES (%s)", ["four"]) + +fails with the error ``relation "no_such_table" does not exist``; and, at the +end of the block, the table will contain: + +.. code:: text + + =# SELECT * FROM mytable; + ┌────┬──────┐ + │ id │ data │ + ├────┼──────┤ + │ 2 │ four │ + └────┴──────┘ + (1 row) + +because the value 1 of the sequence is consumed by the statement ``one``, but +the record discarded because of the error in the same implicit transaction; +the statement ``three`` is not executed (so it doesn't consume a sequence +item), the statement ``four`` is executed with success after the Sync has +terminated the failed transaction. + The fine prints ---------------