]> git.ipfire.org Git - thirdparty/postgresql.git/commitdiff
Flexible options for CREATE_REPLICATION_SLOT.
authorRobert Haas <rhaas@postgresql.org>
Tue, 5 Oct 2021 16:52:49 +0000 (12:52 -0400)
committerRobert Haas <rhaas@postgresql.org>
Tue, 5 Oct 2021 16:52:49 +0000 (12:52 -0400)
Like BASE_BACKUP, CREATE_REPLICATION_SLOT has historically used a
hard-coded syntax.  To improve future extensibility, adopt a flexible
options syntax here, too.

In the new syntax, instead of three mutually exclusive options
EXPORT_SNAPSHOT, USE_SNAPSHOT, and NOEXPORT_SNAPSHOT, there is now a single
SNAPSHOT option with three possible values: 'export', 'use', and 'nothing'.

This commit does not remove support for the old syntax. It just adds
the new one as an additional option, makes pg_receivewal,
pg_recvlogical, and walreceiver processes use it.

Patch by me, reviewed by Fabien Coelho, Sergei Kornilov, and
Fujii Masao.

Discussion: http://postgr.es/m/CA+TgmobAczXDRO_Gr2euo_TxgzaH1JxbNxvFx=HYvBinefNH8Q@mail.gmail.com
Discussion: http://postgr.es/m/CA+TgmoZGwR=ZVWFeecncubEyPdwghnvfkkdBe9BLccLSiqdf9Q@mail.gmail.com

doc/src/sgml/protocol.sgml
src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
src/backend/replication/repl_gram.y
src/backend/replication/walsender.c
src/bin/pg_basebackup/streamutil.c

index 8ed88334442aed2ed06774beeab8b3f7186ea64d..b95cc88599a1850a46999c402129d2b661262565 100644 (file)
@@ -1914,7 +1914,7 @@ The commands accepted in replication mode are:
   </varlistentry>
 
   <varlistentry id="protocol-replication-create-slot" xreflabel="CREATE_REPLICATION_SLOT">
-   <term><literal>CREATE_REPLICATION_SLOT</literal> <replaceable class="parameter">slot_name</replaceable> [ <literal>TEMPORARY</literal> ] { <literal>PHYSICAL</literal> [ <literal>RESERVE_WAL</literal> ] | <literal>LOGICAL</literal> <replaceable class="parameter">output_plugin</replaceable> [ <literal>EXPORT_SNAPSHOT</literal> | <literal>NOEXPORT_SNAPSHOT</literal> | <literal>USE_SNAPSHOT</literal> | <literal>TWO_PHASE</literal> ] }
+   <term><literal>CREATE_REPLICATION_SLOT</literal> <replaceable class="parameter">slot_name</replaceable> [ <literal>TEMPORARY</literal> ] { <literal>PHYSICAL</literal> | <literal>LOGICAL</literal> } [ ( <replaceable class="parameter">option</replaceable> [, ...] ) ]
      <indexterm><primary>CREATE_REPLICATION_SLOT</primary></indexterm>
     </term>
     <listitem>
@@ -1954,46 +1954,50 @@ The commands accepted in replication mode are:
         </para>
        </listitem>
       </varlistentry>
+     </variablelist>
+
+     <para>The following options are supported:</para>
 
+     <variablelist>
       <varlistentry>
-       <term><literal>TWO_PHASE</literal></term>
+       <term><literal>TWO_PHASE [ <replaceable class="parameter">boolean</replaceable> ]</literal></term>
        <listitem>
         <para>
-         Specify that this logical replication slot supports decoding of two-phase
+         If true, this logical replication slot supports decoding of two-phase
          transactions. With this option, two-phase commands like
          <literal>PREPARE TRANSACTION</literal>, <literal>COMMIT PREPARED</literal>
          and <literal>ROLLBACK PREPARED</literal> are decoded and transmitted.
          The transaction will be decoded and transmitted at
          <literal>PREPARE TRANSACTION</literal> time.
+         The default is false.
         </para>
        </listitem>
       </varlistentry>
 
       <varlistentry>
-       <term><literal>RESERVE_WAL</literal></term>
+       <term><literal>RESERVE_WAL [ <replaceable class="parameter">boolean</replaceable> ]</literal></term>
        <listitem>
         <para>
