]> git.ipfire.org Git - thirdparty/postgresql.git/commitdiff
Add libpq pipeline mode support to pgbench
authorAlvaro Herrera <alvherre@alvh.no-ip.org>
Mon, 15 Mar 2021 21:33:03 +0000 (18:33 -0300)
committerAlvaro Herrera <alvherre@alvh.no-ip.org>
Mon, 15 Mar 2021 21:33:03 +0000 (18:33 -0300)
New metacommands \startpipeline and \endpipeline allow the user to run
queries in libpq pipeline mode.

Author: Daniel Vérité <daniel@manitou-mail.org>
Reviewed-by: Álvaro Herrera <alvherre@alvh.no-ip.org>
Discussion: https://postgr.es/m/b4e34135-2bd9-4b8a-94ca-27d760da26d7@manitou-mail.org

doc/src/sgml/ref/pgbench.sgml
src/bin/pgbench/pgbench.c
src/bin/pgbench/t/001_pgbench_with_server.pl

index 299d93b24198ec3b6440ef76b230195b6444f28a..50cf22ba6baddf4c9efd08c95f139db0283382c5 100644 (file)
@@ -1110,6 +1110,12 @@ pgbench <optional> <replaceable>options</replaceable> </optional> <replaceable>d
       row, the last value is kept.
      </para>
 
+     <para>
+      <literal>\gset</literal> and <literal>\aset</literal> cannot be used in
+      pipeline mode, since the query results are not yet available by the time
+      the commands would need them.
+     </para>
+
      <para>
       The following example puts the final account balance from the first query
       into variable <replaceable>abalance</replaceable>, and fills variables
@@ -1270,6 +1276,22 @@ SELECT 4 AS four \; SELECT 5 AS five \aset
 </programlisting></para>
     </listitem>
    </varlistentry>
+
+   <varlistentry id='pgbench-metacommand-pipeline'>
+    <term><literal>\startpipeline</literal></term>
+    <term><literal>\endpipeline</literal></term>
+
+    <listitem>
+      <para>
+        These commands delimit the start and end of a pipeline of SQL
+        statements.  In pipeline mode, statements are sent to the server
+        without waiting for the results of previous statements.  See
+        <xref linkend="libpq-pipeline-mode"/> for more details.
+        Pipeline mode requires the use of extended query protocol.
+     </para>
+    </listitem>
+   </varlistentry>
+
   </variablelist>
  </refsect2>
 
index f6a214669c1a312227b33891ea0ccd1742dba735..e69d43b26ba91d95fe332de74725e16577cc46b4 100644 (file)
@@ -395,10 +395,11 @@ typedef enum
         *
         * CSTATE_START_COMMAND starts the execution of a command.  On a SQL
         * command, the command is sent to the server, and we move to
-        * CSTATE_WAIT_RESULT state.  On a \sleep meta-command, the timer is set,
-        * and we enter the CSTATE_SLEEP state to wait for it to expire. Other
-        * meta-commands are executed immediately.  If the command about to start
-        * is actually beyond the end of the script, advance to CSTATE_END_TX.
+        * CSTATE_WAIT_RESULT state unless in pipeline mode. On a \sleep
+        * meta-command, the timer is set, and we enter the CSTATE_SLEEP state to
+        * wait for it to expire. Other meta-commands are executed immediately. If
+        * the command about to start is actually beyond the end of the script,
+        * advance to CSTATE_END_TX.
         *
         * CSTATE_WAIT_RESULT waits until we get a result set back from the server
         * for the current command.
@@ -530,7 +531,9 @@ typedef enum MetaCommand
        META_IF,                                        /* \if */
        META_ELIF,                                      /* \elif */
        META_ELSE,                                      /* \else */
-       META_ENDIF                                      /* \endif */
+       META_ENDIF,                                     /* \endif */
+       META_STARTPIPELINE,                     /* \startpipeline */
+       META_ENDPIPELINE                        /* \endpipeline */
 } MetaCommand;
 
 typedef enum QueryMode
@@ -2568,6 +2571,10 @@ getMetaCommand(const char *cmd)
                mc = META_GSET;
        else if (pg_strcasecmp(cmd, "aset") == 0)
                mc = META_ASET;
+       else if (pg_strcasecmp(cmd, "startpipeline") == 0)
+               mc = META_STARTPIPELINE;
+       else if (pg_strcasecmp(cmd, "endpipeline") == 0)
+               mc = META_ENDPIPELINE;
        else
                mc = META_NONE;
        return mc;
@@ -2757,11 +2764,25 @@ sendCommand(CState *st, Command *command)
                                if (commands[j]->type != SQL_COMMAND)
                                        continue;
                                preparedStatementName(name, st->use_file, j);
