]> git.ipfire.org Git - thirdparty/postgresql.git/commitdiff
psql: Fix issues with deferred errors in pipelines
authorMichael Paquier <michael@paquier.xyz>
Tue, 2 Jun 2026 23:58:29 +0000 (08:58 +0900)
committerMichael Paquier <michael@paquier.xyz>
Tue, 2 Jun 2026 23:58:29 +0000 (08:58 +0900)
When an error is raised while processing a Sync message in a pipeline,
like a deferred constraint violation, the error was not associated with
the piped command and was not counted in available_results.  This caused
assertion failures in discardAbortedPipelineResults(), keeping an
incorrect state at pipeline exit, because the code assumed that the
number of available and requested results would always be positive,
expecting all the counters to be 0 at the end of a pipeline.

This commit switches discardAbortedPipelineResults() and
ExecQueryAndProcessResults() to take a softer approach when consuming
and draining the results after an error.  If there are still piped syncs
in the pipeline when it ends, we now attempt to consume them before
leaving the pipeline mode.

Alexander has been able to reach two assertion failures through his
testing.  While investigating more this issue, I have bumped into two
more.  Most of these cases are covered by the regression tests added in
this commit, plus some cases with mixes of pipelines, deferred errors
and results fetched.  Some of the tests discussed (like the backend
termination one) could not be included in this commit but have been
tested manually.  Another test scenario discussed involved the injection
of an error state in the backend, that was able to trick libpq
internally and put its queue out of sync.  This scenario is not going to
happen in practice, but if we were to do something about it we would
need to make libpq understand that it needs to fail in some cases but
not block.

Reported-by: Alexander Lakhin <exclusion@gmail.com>
Author: Michael Paquier <michael@paquier.xyz>
Discussion: https://postgr.es/m/19494-97a86d84fee71c47@postgresql.org
Backpatch-through: 18

src/bin/psql/common.c
src/test/regress/expected/psql_pipeline.out
src/test/regress/sql/psql_pipeline.sql

index cd329ade12b5d7467dd1f23c68414593bca12628..7e005db5750ce385398e239df08976696e192753 100644 (file)
@@ -1497,11 +1497,24 @@ discardAbortedPipelineResults(void)
                }
                else if (res == NULL)
                {
-                       /* A query was processed, decrement the counters */
-                       Assert(pset.available_results > 0);
-                       Assert(pset.requested_results > 0);
-                       pset.available_results--;
-                       pset.requested_results--;
+                       /*
+                        * A query was processed, decrement the counters.
+                        *
+                        * It is possible to get here with available_results == 0 when an
+                        * error is generated by the Sync message processing itself. Such
+                        * errors are not counted in available_results because they are
+                        * not associated with a piped command.  In that case, skip the
+                        * counter decrements and continue to find the Sync result.
+                        *
+                        * If the connection has been lost, there will never be any more
+                        * results to read, so bail out.
+                        */
+                       if (!ConnectionUp())
+                               return NULL;
+                       if (pset.available_results > 0)
+                               pset.available_results--;
+                       if (pset.requested_results > 0)
+                               pset.requested_results--;
                }
 
                if (pset.requested_results == 0)
@@ -2042,14 +2055,16 @@ ExecQueryAndProcessResults(const char *query,
 
                if (result_status == PGRES_PIPELINE_SYNC)
                {
-                       Assert(pset.piped_syncs > 0);
-
                        /*
                         * Sync response, decrease the sync and requested_results
-                        * counters.
+                        * counters.  Guard against underflow: an error during Sync
+                        * processing on the server can cause the client-side counter to
+                        * drift.
                         */
-                       pset.piped_syncs--;
-                       pset.requested_results--;
+                       if (pset.piped_syncs > 0)
+                               pset.piped_syncs--;
+                       if (pset.requested_results > 0)
+                               pset.requested_results--;
 
                        /*
                         * After a synchronisation point, reset success state to print
@@ -2071,8 +2086,10 @@ ExecQueryAndProcessResults(const char *query,
                         * In a pipeline with a non-sync response?  Decrease the result
                         * counters.
                         */
-                       pset.available_results--;
-                       pset.requested_results--;
+                       if (pset.available_results > 0)
+                               pset.available_results--;
+                       if (pset.requested_results > 0)
+                               pset.requested_results--;
                }
 
                /*
@@ -2173,14 +2190,37 @@ ExecQueryAndProcessResults(const char *query,
 
        if (end_pipeline)
        {
-               /* after a pipeline is processed, pipeline piped_syncs should be 0 */
-               Assert(pset.piped_syncs == 0);
-               /* all commands have been processed */
-               Assert(pset.piped_commands == 0);
-               /* all results were read */
-               Assert(pset.available_results == 0);
+               /*
+                * Reset available/requested results.  Normally these are already 0,
+                * but an error generated by a Sync processing itself can leave some
+                * of them behind.  Consume them before exiting pipeline mode.
+                */
+               while (pset.piped_syncs > 0)
+               {
+                       PGresult   *remaining;
+
+                       remaining = PQgetResult(pset.db);
+
+                       if (remaining == NULL)
+                       {
+                               if (!ConnectionUp())
+                                       break;
+                               continue;
+                       }
+                       if (PQresultStatus(remaining) == PGRES_PIPELINE_SYNC)
+                               pset.piped_syncs--;
+                       PQclear(remaining);
+               }
+               pset.piped_syncs = 0;
+               pset.piped_commands = 0;
+               pset.available_results = 0;
+               pset.requested_results = 0;
+
+               if (PQpipelineStatus(pset.db) != PQ_PIPELINE_OFF)
+                       PQexitPipelineMode(pset.db);
        }
        Assert(pset.requested_results == 0);