-         Specify that this physical replication slot reserves <acronym>WAL</acronym>
+         If true, this physical replication slot reserves <acronym>WAL</acronym>
          immediately.  Otherwise, <acronym>WAL</acronym> is only reserved upon
          connection from a streaming replication client.
+         The default is false.
         </para>
        </listitem>
       </varlistentry>
 
       <varlistentry>
-       <term><literal>EXPORT_SNAPSHOT</literal></term>
-       <term><literal>NOEXPORT_SNAPSHOT</literal></term>
-       <term><literal>USE_SNAPSHOT</literal></term>
+       <term><literal>SNAPSHOT { 'export' | 'use' | 'nothing' }</literal></term>
        <listitem>
         <para>
          Decides what to do with the snapshot created during logical slot
-         initialization. <literal>EXPORT_SNAPSHOT</literal>, which is the default,
+         initialization. <literal>'export'</literal>, which is the default,
          will export the snapshot for use in other sessions. This option can't
-         be used inside a transaction.  <literal>USE_SNAPSHOT</literal> will use the
+         be used inside a transaction.  <literal>'use'</literal> will use the
          snapshot for the current transaction executing the command. This
          option must be used in a transaction, and
          <literal>CREATE_REPLICATION_SLOT</literal> must be the first command
-         run in that transaction.  Finally, <literal>NOEXPORT_SNAPSHOT</literal> will
+         run in that transaction.  Finally, <literal>'nothing'</literal> will
          just use the snapshot for logical decoding as normal but won't do
          anything else with it.
         </para>
@@ -2052,6 +2056,17 @@ The commands accepted in replication mode are:
     </listitem>
   </varlistentry>
 
+  <varlistentry>
+    <term><literal>CREATE_REPLICATION_SLOT</literal> <replaceable class="parameter">slot_name</replaceable> [ <literal>TEMPORARY</literal> ] { <literal>PHYSICAL</literal> [ <literal>RESERVE_WAL</literal> ] | <literal>LOGICAL</literal> <replaceable class="parameter">output_plugin</replaceable> [ <literal>EXPORT_SNAPSHOT</literal> | <literal>NOEXPORT_SNAPSHOT</literal> | <literal>USE_SNAPSHOT</literal> | <literal>TWO_PHASE</literal> ] }
+    </term>
+    <listitem>
+     <para>
+      For compatibility with older releases, this alternative syntax for
+      the <literal>CREATE_REPLICATION_SLOT</literal> command is still supported.
+     </para>
+    </listitem>
+  </varlistentry>
+
   <varlistentry>
     <term><literal>START_REPLICATION</literal> [ <literal>SLOT</literal> <replaceable class="parameter">slot_name</replaceable> ] [ <literal>PHYSICAL</literal> ] <replaceable class="parameter">XXX/XXX</replaceable> [ <literal>TIMELINE</literal> <replaceable class="parameter">tli</replaceable> ]
      <indexterm><primary>START_REPLICATION</primary></indexterm>
index 19ea159af4fb1c2d02c28ea3f0f684965957909b..5c6e56a5b241ed6546fde63ec4f127912b2869fa 100644 (file)
@@ -862,6 +862,9 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
        PGresult   *res;
        StringInfoData cmd;
        char       *snapshot;
+       int                     use_new_options_syntax;
+
+       use_new_options_syntax = (PQserverVersion(conn->streamConn) >= 150000);
 
        initStringInfo(&cmd);
 