-                               res = PQprepare(st->con, name,
-                                                               commands[j]->argv[0], commands[j]->argc - 1, NULL);
-                               if (PQresultStatus(res) != PGRES_COMMAND_OK)
-                                       pg_log_error("%s", PQerrorMessage(st->con));
-                               PQclear(res);
+                               if (PQpipelineStatus(st->con) == PQ_PIPELINE_OFF)
+                               {
+                                       res = PQprepare(st->con, name,
+                                                                       commands[j]->argv[0], commands[j]->argc - 1, NULL);
+                                       if (PQresultStatus(res) != PGRES_COMMAND_OK)
+                                               pg_log_error("%s", PQerrorMessage(st->con));
+                                       PQclear(res);
+                               }
+                               else
+                               {
+                                       /*
+                                        * In pipeline mode, we use asynchronous functions. If a
+                                        * server-side error occurs, it will be processed later
+                                        * among the other results.
+                                        */
+                                       if (!PQsendPrepare(st->con, name,
+                                                                          commands[j]->argv[0], commands[j]->argc - 1, NULL))
+                                               pg_log_error("%s", PQerrorMessage(st->con));
+                               }
                        }
                        st->prepared[st->use_file] = true;
                }
@@ -2802,10 +2823,11 @@ readCommandResponse(CState *st, MetaCommand meta, char *varprefix)
        int                     qrynum = 0;
 
        /*
-        * varprefix should be set only with \gset or \aset, and SQL commands do
-        * not need it.
+        * varprefix should be set only with \gset or \aset, and \endpipeline and
+        * SQL commands do not need it.
         */
        Assert((meta == META_NONE && varprefix == NULL) ||
+                  ((meta == META_ENDPIPELINE) && varprefix == NULL) ||
                   ((meta == META_GSET || meta == META_ASET) && varprefix != NULL));
 
        res = PQgetResult(st->con);
@@ -2874,6 +2896,13 @@ readCommandResponse(CState *st, MetaCommand meta, char *varprefix)
                                /* otherwise the result is simply thrown away by PQclear below */
                                break;
 
+                       case PGRES_PIPELINE_SYNC:
+                               pg_log_debug("client %d pipeline ending", st->id);
+                               if (PQexitPipelineMode(st->con) != 1)
+                                       pg_log_error("client %d failed to exit pipeline mode: %s", st->id,
+                                                                PQerrorMessage(st->con));
+                               break;
+
                        default:
                                /* anything else is unexpected */
                                pg_log_error("client %d script %d aborted in command %d query %d: %s",
@@ -3127,13 +3156,36 @@ advanceConnectionState(TState *thread, CState *st, StatsData *agg)
                                /* Execute the command */
                                if (command->type == SQL_COMMAND)
                                {
+                                       /* disallow \aset and \gset in pipeline mode */
+                                       if (PQpipelineStatus(st->con) != PQ_PIPELINE_OFF)
+                                       {
+                                               if (command->meta == META_GSET)
+                                               {
+                                                       commandFailed(st, "gset", "\\gset is not allowed in pipeline mode");
+                                                       st->state = CSTATE_ABORTED;
+                                                       break;
+                                               }
+                                               else if (command->meta == META_ASET)
+                                               {
+                                                       commandFailed(st, "aset", "\\aset is not allowed in pipeline mode");
+                                                       st->state = CSTATE_ABORTED;
+                                                       break;
+                                               }
+                                       }
+
                                        if (!sendCommand(st, command))
                                        {
                                                commandFailed(st, "SQL", "SQL command send failed");
                                                st->state = CSTATE_ABORTED;
                                        }
                                        else
-                                               st->state = CSTATE_WAIT_RESULT;
+                                       {
+                                               /* Wait for results, unless in pipeline mode */
+                                               if (PQpipelineStatus(st->con) == PQ_PIPELINE_OFF)
+                                                       st->state = CSTATE_WAIT_RESULT;
+                                               else
+                                                       st->state = CSTATE_END_COMMAND;
+                                       }
                                }
                                else if (command->type == META_COMMAND)
                                {
@@ -3273,7 +3325,15 @@ advanceConnectionState(TState *thread, CState *st, StatsData *agg)
                                if (readCommandResponse(st,
                                                                                sql_script[st->use_file].commands[st->command]->meta,
                                                                                sql_script[st->use_file].commands[st->command]->varprefix))
-                                       st->state = CSTATE_END_COMMAND;
+                               {
+                                       /*
+                                        * outside of pipeline mode: stop reading results.
+                                        * pipeline mode: continue reading results until an
+                                        * end-of-pipeline response.
+                                        */
+                                       if (PQpipelineStatus(st->con) != PQ_PIPELINE_ON)
+                                               st->state = CSTATE_END_COMMAND;
+                               }
                                else
                                        st->state = CSTATE_ABORTED;
                                break;
@@ -3516,6 +3576,45 @@ executeMetaCommand(CState *st, pg_time_usec_t *now)
                        return CSTATE_ABORTED;
                }
        }
