]> git.ipfire.org Git - thirdparty/postgresql.git/commitdiff
Refactor function parse_subscription_options.
authorAmit Kapila <akapila@postgresql.org>
Tue, 6 Jul 2021 02:16:50 +0000 (07:46 +0530)
committerAmit Kapila <akapila@postgresql.org>
Tue, 6 Jul 2021 02:16:50 +0000 (07:46 +0530)
Instead of using multiple parameters in parse_subscription_options
function signature, use the struct SubOpts that encapsulate all the
subscription options and their values. It will be useful for future work
where we need to add other options in the subscription. Also, use bitmaps
to pass the supported and retrieve the specified options much like the way
it is done in the commit a3dc926009.

Author: Bharath Rupireddy
Reviewed-By: Peter Smith, Amit Kapila, Alvaro Herrera
Discussion: https://postgr.es/m/CALj2ACXtoQczfNsDQWobypVvHbX2DtgEHn8DawS0eGFwuo72kw@mail.gmail.com

src/backend/commands/subscriptioncmds.c
src/tools/pgindent/typedefs.list

index b862e59f1da8a42aa73d5a16ef27dd403b1dba97..eb88d877a5032dabd6f90a6ea5f04147650a2eac 100644 (file)
 #include "utils/memutils.h"
 #include "utils/syscache.h"
 
+/*
+ * Options that can be specified by the user in CREATE/ALTER SUBSCRIPTION
+ * command.
+ */
+#define SUBOPT_CONNECT                         0x00000001
+#define SUBOPT_ENABLED                         0x00000002
+#define SUBOPT_CREATE_SLOT                     0x00000004
+#define SUBOPT_SLOT_NAME                       0x00000008
+#define SUBOPT_COPY_DATA                       0x00000010
+#define SUBOPT_SYNCHRONOUS_COMMIT      0x00000020
+#define SUBOPT_REFRESH                         0x00000040
+#define SUBOPT_BINARY                          0x00000080
+#define SUBOPT_STREAMING                       0x00000100
+
+/* check if the 'val' has 'bits' set */
+#define IsSet(val, bits)  (((val) & (bits)) == (bits))
+
+/*
+ * Structure to hold a bitmap representing the user-provided CREATE/ALTER
+ * SUBSCRIPTION command options and the parsed/default values of each of them.
+ */
+typedef struct SubOpts
+{
+       bits32          specified_opts;
+       char       *slot_name;
+       char       *synchronous_commit;
+       bool            connect;
+       bool            enabled;
+       bool            create_slot;
+       bool            copy_data;
+       bool            refresh;
+       bool            binary;
+       bool            streaming;
+} SubOpts;
+
 static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
 static void check_duplicates_in_publist(List *publist, Datum *datums);
 static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname);
@@ -56,164 +91,151 @@ static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname,
  * Common option parsing function for CREATE and ALTER SUBSCRIPTION commands.
  *
  * Since not all options can be specified in both commands, this function
- * will report an error on options if the target output pointer is NULL to
- * accommodate that.
+ * will report an error if mutually exclusive options are specified.
+ *
+ * Caller is expected to have cleared 'opts'.
  */
 static void