@@ -872,26 +875,58 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
 
        if (conn->logical)
        {
-               appendStringInfoString(&cmd, " LOGICAL pgoutput");
+               appendStringInfoString(&cmd, " LOGICAL pgoutput ");
+               if (use_new_options_syntax)
+                       appendStringInfoChar(&cmd, '(');
                if (two_phase)
-                       appendStringInfoString(&cmd, " TWO_PHASE");
+               {
+                       appendStringInfoString(&cmd, "TWO_PHASE");
+                       if (use_new_options_syntax)
+                               appendStringInfoString(&cmd, ", ");
+                       else
+                               appendStringInfoChar(&cmd, ' ');
+               }
 
-               switch (snapshot_action)
+               if (use_new_options_syntax)
                {
-                       case CRS_EXPORT_SNAPSHOT:
-                               appendStringInfoString(&cmd, " EXPORT_SNAPSHOT");
-                               break;
-                       case CRS_NOEXPORT_SNAPSHOT:
-                               appendStringInfoString(&cmd, " NOEXPORT_SNAPSHOT");
-                               break;
-                       case CRS_USE_SNAPSHOT:
-                               appendStringInfoString(&cmd, " USE_SNAPSHOT");
-                               break;
+                       switch (snapshot_action)
+                       {
+                               case CRS_EXPORT_SNAPSHOT:
+                                       appendStringInfoString(&cmd, "SNAPSHOT 'export'");
+                                       break;
+                               case CRS_NOEXPORT_SNAPSHOT:
+                                       appendStringInfoString(&cmd, "SNAPSHOT 'nothing'");
+                                       break;
+                               case CRS_USE_SNAPSHOT:
+                                       appendStringInfoString(&cmd, "SNAPSHOT 'use'");
+                                       break;
+                       }
                }
+               else
+               {
+                       switch (snapshot_action)
+                       {
+                               case CRS_EXPORT_SNAPSHOT:
+                                       appendStringInfoString(&cmd, "EXPORT_SNAPSHOT");
+                                       break;
+                               case CRS_NOEXPORT_SNAPSHOT:
+                                       appendStringInfoString(&cmd, "NOEXPORT_SNAPSHOT");
+                                       break;
+                               case CRS_USE_SNAPSHOT:
+                                       appendStringInfoString(&cmd, "USE_SNAPSHOT");
+                                       break;
+                       }
+               }
+
+               if (use_new_options_syntax)
+                       appendStringInfoChar(&cmd, ')');
        }
        else
        {
-               appendStringInfoString(&cmd, " PHYSICAL RESERVE_WAL");
+               if (use_new_options_syntax)
+                       appendStringInfoString(&cmd, " PHYSICAL (RESERVE_WAL)");
+               else
+                       appendStringInfoString(&cmd, " PHYSICAL RESERVE_WAL");
        }
 
        res = libpqrcv_PQexec(conn->streamConn, cmd.data);
index 3b59d62ed86b762fd2a2e0e36f27c7c24bd8d2e6..126380e2df77d3f29e391c0db0d68f9d38bf5bfe 100644 (file)
@@ -103,8 +103,8 @@ static SQLCmd *make_sqlcmd(void);
 %type <node>   plugin_opt_arg
 %type <str>            opt_slot var_name ident_or_keyword
 %type <boolval>        opt_temporary
-%type <list>   create_slot_opt_list
-%type <defelt> create_slot_opt
+%type <list>   create_slot_options create_slot_legacy_opt_list
+%type <defelt> create_slot_legacy_opt
 
 %%
 
@@ -243,8 +243,8 @@ base_backup_legacy_opt:
                        ;
 
 create_replication_slot:
-                       /* CREATE_REPLICATION_SLOT slot TEMPORARY PHYSICAL RESERVE_WAL */
-                       K_CREATE_REPLICATION_SLOT IDENT opt_temporary K_PHYSICAL create_slot_opt_list
+                       /* CREATE_REPLICATION_SLOT slot TEMPORARY PHYSICAL [options] */
+                       K_CREATE_REPLICATION_SLOT IDENT opt_temporary K_PHYSICAL create_slot_options
                                {
                                        CreateReplicationSlotCmd *cmd;
                                        cmd = makeNode(CreateReplicationSlotCmd);
@@ -254,8 +254,8 @@ create_replication_slot:
                                        cmd->options = $5;
                                        $$ = (Node *) cmd;
                                }
-                       /* CREATE_REPLICATION_SLOT slot TEMPORARY LOGICAL plugin */
-                       | K_CREATE_REPLICATION_SLOT IDENT opt_temporary K_LOGICAL IDENT create_slot_opt_list
+                       /* CREATE_REPLICATION_SLOT slot TEMPORARY LOGICAL plugin [options] */
+                       | K_CREATE_REPLICATION_SLOT IDENT opt_temporary K_LOGICAL IDENT create_slot_options
                                {
                                        CreateReplicationSlotCmd *cmd;
                                        cmd = makeNode(CreateReplicationSlotCmd);
@@ -268,28 +268,33 @@ create_replication_slot:
                                }
                        ;
 