+
        SetPipelineVariables();
 
        /* may need this to recover from conn loss during COPY */
index a0816fb10b68e802de1e1a007d0656625bed73dc..a931d63cafe76aff72c8fa44d5d1d9af4ebc967c 100644 (file)
@@ -764,5 +764,129 @@ VACUUM psql_pipeline \bind \sendpipeline
         1
 (1 row)
 
+-- Deferred constraint violation at commit time in a pipeline.
+CREATE TABLE psql_pipeline_defer (a INTEGER PRIMARY KEY DEFERRABLE INITIALLY DEFERRED);
+\startpipeline
+INSERT INTO psql_pipeline_defer VALUES ($1), ($1) RETURNING * \bind 1 \sendpipeline
+\endpipeline
+ a 
+---
+ 1
+ 1
+(2 rows)
+
+ERROR:  duplicate key value violates unique constraint "psql_pipeline_defer_pkey"
+DETAIL:  Key (a)=(1) already exists.
+-- Same with \syncpipeline and commands after the failing sync.
+\startpipeline
+INSERT INTO psql_pipeline_defer VALUES ($1), ($1) \bind 1 \sendpipeline
+\syncpipeline
+SELECT $1 \bind 'after_sync_1' \sendpipeline
+\endpipeline
+ERROR:  duplicate key value violates unique constraint "psql_pipeline_defer_pkey"
+DETAIL:  Key (a)=(1) already exists.
+   ?column?   
+--------------
+ after_sync_1
+(1 row)
+
+-- More patterns with more \syncpipeline, more commands and \getresults
+\startpipeline
+INSERT INTO psql_pipeline_defer VALUES ($1), ($1) \bind 1 \sendpipeline
+INSERT INTO psql_pipeline_defer VALUES ($1), ($1) \bind 1 \sendpipeline
+\syncpipeline
+SELECT $1 \bind 'after_sync_1' \sendpipeline
+\getresults
+ERROR:  duplicate key value violates unique constraint "psql_pipeline_defer_pkey"
+DETAIL:  Key (a)=(1) already exists.
+SELECT $1 \bind 'after_sync_2' \sendpipeline
+\endpipeline
+   ?column?   
+--------------
+ after_sync_1
+(1 row)
+
+   ?column?   
+--------------
+ after_sync_2
+(1 row)
+
+\startpipeline
+INSERT INTO psql_pipeline_defer VALUES ($1), ($1) \bind 1 \sendpipeline
+INSERT INTO psql_pipeline_defer VALUES ($1), ($1) \bind 1 \sendpipeline
+\syncpipeline
+\getresults
+ERROR:  duplicate key value violates unique constraint "psql_pipeline_defer_pkey"
+DETAIL:  Key (a)=(1) already exists.
+SELECT $1 \bind 'after_sync_1' \sendpipeline
+\getresults
+SELECT $1 \bind 'after_sync_2' \sendpipeline
+INSERT INTO psql_pipeline_defer VALUES ($1), ($1) \bind 1 \sendpipeline
+INSERT INTO psql_pipeline_defer VALUES ($1), ($1) \bind 1 \sendpipeline
+SELECT $1 \bind 'after_sync_3' \sendpipeline
+SELECT $1 \bind 'after_sync_4' \sendpipeline
+SELECT $1 \bind 'after_sync_5' \sendpipeline
+\endpipeline
+   ?column?   
+--------------
+ after_sync_1
+(1 row)
+
+   ?column?   
+--------------
+ after_sync_2
+(1 row)
+
+   ?column?   
+--------------
+ after_sync_3
+(1 row)
+
+   ?column?   
+--------------
+ after_sync_4
+(1 row)
+
+   ?column?   
+--------------
+ after_sync_5
+(1 row)
+
+ERROR:  duplicate key value violates unique constraint "psql_pipeline_defer_pkey"
+DETAIL:  Key (a)=(1) already exists.
+-- Deferred error combined with a regular command error after the sync.
+\startpipeline
+INSERT INTO psql_pipeline_defer VALUES ($1), ($1) \bind 1 \sendpipeline
+\syncpipeline
+SELECT $1 \bind \sendpipeline
+SELECT $1 \bind 'after_error' \sendpipeline
+\endpipeline
+ERROR:  duplicate key value violates unique constraint "psql_pipeline_defer_pkey"
+DETAIL:  Key (a)=(1) already exists.
+ERROR:  bind message supplies 0 parameters, but prepared statement "" requires 1
+-- Empty sync segment followed by a deferred error.
+\startpipeline
+\syncpipeline
+INSERT INTO psql_pipeline_defer VALUES ($1), ($1) \bind 1 \sendpipeline
+\endpipeline
+ERROR:  duplicate key value violates unique constraint "psql_pipeline_defer_pkey"
+DETAIL:  Key (a)=(1) already exists.
+-- Deferred error with \getresults reading results one at a time.
+\startpipeline
+INSERT INTO psql_pipeline_defer VALUES ($1), ($1) \bind 1 \sendpipeline
+SELECT $1 \bind 'partial' \sendpipeline
+\syncpipeline
+\getresults 1
+\getresults 1
+ ?column? 
+----------
+ partial
+(1 row)
+
+\getresults
+ERROR:  duplicate key value violates unique constraint "psql_pipeline_defer_pkey"
+DETAIL:  Key (a)=(1) already exists.
+\endpipeline
+DROP TABLE psql_pipeline_defer;
 -- Clean up
 DROP TABLE psql_pipeline;