-parse_subscription_options(List *options,
-                                                  bool *connect,
-                                                  bool *enabled_given, bool *enabled,
-                                                  bool *create_slot,
-                                                  bool *slot_name_given, char **slot_name,
-                                                  bool *copy_data,
-                                                  char **synchronous_commit,
-                                                  bool *refresh,
-                                                  bool *binary_given, bool *binary,
-                                                  bool *streaming_given, bool *streaming)
+parse_subscription_options(List *stmt_options, bits32 supported_opts, SubOpts *opts)
 {
        ListCell   *lc;
-       bool            connect_given = false;
-       bool            create_slot_given = false;
-       bool            copy_data_given = false;
-       bool            refresh_given = false;
-
-       /* If connect is specified, the others also need to be. */
-       Assert(!connect || (enabled && create_slot && copy_data));
 
-       if (connect)
-               *connect = true;
-       if (enabled)
-       {
-               *enabled_given = false;
-               *enabled = true;
-       }
-       if (create_slot)
-               *create_slot = true;
-       if (slot_name)
-       {
-               *slot_name_given = false;
-               *slot_name = NULL;
-       }
-       if (copy_data)
-               *copy_data = true;
-       if (synchronous_commit)
-               *synchronous_commit = NULL;
-       if (refresh)
-               *refresh = true;
-       if (binary)
-       {
-               *binary_given = false;
-               *binary = false;
-       }
-       if (streaming)
-       {
-               *streaming_given = false;
-               *streaming = false;
-       }
+       /* caller must expect some option */
+       Assert(supported_opts != 0);
+
+       /* If connect option is supported, these others also need to be. */
+       Assert(!IsSet(supported_opts, SUBOPT_CONNECT) ||
+                  IsSet(supported_opts, SUBOPT_ENABLED | SUBOPT_CREATE_SLOT |
+                                SUBOPT_COPY_DATA));
+
+       /* Set default values for the boolean supported options. */
+       if (IsSet(supported_opts, SUBOPT_CONNECT))
+               opts->connect = true;
+       if (IsSet(supported_opts, SUBOPT_ENABLED))
+               opts->enabled = true;
+       if (IsSet(supported_opts, SUBOPT_CREATE_SLOT))
+               opts->create_slot = true;
+       if (IsSet(supported_opts, SUBOPT_COPY_DATA))
+               opts->copy_data = true;
+       if (IsSet(supported_opts, SUBOPT_REFRESH))
+               opts->refresh = true;
+       if (IsSet(supported_opts, SUBOPT_BINARY))
+               opts->binary = false;
+       if (IsSet(supported_opts, SUBOPT_STREAMING))
+               opts->streaming = false;
 
        /* Parse options */
-       foreach(lc, options)
+       foreach(lc, stmt_options)
        {
                DefElem    *defel = (DefElem *) lfirst(lc);
 
-               if (strcmp(defel->defname, "connect") == 0 && connect)
+               if (IsSet(supported_opts, SUBOPT_CONNECT) &&
+                       strcmp(defel->defname, "connect") == 0)
                {
-                       if (connect_given)
+                       if (IsSet(opts->specified_opts, SUBOPT_CONNECT))
                                ereport(ERROR,
                                                (errcode(ERRCODE_SYNTAX_ERROR),
                                                 errmsg("conflicting or redundant options")));
 
-                       connect_given = true;
-                       *connect = defGetBoolean(defel);
+                       opts->specified_opts |= SUBOPT_CONNECT;
+                       opts->connect = defGetBoolean(defel);
                }
-               else if (strcmp(defel->defname, "enabled") == 0 && enabled)
+               else if (IsSet(supported_opts, SUBOPT_ENABLED) &&
+                                strcmp(defel->defname, "enabled") == 0)
                {
-                       if (*enabled_given)
+                       if (IsSet(opts->specified_opts, SUBOPT_ENABLED))
                                ereport(ERROR,
                                                (errcode(ERRCODE_SYNTAX_ERROR),
                                                 errmsg("conflicting or redundant options")));
 
-                       *enabled_given = true;
-                       *enabled = defGetBoolean(defel);
+                       opts->specified_opts |= SUBOPT_ENABLED;
+                       opts->enabled = defGetBoolean(defel);
                }
-               else if (strcmp(defel->defname, "create_slot") == 0 && create_slot)
+               else if (IsSet(supported_opts, SUBOPT_CREATE_SLOT) &&
+                                strcmp(defel->defname, "create_slot") == 0)
                {
-                       if (create_slot_given)
+                       if (IsSet(opts->specified_opts, SUBOPT_CREATE_SLOT))
                                ereport(ERROR,
                                                (errcode(ERRCODE_SYNTAX_ERROR),
                                                 errmsg("conflicting or redundant options")));
 
-                       create_slot_given = true;
-                       *create_slot = defGetBoolean(defel);
+                       opts->specified_opts |= SUBOPT_CREATE_SLOT;
+                       opts->create_slot = defGetBoolean(defel);
                }
-               else if (strcmp(defel->defname, "slot_name") == 0 && slot_name)
+               else if (IsSet(supported_opts, SUBOPT_SLOT_NAME) &&
+                                strcmp(defel->defname, "slot_name") == 0)
                {
-                       if (*slot_name_given)
+                       if (IsSet(opts->specified_opts, SUBOPT_SLOT_NAME))
                                ereport(ERROR,
                                                (errcode(ERRCODE_SYNTAX_ERROR),
                                                 errmsg("conflicting or redundant options")));
 
-                       *slot_name_given = true;
-                       *slot_name = defGetString(defel);
+                       opts->specified_opts |= SUBOPT_SLOT_NAME;
+                       opts->slot_name = defGetString(defel);
 
                        /* Setting slot_name = NONE is treated as no slot name. */
-                       if (strcmp(*slot_name, "none") == 0)
-                               *slot_name = NULL;
+                       if (strcmp(opts->slot_name, "none") == 0)
+                               opts->slot_name = NULL;
                }
-               else if (strcmp(defel->defname, "copy_data") == 0 && copy_data)
+               else if (IsSet(supported_opts, SUBOPT_COPY_DATA) &&
+                                strcmp(defel->defname, "copy_data") == 0)
                {
-                       if (copy_data_given)
+                       if (IsSet(opts->specified_opts, SUBOPT_COPY_DATA))
                                ereport(ERROR,
                                                (errcode(ERRCODE_SYNTAX_ERROR),
                                                 errmsg("conflicting or redundant options")));
 
-                       copy_data_given = true;
-                       *copy_data = defGetBoolean(defel);
+                       opts->specified_opts |= SUBOPT_COPY_DATA;
+                       opts->copy_data = defGetBoolean(defel);
                }
-               else if (strcmp(defel->defname, "synchronous_commit") == 0 &&
-                                synchronous_commit)
+               else if (IsSet(supported_opts, SUBOPT_SYNCHRONOUS_COMMIT) &&
+                                strcmp(defel->defname, "synchronous_commit") == 0)
                {
-                       if (*synchronous_commit)
+                       if (IsSet(opts->specified_opts, SUBOPT_SYNCHRONOUS_COMMIT))
                                ereport(ERROR,
                                                (errcode(ERRCODE_SYNTAX_ERROR),
                                                 errmsg("conflicting or redundant options")));
 
-                       *synchronous_commit = defGetString(defel);
+                       opts->specified_opts |= SUBOPT_SYNCHRONOUS_COMMIT;
+                       opts->synchronous_commit = defGetString(defel);
 
                        /* Test if the given value is valid for synchronous_commit GUC. */
-                       (void) set_config_option("synchronous_commit", *synchronous_commit,
+                       (void) set_config_option("synchronous_commit", opts->synchronous_commit,
                                                                         PGC_BACKEND, PGC_S_TEST, GUC_ACTION_SET,
                                                                         false, 0, false);
                }
-               else if (strcmp(defel->defname, "refresh") == 0 && refresh)
+               else if (IsSet(supported_opts, SUBOPT_REFRESH) &&
+                                strcmp(defel->defname, "refresh") == 0)
                {
-                       if (refresh_given)
+                       if (IsSet(opts->specified_opts, SUBOPT_REFRESH))
                                ereport(ERROR,
                                                (errcode(ERRCODE_SYNTAX_ERROR),
                                                 errmsg("conflicting or redundant options")));
 
-                       refresh_given = true;
-                       *refresh = defGetBoolean(defel);
+                       opts->specified_opts |= SUBOPT_REFRESH;
+                       opts->refresh = defGetBoolean(defel);
                }
-               else if (strcmp(defel->defname, "binary") == 0 && binary)
+               else if (IsSet(supported_opts, SUBOPT_BINARY) &&
+                                strcmp(defel->defname, "binary") == 0)
                {
-                       if (*binary_given)
+                       if (IsSet(opts->specified_opts, SUBOPT_BINARY))
                                ereport(ERROR,
                                                (errcode(ERRCODE_SYNTAX_ERROR),
                                                 errmsg("conflicting or redundant options")));
 
-                       *binary_given = true;
-                       *binary = defGetBoolean(defel);
+                       opts->specified_opts |= SUBOPT_BINARY;
+                       opts->binary = defGetBoolean(defel);
                }
-               else if (strcmp(defel->defname, "streaming") == 0 && streaming)
+               else if (IsSet(supported_opts, SUBOPT_STREAMING) &&
+                                strcmp(defel->defname, "streaming") == 0)
                {
-                       if (*streaming_given)
+                       if (IsSet(opts->specified_opts, SUBOPT_STREAMING))
                                ereport(ERROR,
                                                (errcode(ERRCODE_SYNTAX_ERROR),
                                                 errmsg("conflicting or redundant options")));
 
-                       *streaming_given = true;
-                       *streaming = defGetBoolean(defel);
+                       opts->specified_opts |= SUBOPT_STREAMING;
+                       opts->streaming = defGetBoolean(defel);
                }
                else
                        ereport(ERROR,
@@ -225,63 +247,81 @@ parse_subscription_options(List *options,
         * We've been explicitly asked to not connect, that requires some
         * additional processing.
         */
-       if (connect && !*connect)
+       if (!opts->connect && IsSet(supported_opts, SUBOPT_CONNECT))
        {
                /* Check for incompatible options from the user. */
-               if (enabled && *enabled_given && *enabled)
+               if (opts->enabled &&
+                       IsSet(supported_opts, SUBOPT_ENABLED) &&
+                       IsSet(opts->specified_opts, SUBOPT_ENABLED))
                        ereport(ERROR,
                                        (errcode(ERRCODE_SYNTAX_ERROR),
                        /*- translator: both %s are strings of the form "option = value" */
                                         errmsg("%s and %s are mutually exclusive options",
                                                        "connect = false", "enabled = true")));
 
-               if (create_slot && create_slot_given && *create_slot)
+               if (opts->create_slot &&
+                       IsSet(supported_opts, SUBOPT_CREATE_SLOT) &&
+                       IsSet(opts->specified_opts, SUBOPT_CREATE_SLOT))
                        ereport(ERROR,
                                        (errcode(ERRCODE_SYNTAX_ERROR),
                                         errmsg("%s and %s are mutually exclusive options",
                                                        "connect = false", "create_slot = true")));
 
-               if (copy_data && copy_data_given && *copy_data)
+               if (opts->copy_data &&
+                       IsSet(supported_opts, SUBOPT_COPY_DATA) &&
+                       IsSet(opts->specified_opts, SUBOPT_COPY_DATA))
                        ereport(ERROR,
                                        (errcode(ERRCODE_SYNTAX_ERROR),
                                         errmsg("%s and %s are mutually exclusive options",
                                                        "connect = false", "copy_data = true")));
 
                /* Change the defaults of other options. */
-               *enabled = false;
-               *create_slot = false;
-               *copy_data = false;
+               opts->enabled = false;
+               opts->create_slot = false;
+               opts->copy_data = false;
        }
 
        /*
         * Do additional checking for disallowed combination when slot_name = NONE
         * was used.
         */
-       if (slot_name && *slot_name_given && !*slot_name)
+       if (!opts->slot_name &&
+               IsSet(supported_opts, SUBOPT_SLOT_NAME) &&
+               IsSet(opts->specified_opts, SUBOPT_SLOT_NAME))
        {
-               if (enabled && *enabled_given && *enabled)
+               if (opts->enabled &&
+                       IsSet(supported_opts, SUBOPT_ENABLED) &&
+                       IsSet(opts->specified_opts, SUBOPT_ENABLED))
                        ereport(ERROR,
                                        (errcode(ERRCODE_SYNTAX_ERROR),
                        /*- translator: both %s are strings of the form "option = value" */
                                         errmsg("%s and %s are mutually exclusive options",
                                                        "slot_name = NONE", "enabled = true")));
 
-               if (create_slot && create_slot_given && *create_slot)
+               if (opts->create_slot &&
+                       IsSet(supported_opts, SUBOPT_CREATE_SLOT) &&
+                       IsSet(opts->specified_opts, SUBOPT_CREATE_SLOT))
                        ereport(ERROR,
                                        (errcode(ERRCODE_SYNTAX_ERROR),
+                       /*- translator: both %s are strings of the form "option = value" */
                                         errmsg("%s and %s are mutually exclusive options",
                                                        "slot_name = NONE", "create_slot = true")));
 
-               if (enabled && !*enabled_given && *enabled)
+               if (opts->enabled &&
+                       IsSet(supported_opts, SUBOPT_ENABLED) &&
+                       !IsSet(opts->specified_opts, SUBOPT_ENABLED))
                        ereport(ERROR,
                                        (errcode(ERRCODE_SYNTAX_ERROR),
                        /*- translator: both %s are strings of the form "option = value" */
                                         errmsg("subscription with %s must also set %s",
                                                        "slot_name = NONE", "enabled = false")));
 
-               if (create_slot && !create_slot_given && *create_slot)
+               if (opts->create_slot &&
+                       IsSet(supported_opts, SUBOPT_CREATE_SLOT) &&
+                       !IsSet(opts->specified_opts, SUBOPT_CREATE_SLOT))
                        ereport(ERROR,
                                        (errcode(ERRCODE_SYNTAX_ERROR),
+                       /*- translator: both %s are strings of the form "option = value" */
                                         errmsg("subscription with %s must also set %s",
                                                        "slot_name = NONE", "create_slot = false")));
        }
@@ -331,37 +371,22 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
        Datum           values[Natts_pg_subscription];
        Oid                     owner = GetUserId();
        HeapTuple       tup;
-       bool            connect;
-       bool            enabled_given;
-       bool            enabled;
-       bool            copy_data;
-       bool            streaming;
-       bool            streaming_given;
-       char       *synchronous_commit;
        char       *conninfo;
-       char       *slotname;
-       bool            slotname_given;
-       bool            binary;
-       bool            binary_given;
        char            originname[NAMEDATALEN];
-       bool            create_slot;
        List       *publications;
+       bits32          supported_opts;
+       SubOpts         opts = {0};
 
        /*
         * Parse and check options.
         *
         * Connection and publication should not be specified here.
         */
-       parse_subscription_options(stmt->options,
-                                                          &connect,
-                                                          &enabled_given, &enabled,
-                                                          &create_slot,
-                                                          &slotname_given, &slotname,
-                                                          &copy_data,
-                                                          &synchronous_commit,
-                                                          NULL,        /* no "refresh" */
-                                                          &binary_given, &binary,
-                                                          &streaming_given, &streaming);
+       supported_opts = (SUBOPT_CONNECT | SUBOPT_ENABLED | SUBOPT_CREATE_SLOT |
+                                         SUBOPT_SLOT_NAME | SUBOPT_COPY_DATA |
+                                         SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
+                                         SUBOPT_STREAMING);
+       parse_subscription_options(stmt->options, supported_opts, &opts);
 
        /*
         * Since creating a replication slot is not transactional, rolling back
@@ -369,7 +394,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
         * CREATE SUBSCRIPTION inside a transaction block if creating a
         * replication slot.
         */
-       if (create_slot)
+       if (opts.create_slot)
                PreventInTransactionBlock(isTopLevel, "CREATE SUBSCRIPTION ... WITH (create_slot = true)");
 
        if (!superuser())
@@ -399,12 +424,13 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
                                                stmt->subname)));
        }
 
-       if (!slotname_given && slotname == NULL)
-               slotname = stmt->subname;
+       if (!IsSet(opts.specified_opts, SUBOPT_SLOT_NAME) &&
+               opts.slot_name == NULL)
+               opts.slot_name = stmt->subname;
 
        /* The default for synchronous_commit of subscriptions is off. */
-       if (synchronous_commit == NULL)
-               synchronous_commit = "off";
+       if (opts.synchronous_commit == NULL)
+               opts.synchronous_commit = "off";
 
        conninfo = stmt->conninfo;
        publications = stmt->publication;
@@ -426,18 +452,18 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
        values[Anum_pg_subscription_subname - 1] =
                DirectFunctionCall1(namein, CStringGetDatum(stmt->subname));
        values[Anum_pg_subscription_subowner - 1] = ObjectIdGetDatum(owner);
-       values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(enabled);
-       values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(binary);
-       values[Anum_pg_subscription_substream - 1] = BoolGetDatum(streaming);
+       values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(opts.enabled);
+       values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(opts.binary);
+       values[Anum_pg_subscription_substream - 1] = BoolGetDatum(opts.streaming);
        values[Anum_pg_subscription_subconninfo - 1] =
                CStringGetTextDatum(conninfo);
-       if (slotname)
+       if (opts.slot_name)
                values[Anum_pg_subscription_subslotname - 1] =
-                       DirectFunctionCall1(namein, CStringGetDatum(slotname));
+                       DirectFunctionCall1(namein, CStringGetDatum(opts.slot_name));
        else
                nulls[Anum_pg_subscription_subslotname - 1] = true;
        values[Anum_pg_subscription_subsynccommit - 1] =
-               CStringGetTextDatum(synchronous_commit);
+               CStringGetTextDatum(opts.synchronous_commit);
        values[Anum_pg_subscription_subpublications - 1] =
                publicationListToArray(publications);
 
@@ -456,7 +482,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
         * Connect to remote side to execute requested commands and fetch table
         * info.
         */
-       if (connect)
+       if (opts.connect)
        {
                char       *err;
                WalReceiverConn *wrconn;
@@ -477,7 +503,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
                         * Set sync state based on if we were asked to do data copy or
                         * not.
                         */
-                       table_state = copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY;
+                       table_state = opts.copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY;
 
                        /*
                         * Get the table list from publisher and build local table status
@@ -504,15 +530,15 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
                         * won't use the initial snapshot for anything, so no need to
                         * export it.
                         */
-                       if (create_slot)
+                       if (opts.create_slot)
                        {
-                               Assert(slotname);
+                               Assert(opts.slot_name);
 
-                               walrcv_create_slot(wrconn, slotname, false,
+                               walrcv_create_slot(wrconn, opts.slot_name, false,
                                                                   CRS_NOEXPORT_SNAPSHOT, NULL);
                                ereport(NOTICE,
                                                (errmsg("created replication slot \"%s\" on publisher",
-                                                               slotname)));
+                                                               opts.slot_name)));
                        }
                }
                PG_FINALLY();