-create_slot_opt_list:
-                       create_slot_opt_list create_slot_opt
+create_slot_options:
+                       '(' generic_option_list ')'                     { $$ = $2; }
+                       | create_slot_legacy_opt_list           { $$ = $1; }
+                       ;
+
+create_slot_legacy_opt_list:
+                       create_slot_legacy_opt_list create_slot_legacy_opt
                                { $$ = lappend($1, $2); }
                        | /* EMPTY */
                                { $$ = NIL; }
                        ;
 
-create_slot_opt:
+create_slot_legacy_opt:
                        K_EXPORT_SNAPSHOT
                                {
-                                 $$ = makeDefElem("export_snapshot",
-                                                                  (Node *)makeInteger(true), -1);
+                                 $$ = makeDefElem("snapshot",
+                                                                  (Node *)makeString("export"), -1);
                                }
                        | K_NOEXPORT_SNAPSHOT
                                {
-                                 $$ = makeDefElem("export_snapshot",
-                                                                  (Node *)makeInteger(false), -1);
+                                 $$ = makeDefElem("snapshot",
+                                                                  (Node *)makeString("nothing"), -1);
                                }
                        | K_USE_SNAPSHOT
                                {
-                                 $$ = makeDefElem("use_snapshot",
-                                                                  (Node *)makeInteger(true), -1);
+                                 $$ = makeDefElem("snapshot",
+                                                                  (Node *)makeString("use"), -1);
                                }
                        | K_RESERVE_WAL
                                {
index 3ca2a11389dc26ca881a845ec2286d9744f331e2..b811a5c0ef2b77e89eb6e5ca91bafb7e4ea42c1e 100644 (file)
@@ -872,26 +872,30 @@ parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd,
        {
                DefElem    *defel = (DefElem *) lfirst(lc);
 
-               if (strcmp(defel->defname, "export_snapshot") == 0)
+               if (strcmp(defel->defname, "snapshot") == 0)
                {
+                       char       *action;
+
                        if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL)
                                ereport(ERROR,
                                                (errcode(ERRCODE_SYNTAX_ERROR),
                                                 errmsg("conflicting or redundant options")));
 
+                       action = defGetString(defel);
                        snapshot_action_given = true;
-                       *snapshot_action = defGetBoolean(defel) ? CRS_EXPORT_SNAPSHOT :
-                               CRS_NOEXPORT_SNAPSHOT;
-               }
-               else if (strcmp(defel->defname, "use_snapshot") == 0)
-               {
-                       if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL)
+
+                       if (strcmp(action, "export") == 0)
+                               *snapshot_action = CRS_EXPORT_SNAPSHOT;
+                       else if (strcmp(action, "nothing") == 0)
+                               *snapshot_action = CRS_NOEXPORT_SNAPSHOT;
+                       else if (strcmp(action, "use") == 0)
+                               *snapshot_action = CRS_USE_SNAPSHOT;
+                       else
                                ereport(ERROR,
-                                               (errcode(ERRCODE_SYNTAX_ERROR),
-                                                errmsg("conflicting or redundant options")));
+                                               (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+                                                errmsg("unrecognized value for CREATE_REPLICATION_SLOT option \"%s\": \"%s\"",
+                                                               defel->defname, action)));
 
-                       snapshot_action_given = true;
-                       *snapshot_action = CRS_USE_SNAPSHOT;
                }
                else if (strcmp(defel->defname, "reserve_wal") == 0)
                {
@@ -901,7 +905,7 @@ parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd,
                                                 errmsg("conflicting or redundant options")));
 
                        reserve_wal_given = true;
-                       *reserve_wal = true;
+                       *reserve_wal = defGetBoolean(defel);
                }
                else if (strcmp(defel->defname, "two_phase") == 0)
                {
@@ -910,7 +914,7 @@ parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd,
                                                (errcode(ERRCODE_SYNTAX_ERROR),
                                                 errmsg("conflicting or redundant options")));
                        two_phase_given = true;
-                       *two_phase = true;
+                       *two_phase = defGetBoolean(defel);
                }
                else
                        elog(ERROR, "unrecognized option: %s", defel->defname);