+       else if (command->meta == META_STARTPIPELINE)
+       {
+               /*
+                * In pipeline mode, we use a workflow based on libpq pipeline
+                * functions.
+                */
+               if (querymode == QUERY_SIMPLE)
+               {
+                       commandFailed(st, "startpipeline", "cannot use pipeline mode with the simple query protocol");
+                       return CSTATE_ABORTED;
+               }
+
+               if (PQpipelineStatus(st->con) != PQ_PIPELINE_OFF)
+               {
+                       commandFailed(st, "startpipeline", "already in pipeline mode");
+                       return CSTATE_ABORTED;
+               }
+               if (PQenterPipelineMode(st->con) == 0)
+               {
+                       commandFailed(st, "startpipeline", "failed to enter pipeline mode");
+                       return CSTATE_ABORTED;
+               }
+       }
+       else if (command->meta == META_ENDPIPELINE)
+       {
+               if (PQpipelineStatus(st->con) != PQ_PIPELINE_ON)
+               {
+                       commandFailed(st, "endpipeline", "not in pipeline mode");
+                       return CSTATE_ABORTED;
+               }
+               if (!PQpipelineSync(st->con))
+               {
+                       commandFailed(st, "endpipeline", "failed to send a pipeline sync");
+                       return CSTATE_ABORTED;
+               }
+               /* Now wait for the PGRES_PIPELINE_SYNC and exit pipeline mode there */
+               /* collect pending results before getting out of pipeline mode */
+               return CSTATE_WAIT_RESULT;
+       }
 
        /*
         * executing the expression or shell command might have taken a
@@ -4725,7 +4824,9 @@ process_backslash_command(PsqlScanState sstate, const char *source)
                        syntax_error(source, lineno, my_command->first_line, my_command->argv[0],
                                                 "missing command", NULL, -1);
        }
-       else if (my_command->meta == META_ELSE || my_command->meta == META_ENDIF)
+       else if (my_command->meta == META_ELSE || my_command->meta == META_ENDIF ||
+                        my_command->meta == META_STARTPIPELINE ||
+                        my_command->meta == META_ENDPIPELINE)
        {
                if (my_command->argc != 1)
                        syntax_error(source, lineno, my_command->first_line, my_command->argv[0],
index daffc18e521945cd6a953d6272edcae5b6073f52..d11c4e8c242e0e0772c28d78d24ac5e70b23538b 100644 (file)
@@ -41,7 +41,7 @@ sub pgbench
                        # filenames are expected to be unique on a test
                        if (-e $filename)
                        {
-                               ok(0, "$filename must not already exists");
+                               ok(0, "$filename must not already exist");
                                unlink $filename or die "cannot unlink $filename: $!";
                        }
                        append_to_file($filename, $$files{$fn});
@@ -755,6 +755,83 @@ pgbench(
 }
        });
 
+# Working \startpipeline
+pgbench(
+       '-t 1 -n -M extended',
+       0,
+       [ qr{type: .*/001_pgbench_pipeline}, qr{actually processed: 1/1} ],
+       [],
+       'working \startpipeline',
+       {
+               '001_pgbench_pipeline' => q{
+-- test startpipeline
+\startpipeline
+} . "select 1;\n" x 10 . q{
+\endpipeline
+}
+       });
+
+# Working \startpipeline in prepared query mode
+pgbench(
+       '-t 1 -n -M prepared',
+       0,
+       [ qr{type: .*/001_pgbench_pipeline_prep}, qr{actually processed: 1/1} ],
+       [],
+       'working \startpipeline',
+       {
+               '001_pgbench_pipeline_prep' => q{
+-- test startpipeline
+\startpipeline
+} . "select 1;\n" x 10 . q{
+\endpipeline
+}
+       });
+
+# Try \startpipeline twice
+pgbench(
+       '-t 1 -n -M extended',
+       2,
+       [],
+       [qr{already in pipeline mode}],
+       'error: call \startpipeline twice',
+       {
+               '001_pgbench_pipeline_2' => q{
+-- startpipeline twice
+\startpipeline
+\startpipeline
+}
+       });
+
+# Try to end a pipeline that hasn't started
+pgbench(
+       '-t 1 -n -M extended',
+       2,
+       [],
+       [qr{not in pipeline mode}],
+       'error: \endpipeline with no start',
+       {
+               '001_pgbench_pipeline_3' => q{
+-- pipeline not started
+\endpipeline
+}
+       });
+
+# Try \gset in pipeline mode
+pgbench(
+       '-t 1 -n -M extended',
+       2,
+       [],
+       [qr{gset is not allowed in pipeline mode}],
+       'error: \gset not allowed in pipeline mode',
+       {
+               '001_pgbench_pipeline_4' => q{
+\startpipeline
+select 1 \gset f
+\endpipeline
+}
+       });
+
+
 # trigger many expression errors
 my @errors = (