@@ -529,7 +555,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
 
        table_close(rel, RowExclusiveLock);
 
-       if (enabled)
+       if (opts.enabled)
                ApplyLauncherWakeupAtCommit();
 
        ObjectAddressSet(myself, SubscriptionRelationId, subid);
@@ -764,6 +790,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
        bool            update_tuple = false;
        Subscription *sub;
        Form_pg_subscription form;
+       bits32          supported_opts;
+       SubOpts         opts = {0};
 
        rel = table_open(SubscriptionRelationId, RowExclusiveLock);
 
@@ -799,59 +827,46 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
        {
                case ALTER_SUBSCRIPTION_OPTIONS:
                        {
-                               char       *slotname;
-                               bool            slotname_given;
-                               char       *synchronous_commit;
-                               bool            binary_given;
-                               bool            binary;
-                               bool            streaming_given;
-                               bool            streaming;
-
-                               parse_subscription_options(stmt->options,
-                                                                                  NULL,        /* no "connect" */
-                                                                                  NULL, NULL,  /* no "enabled" */
-                                                                                  NULL,        /* no "create_slot" */
-                                                                                  &slotname_given, &slotname,
-                                                                                  NULL,        /* no "copy_data" */
-                                                                                  &synchronous_commit,
-                                                                                  NULL,        /* no "refresh" */
-                                                                                  &binary_given, &binary,
-                                                                                  &streaming_given, &streaming);
-
-                               if (slotname_given)
+                               supported_opts = (SUBOPT_SLOT_NAME |
+                                                                 SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
+                                                                 SUBOPT_STREAMING);
+
+                               parse_subscription_options(stmt->options, supported_opts, &opts);
+
+                               if (IsSet(opts.specified_opts, SUBOPT_SLOT_NAME))
                                {
-                                       if (sub->enabled && !slotname)
+                                       if (sub->enabled && !opts.slot_name)
                                                ereport(ERROR,
                                                                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                                                                 errmsg("cannot set %s for enabled subscription",
                                                                                "slot_name = NONE")));
 
-                                       if (slotname)
+                                       if (opts.slot_name)
                                                values[Anum_pg_subscription_subslotname - 1] =
-                                                       DirectFunctionCall1(namein, CStringGetDatum(slotname));
+                                                       DirectFunctionCall1(namein, CStringGetDatum(opts.slot_name));
                                        else
                                                nulls[Anum_pg_subscription_subslotname - 1] = true;
                                        replaces[Anum_pg_subscription_subslotname - 1] = true;
                                }
 
-                               if (synchronous_commit)
+                               if (opts.synchronous_commit)
                                {
                                        values[Anum_pg_subscription_subsynccommit - 1] =
-                                               CStringGetTextDatum(synchronous_commit);
+                                               CStringGetTextDatum(opts.synchronous_commit);
                                        replaces[Anum_pg_subscription_subsynccommit - 1] = true;
                                }
 
-                               if (binary_given)
+                               if (IsSet(opts.specified_opts, SUBOPT_BINARY))
                                {
                                        values[Anum_pg_subscription_subbinary - 1] =
-                                               BoolGetDatum(binary);
+                                               BoolGetDatum(opts.binary);
                                        replaces[Anum_pg_subscription_subbinary - 1] = true;
                                }
 
-                               if (streaming_given)
+                               if (IsSet(opts.specified_opts, SUBOPT_STREAMING))
                                {
                                        values[Anum_pg_subscription_substream - 1] =
-                                               BoolGetDatum(streaming);
+                                               BoolGetDatum(opts.streaming);
                                        replaces[Anum_pg_subscription_substream - 1] = true;
                                }
 
@@ -861,31 +876,19 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
 
                case ALTER_SUBSCRIPTION_ENABLED:
                        {
-                               bool            enabled,
-                                                       enabled_given;
-
-                               parse_subscription_options(stmt->options,
-                                                                                  NULL,        /* no "connect" */
-                                                                                  &enabled_given, &enabled,
-                                                                                  NULL,        /* no "create_slot" */
-                                                                                  NULL, NULL,  /* no "slot_name" */
-                                                                                  NULL,        /* no "copy_data" */
-                                                                                  NULL,        /* no "synchronous_commit" */
-                                                                                  NULL,        /* no "refresh" */
-                                                                                  NULL, NULL,  /* no "binary" */
-                                                                                  NULL, NULL); /* no streaming */
-                               Assert(enabled_given);
-
-                               if (!sub->slotname && enabled)
+                               parse_subscription_options(stmt->options, SUBOPT_ENABLED, &opts);
+                               Assert(IsSet(opts.specified_opts, SUBOPT_ENABLED));
+
+                               if (!sub->slotname && opts.enabled)
                                        ereport(ERROR,
                                                        (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                                                         errmsg("cannot enable subscription that does not have a slot name")));
 
                                values[Anum_pg_subscription_subenabled - 1] =
-                                       BoolGetDatum(enabled);
+                                       BoolGetDatum(opts.enabled);
                                replaces[Anum_pg_subscription_subenabled - 1] = true;
 
-                               if (enabled)
+                               if (opts.enabled)
                                        ApplyLauncherWakeupAtCommit();
 
                                update_tuple = true;
@@ -906,19 +909,9 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
 
                case ALTER_SUBSCRIPTION_SET_PUBLICATION:
                        {
-                               bool            copy_data;
-                               bool            refresh;
-
-                               parse_subscription_options(stmt->options,
-                                                                                  NULL,        /* no "connect" */
-                                                                                  NULL, NULL,  /* no "enabled" */
-                                                                                  NULL,        /* no "create_slot" */
-                                                                                  NULL, NULL,  /* no "slot_name" */
-                                                                                  &copy_data,
-                                                                                  NULL,        /* no "synchronous_commit" */
-                                                                                  &refresh,
-                                                                                  NULL, NULL,  /* no "binary" */
-                                                                                  NULL, NULL); /* no "streaming" */
+                               supported_opts = SUBOPT_COPY_DATA | SUBOPT_REFRESH;
+                               parse_subscription_options(stmt->options, supported_opts, &opts);
+
                                values[Anum_pg_subscription_subpublications - 1] =
                                        publicationListToArray(stmt->publication);
                                replaces[Anum_pg_subscription_subpublications - 1] = true;
@@ -926,7 +919,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
                                update_tuple = true;
 
                                /* Refresh if user asked us to. */
-                               if (refresh)
+                               if (opts.refresh)
                                {
                                        if (!sub->enabled)
                                                ereport(ERROR,
@@ -939,7 +932,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
                                        /* Make sure refresh sees the new list of publications. */
                                        sub->publications = stmt->publication;
 
-                                       AlterSubscription_refresh(sub, copy_data);
+                                       AlterSubscription_refresh(sub, opts.copy_data);
                                }
 
                                break;
@@ -948,25 +941,16 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
                case ALTER_SUBSCRIPTION_ADD_PUBLICATION:
                case ALTER_SUBSCRIPTION_DROP_PUBLICATION:
                        {
-                               bool            isadd = stmt->kind == ALTER_SUBSCRIPTION_ADD_PUBLICATION;
-                               bool            copy_data = false;
-                               bool            refresh;
                                List       *publist;
+                               bool            isadd = stmt->kind == ALTER_SUBSCRIPTION_ADD_PUBLICATION;
 
-                               parse_subscription_options(stmt->options,
-                                                                                  NULL,        /* no "connect" */
-                                                                                  NULL, NULL,  /* no "enabled" */
-                                                                                  NULL,        /* no "create_slot" */
-                                                                                  NULL, NULL,  /* no "slot_name" */
-                                                                                  isadd ? &copy_data : NULL,   /* for drop, no
-                                                                                                                                                * "copy_data" */
-                                                                                  NULL,        /* no "synchronous_commit" */
-                                                                                  &refresh,
-                                                                                  NULL, NULL,  /* no "binary" */
-                                                                                  NULL, NULL); /* no "streaming" */
+                               supported_opts = SUBOPT_REFRESH;
+                               if (isadd)
+                                       supported_opts |= SUBOPT_COPY_DATA;
 
-                               publist = merge_publications(sub->publications, stmt->publication, isadd, stmt->subname);
+                               parse_subscription_options(stmt->options, supported_opts, &opts);
 
+                               publist = merge_publications(sub->publications, stmt->publication, isadd, stmt->subname);
                                values[Anum_pg_subscription_subpublications - 1] =
                                        publicationListToArray(publist);
                                replaces[Anum_pg_subscription_subpublications - 1] = true;
@@ -974,7 +958,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
                                update_tuple = true;
 
                                /* Refresh if user asked us to. */
-                               if (refresh)
+                               if (opts.refresh)
                                {
                                        if (!sub->enabled)
                                                ereport(ERROR,
@@ -987,7 +971,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
                                        /* Only refresh the added/dropped list of publications. */
                                        sub->publications = stmt->publication;
 
-                                       AlterSubscription_refresh(sub, copy_data);
+                                       AlterSubscription_refresh(sub, opts.copy_data);
                                }
 
                                break;
@@ -995,27 +979,16 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
 
                case ALTER_SUBSCRIPTION_REFRESH:
                        {
-                               bool            copy_data;
-
                                if (!sub->enabled)
                                        ereport(ERROR,
                                                        (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                                                         errmsg("ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions")));
 
-                               parse_subscription_options(stmt->options,
-                                                                                  NULL,        /* no "connect" */
-                                                                                  NULL, NULL,  /* no "enabled" */
-                                                                                  NULL,        /* no "create_slot" */
-                                                                                  NULL, NULL,  /* no "slot_name" */
-                                                                                  &copy_data,
-                                                                                  NULL,        /* no "synchronous_commit" */
-                                                                                  NULL,        /* no "refresh" */
-                                                                                  NULL, NULL,  /* no "binary" */
-                                                                                  NULL, NULL); /* no "streaming" */
+                               parse_subscription_options(stmt->options, SUBOPT_COPY_DATA, &opts);
 
                                PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... REFRESH");
 
-                               AlterSubscription_refresh(sub, copy_data);
+                               AlterSubscription_refresh(sub, opts.copy_data);
 
                                break;
                        }
index 64c06cf95235951db5688b38338a71436cff0502..a72d53a272f1d3c552eb646dd9303f53da033f59 100644 (file)
@@ -2511,6 +2511,7 @@ StringInfoData
 StripnullState
 SubLink
 SubLinkType
+SubOpts
 SubPlan
 SubPlanState
 SubRemoveRels