@@ -980,7 +984,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
                                ereport(ERROR,
                                /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
                                                (errmsg("%s must not be called inside a transaction",
-                                                               "CREATE_REPLICATION_SLOT ... EXPORT_SNAPSHOT")));
+                                                               "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'export')")));
 
                        need_full_snapshot = true;
                }
@@ -990,25 +994,25 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
                                ereport(ERROR,
                                /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
                                                (errmsg("%s must be called inside a transaction",
-                                                               "CREATE_REPLICATION_SLOT ... USE_SNAPSHOT")));
+                                                               "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
 
                        if (XactIsoLevel != XACT_REPEATABLE_READ)
                                ereport(ERROR,
                                /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
                                                (errmsg("%s must be called in REPEATABLE READ isolation mode transaction",
-                                                               "CREATE_REPLICATION_SLOT ... USE_SNAPSHOT")));
+                                                               "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
 
                        if (FirstSnapshotSet)
                                ereport(ERROR,
                                /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
                                                (errmsg("%s must be called before any query",
-                                                               "CREATE_REPLICATION_SLOT ... USE_SNAPSHOT")));
+                                                               "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
 
                        if (IsSubTransaction())
                                ereport(ERROR,
                                /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
                                                (errmsg("%s must not be called in a subtransaction",
-                                                               "CREATE_REPLICATION_SLOT ... USE_SNAPSHOT")));
+                                                               "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
 
                        need_full_snapshot = true;
                }
index d782b81adc6599abcdba138c2c977d0d8c230d11..37237cd5d95c946cbebc54db240db07025ffaa60 100644 (file)
@@ -490,6 +490,7 @@ CreateReplicationSlot(PGconn *conn, const char *slot_name, const char *plugin,
 {
        PQExpBuffer query;
        PGresult   *res;
+       bool            use_new_option_syntax = (PQserverVersion(conn) >= 150000);
 
        query = createPQExpBuffer();
 
@@ -498,27 +499,54 @@ CreateReplicationSlot(PGconn *conn, const char *slot_name, const char *plugin,
        Assert(!(two_phase && is_physical));
        Assert(slot_name != NULL);
 
-       /* Build query */
+       /* Build base portion of query */
        appendPQExpBuffer(query, "CREATE_REPLICATION_SLOT \"%s\"", slot_name);
        if (is_temporary)
                appendPQExpBufferStr(query, " TEMPORARY");
        if (is_physical)
-       {
                appendPQExpBufferStr(query, " PHYSICAL");
+       else
+               appendPQExpBuffer(query, " LOGICAL \"%s\"", plugin);
+
+       /* Add any requested options */
+       if (use_new_option_syntax)
+               appendPQExpBufferStr(query, " (");
+       if (is_physical)
+       {
                if (reserve_wal)
-                       appendPQExpBufferStr(query, " RESERVE_WAL");
+                       AppendPlainCommandOption(query, use_new_option_syntax,
+                                                                        "RESERVE_WAL");
        }
        else
        {
-               appendPQExpBuffer(query, " LOGICAL \"%s\"", plugin);
                if (two_phase && PQserverVersion(conn) >= 150000)
-                       appendPQExpBufferStr(query, " TWO_PHASE");
+                       AppendPlainCommandOption(query, use_new_option_syntax,
+                                                                        "TWO_PHASE");
 
                if (PQserverVersion(conn) >= 100000)
+               {
                        /* pg_recvlogical doesn't use an exported snapshot, so suppress */
-                       appendPQExpBufferStr(query, " NOEXPORT_SNAPSHOT");
+                       if (use_new_option_syntax)
+                               AppendStringCommandOption(query, use_new_option_syntax,
+                                                                                  "SNAPSHOT", "nothing");
+                       else
+                               AppendPlainCommandOption(query, use_new_option_syntax,
+                                                                                "NOEXPORT_SNAPSHOT");
+               }
+       }
+       if (use_new_option_syntax)
+       {
+               /* Suppress option list if it would be empty, otherwise terminate */
+               if (query->data[query->len - 1] == '(')
+               {
+                       query->len -= 2;
+                       query->data[query->len] = '\0';
+               }
+               else
+                       appendPQExpBufferChar(query, ')');
        }
 
+       /* Now run the query */
        res = PQexec(conn, query->data);
        if (PQresultStatus(res) != PGRES_TUPLES_OK)
        {