index 6788dceee2e9080ed0b454d7e8c9d054c33d55c2..468ef1d090b6d3cb9852ff524518d170e97f9cfc 100644 (file)
@@ -438,5 +438,68 @@ SELECT 1 \bind \sendpipeline
 VACUUM psql_pipeline \bind \sendpipeline
 \endpipeline
 
+-- Deferred constraint violation at commit time in a pipeline.
+CREATE TABLE psql_pipeline_defer (a INTEGER PRIMARY KEY DEFERRABLE INITIALLY DEFERRED);
+\startpipeline
+INSERT INTO psql_pipeline_defer VALUES ($1), ($1) RETURNING * \bind 1 \sendpipeline
+\endpipeline
+
+-- Same with \syncpipeline and commands after the failing sync.
+\startpipeline
+INSERT INTO psql_pipeline_defer VALUES ($1), ($1) \bind 1 \sendpipeline
+\syncpipeline
+SELECT $1 \bind 'after_sync_1' \sendpipeline
+\endpipeline
+
+-- More patterns with more \syncpipeline, more commands and \getresults
+\startpipeline
+INSERT INTO psql_pipeline_defer VALUES ($1), ($1) \bind 1 \sendpipeline
+INSERT INTO psql_pipeline_defer VALUES ($1), ($1) \bind 1 \sendpipeline
+\syncpipeline
+SELECT $1 \bind 'after_sync_1' \sendpipeline
+\getresults
+SELECT $1 \bind 'after_sync_2' \sendpipeline
+\endpipeline
+\startpipeline
+INSERT INTO psql_pipeline_defer VALUES ($1), ($1) \bind 1 \sendpipeline
+INSERT INTO psql_pipeline_defer VALUES ($1), ($1) \bind 1 \sendpipeline
+\syncpipeline
+\getresults
+SELECT $1 \bind 'after_sync_1' \sendpipeline
+\getresults
+SELECT $1 \bind 'after_sync_2' \sendpipeline
+INSERT INTO psql_pipeline_defer VALUES ($1), ($1) \bind 1 \sendpipeline
+INSERT INTO psql_pipeline_defer VALUES ($1), ($1) \bind 1 \sendpipeline
+SELECT $1 \bind 'after_sync_3' \sendpipeline
+SELECT $1 \bind 'after_sync_4' \sendpipeline
+SELECT $1 \bind 'after_sync_5' \sendpipeline
+\endpipeline
+
+-- Deferred error combined with a regular command error after the sync.
+\startpipeline
+INSERT INTO psql_pipeline_defer VALUES ($1), ($1) \bind 1 \sendpipeline
+\syncpipeline
+SELECT $1 \bind \sendpipeline
+SELECT $1 \bind 'after_error' \sendpipeline
+\endpipeline
+
+-- Empty sync segment followed by a deferred error.
+\startpipeline
+\syncpipeline
+INSERT INTO psql_pipeline_defer VALUES ($1), ($1) \bind 1 \sendpipeline
+\endpipeline
+
+-- Deferred error with \getresults reading results one at a time.
+\startpipeline
+INSERT INTO psql_pipeline_defer VALUES ($1), ($1) \bind 1 \sendpipeline
+SELECT $1 \bind 'partial' \sendpipeline
+\syncpipeline
+\getresults 1
+\getresults 1
+\getresults
+\endpipeline
+
+DROP TABLE psql_pipeline_defer;
+
 -- Clean up
 DROP TABLE psql_pipeline;