]> git.ipfire.org Git - thirdparty/postgresql.git/commitdiff
Add two-phase option in pg_createsubscriber.
authorAmit Kapila <akapila@postgresql.org>
Wed, 26 Feb 2025 05:42:50 +0000 (11:12 +0530)
committerAmit Kapila <akapila@postgresql.org>
Wed, 26 Feb 2025 05:42:50 +0000 (11:12 +0530)
This patch introduces the '--enable-two-phase' option to the
'pg_createsubscriber' utility, allowing users to enable two-phase commit
for all subscriptions during their creation.

Note that even without this option users can enable the two_phase option
for the subscriptions created by pg_createsubscriber. However, it requires
the subscription to be disabled first which could be inconvenient for
users.

When two-phase commit is enabled, prepared transactions are sent to the
subscriber at the time of 'PREPARE TRANSACTION', and they are processed as
two-phase transactions on the subscriber as well. If disabled, prepared
transactions are sent only when committed and are processed immediately by
the subscriber.

Author: Shubham Khanna <khannashubham1197@gmail.com>
Reviewed-by: vignesh C <vignesh21@gmail.com>
Reviewed-by: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Reviewed-by: Peter Smith <smithpb2250@gmail.com>
Reviewed-by: Ajin Cherian <itsajin@gmail.com>
Reviewed-by: Amit Kapila <amit.kapila16@gmail.com>
Discussion: https://postgr.es/m/CAHv8RjLPdFP=kA5LNSmWZ=+GMXmO+LczvV6p9HJjsXxZz10KGA@mail.gmail.com

doc/src/sgml/ref/pg_createsubscriber.sgml
src/bin/pg_basebackup/pg_createsubscriber.c
src/bin/pg_basebackup/t/040_pg_createsubscriber.pl

index d56487fe2ca8158a6c3bf8bdb5e93b08dbe8da58..65caa3f50959eb1b74063edfca89cd4880461a35 100644 (file)
@@ -165,6 +165,19 @@ PostgreSQL documentation
      </listitem>
     </varlistentry>
 
+    <varlistentry>
+     <term><option>-T</option></term>
+     <term><option>--enable-two-phase</option></term>
+     <listitem>
+      <para>
+       Enables <link linkend="sql-createsubscription-params-with-two-phase"><literal>two_phase</literal></link>
+       commit for the subscription. When multiple databases are specified, this
+       option applies uniformly to all subscriptions created on those databases.
+       The default is <literal>false</literal>.
+      </para>
+     </listitem>
+    </varlistentry>
+
     <varlistentry>
      <term><option>-U <replaceable class="parameter">username</replaceable></option></term>
      <term><option>--subscriber-username=<replaceable class="parameter">username</replaceable></option></term>
@@ -300,7 +313,9 @@ PostgreSQL documentation
     greater than or equal to the number of specified databases.  The target
     server must have <xref linkend="guc-max-worker-processes"/> configured to a
     value greater than the number of specified databases.  The target server
-    must accept local connections.
+    must accept local connections. If you are planning to use the
+    <option>--enable-two-phase</option> switch then you will also need to set
+    the <xref linkend="guc-max-prepared-transactions"/> appropriately.
    </para>
 
    <para>
@@ -360,6 +375,7 @@ PostgreSQL documentation
    </para>
 
    <para>
+    Unless the <option>--enable-two-phase</option> switch is specified,
     <application>pg_createsubscriber</application> sets up logical
     replication with two-phase commit disabled.  This means that any
     prepared transactions will be replicated at the time
index 9fdf15e5ac03bdd0e4ca350e9bd3de6b75030976..a5a2d61165d2993cdd94679d4c0a1faa790efaa0 100644 (file)
@@ -38,6 +38,7 @@ struct CreateSubscriberOptions
        char       *socket_dir;         /* directory for Unix-domain socket, if any */
        char       *sub_port;           /* subscriber port number */
        const char *sub_username;       /* subscriber username */
+       bool            two_phase;              /* enable-two-phase option */
        SimpleStringList database_names;        /* list of database names */
        SimpleStringList pub_names; /* list of publication names */
        SimpleStringList sub_names; /* list of subscription names */
@@ -45,6 +46,7 @@ struct CreateSubscriberOptions
        int                     recovery_timeout;       /* stop recovery after this time */
 };
 
+/* per-database publication/subscription info */
 struct LogicalRepInfo
 {
        char       *dbname;                     /* database name */
@@ -58,6 +60,16 @@ struct LogicalRepInfo
        bool            made_publication;       /* publication was created */
 };
 
+/*
+ * Information shared across all the databases (or publications and
+ * subscriptions).
+ */
+struct LogicalRepInfos
+{
+       struct LogicalRepInfo *dbinfo;
+       bool            two_phase;              /* enable-two-phase option */
+};
+
 static void cleanup_objects_atexit(void);
 static void usage();
 static char *get_base_conninfo(const char *conninfo, char **dbname);
@@ -117,7 +129,7 @@ static bool dry_run = false;
 
 static bool success = false;
 
-static struct LogicalRepInfo *dbinfo;
+static struct LogicalRepInfos dbinfos;
 static int     num_dbs = 0;            /* number of specified databases */
 static int     num_pubs = 0;           /* number of specified publications */
 static int     num_subs = 0;           /* number of specified subscriptions */
@@ -172,17 +184,17 @@ cleanup_objects_atexit(void)
 
        for (int i = 0; i < num_dbs; i++)
        {
-               if (dbinfo[i].made_publication || dbinfo[i].made_replslot)
+               if (dbinfos.dbinfo[i].made_publication || dbinfos.dbinfo[i].made_replslot)
                {
                        PGconn     *conn;
 
-                       conn = connect_database(dbinfo[i].pubconninfo, false);
+                       conn = connect_database(dbinfos.dbinfo[i].pubconninfo, false);
                        if (conn != NULL)
                        {
-                               if (dbinfo[i].made_publication)
-                                       drop_publication(conn, &dbinfo[i]);
-                               if (dbinfo[i].made_replslot)
-                                       drop_replication_slot(conn, &dbinfo[i], dbinfo[i].replslotname);
+                               if (dbinfos.dbinfo[i].made_publication)
+                                       drop_publication(conn, &dbinfos.dbinfo[i]);
+                               if (dbinfos.dbinfo[i].made_replslot)
+                                       drop_replication_slot(conn, &dbinfos.dbinfo[i], dbinfos.dbinfo[i].replslotname);
                                disconnect_database(conn, false);
                        }
                        else
@@ -192,16 +204,18 @@ cleanup_objects_atexit(void)
                                 * that some objects were left on primary and should be
                                 * removed before trying again.
                                 */
-                               if (dbinfo[i].made_publication)
+                               if (dbinfos.dbinfo[i].made_publication)
                                {
                                        pg_log_warning("publication \"%s\" created in database \"%s\" on primary was left behind",
-                                                                  dbinfo[i].pubname, dbinfo[i].dbname);
+                                                                  dbinfos.dbinfo[i].pubname,
+                                                                  dbinfos.dbinfo[i].dbname);
                                        pg_log_warning_hint("Drop this publication before trying again.");
                                }
-                               if (dbinfo[i].made_replslot)
+                               if (dbinfos.dbinfo[i].made_replslot)
                                {
                                        pg_log_warning("replication slot \"%s\" created in database \"%s\" on primary was left behind",
-                                                                  dbinfo[i].replslotname, dbinfo[i].dbname);
+                                                                  dbinfos.dbinfo[i].replslotname,
+                                                                  dbinfos.dbinfo[i].dbname);
                                        pg_log_warning_hint("Drop this replication slot soon to avoid retention of WAL files.");
                                }
                        }
@@ -227,6 +241,7 @@ usage(void)
        printf(_("  -P, --publisher-server=CONNSTR  publisher connection string\n"));
        printf(_("  -s, --socketdir=DIR             socket directory to use (default current dir.)\n"));
        printf(_("  -t, --recovery-timeout=SECS     seconds to wait for recovery to end\n"));
+       printf(_("  -T, --enable-two-phase          enable two-phase commit for all subscriptions\n"));
        printf(_("  -U, --subscriber-username=NAME  user name for subscriber connection\n"));
        printf(_("  -v, --verbose                   output verbose messages\n"));
        printf(_("      --config-file=FILENAME      use specified main server configuration\n"
@@ -479,9 +494,10 @@ store_pub_sub_info(const struct CreateSubscriberOptions *opt,
                                         dbinfo[i].pubname ? dbinfo[i].pubname : "(auto)",
                                         dbinfo[i].replslotname ? dbinfo[i].replslotname : "(auto)",
                                         dbinfo[i].pubconninfo);
-               pg_log_debug("subscriber(%d): subscription: %s ; connection string: %s", i,
+               pg_log_debug("subscriber(%d): subscription: %s ; connection string: %s, two_phase: %s", i,
                                         dbinfo[i].subname ? dbinfo[i].subname : "(auto)",
-                                        dbinfo[i].subconninfo);
+                                        dbinfo[i].subconninfo,
+                                        dbinfos.two_phase ? "true" : "false");
 
                if (num_pubs > 0)
                        pubcell = pubcell->next;
@@ -938,11 +954,12 @@ check_publisher(const struct LogicalRepInfo *dbinfo)
                failed = true;
        }
 
-       if (max_prepared_transactions != 0)
+       if (max_prepared_transactions != 0 && !dbinfos.two_phase)
        {
                pg_log_warning("two_phase option will not be enabled for replication slots");
                pg_log_warning_detail("Subscriptions will be created with the two_phase option disabled.  "
                                                          "Prepared transactions will be replicated at COMMIT PREPARED.");
+               pg_log_warning_hint("You can use --enable-two-phase switch to enable two_phase.");
        }
 
        /*
@@ -1345,8 +1362,9 @@ create_logical_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo)
        slot_name_esc = PQescapeLiteral(conn, slot_name, strlen(slot_name));
 
        appendPQExpBuffer(str,
-                                         "SELECT lsn FROM pg_catalog.pg_create_logical_replication_slot(%s, 'pgoutput', false, false, false)",
-                                         slot_name_esc);
+                                         "SELECT lsn FROM pg_catalog.pg_create_logical_replication_slot(%s, 'pgoutput', false, %s, false)",
+                                         slot_name_esc,
+                                         dbinfos.two_phase ? "true" : "false");
 
        PQfreemem(slot_name_esc);
 
@@ -1722,8 +1740,9 @@ create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)
        appendPQExpBuffer(str,
                                          "CREATE SUBSCRIPTION %s CONNECTION %s PUBLICATION %s "
                                          "WITH (create_slot = false, enabled = false, "
-                                         "slot_name = %s, copy_data = false)",
-                                         subname_esc, pubconninfo_esc, pubname_esc, replslotname_esc);
+                                         "slot_name = %s, copy_data = false, two_phase = %s)",
+                                         subname_esc, pubconninfo_esc, pubname_esc, replslotname_esc,
+                                         dbinfos.two_phase ? "true" : "false");
 
        PQfreemem(pubname_esc);
        PQfreemem(subname_esc);
@@ -1895,6 +1914,7 @@ main(int argc, char **argv)
                {"publisher-server", required_argument, NULL, 'P'},
                {"socketdir", required_argument, NULL, 's'},
                {"recovery-timeout", required_argument, NULL, 't'},
+               {"enable-two-phase", no_argument, NULL, 'T'},
                {"subscriber-username", required_argument, NULL, 'U'},
                {"verbose", no_argument, NULL, 'v'},
                {"version", no_argument, NULL, 'V'},
@@ -1950,6 +1970,7 @@ main(int argc, char **argv)
        opt.socket_dir = NULL;
        opt.sub_port = DEFAULT_SUB_PORT;
        opt.sub_username = NULL;
+       opt.two_phase = false;
        opt.database_names = (SimpleStringList)
        {
                0
@@ -1972,7 +1993,7 @@ main(int argc, char **argv)
 
        get_restricted_token();
 
-       while ((c = getopt_long(argc, argv, "d:D:np:P:s:t:U:v",
+       while ((c = getopt_long(argc, argv, "d:D:np:P:s:t:TU:v",
                                                        long_options, &option_index)) != -1)
        {
                switch (c)
@@ -2009,6 +2030,9 @@ main(int argc, char **argv)
                        case 't':
                                opt.recovery_timeout = atoi(optarg);
                                break;
+                       case 'T':
+                               opt.two_phase = true;
+                               break;
                        case 'U':
                                opt.sub_username = pg_strdup(optarg);
                                break;
@@ -2170,12 +2194,14 @@ main(int argc, char **argv)
        /* Rudimentary check for a data directory */
        check_data_directory(subscriber_dir);
 
+       dbinfos.two_phase = opt.two_phase;
+
        /*
         * Store database information for publisher and subscriber. It should be
         * called before atexit() because its return is used in the
         * cleanup_objects_atexit().
         */
-       dbinfo = store_pub_sub_info(&opt, pub_base_conninfo, sub_base_conninfo);
+       dbinfos.dbinfo = store_pub_sub_info(&opt, pub_base_conninfo, sub_base_conninfo);
 
        /* Register a function to clean up objects in case of failure */
        atexit(cleanup_objects_atexit);
@@ -2184,7 +2210,7 @@ main(int argc, char **argv)
         * Check if the subscriber data directory has the same system identifier
         * than the publisher data directory.
         */
-       pub_sysid = get_primary_sysid(dbinfo[0].pubconninfo);
+       pub_sysid = get_primary_sysid(dbinfos.dbinfo[0].pubconninfo);
        sub_sysid = get_standby_sysid(subscriber_dir);
        if (pub_sysid != sub_sysid)
                pg_fatal("subscriber data directory is not a copy of the source database cluster");
@@ -2214,10 +2240,10 @@ main(int argc, char **argv)
        start_standby_server(&opt, true, false);
 
        /* Check if the standby server is ready for logical replication */
-       check_subscriber(dbinfo);
+       check_subscriber(dbinfos.dbinfo);
 
        /* Check if the primary server is ready for logical replication */
-       check_publisher(dbinfo);
+       check_publisher(dbinfos.dbinfo);
 
        /*
         * Stop the target server. The recovery process requires that the server
@@ -2230,10 +2256,10 @@ main(int argc, char **argv)
        stop_standby_server(subscriber_dir);
 
        /* Create the required objects for each database on publisher */
-       consistent_lsn = setup_publisher(dbinfo);
+       consistent_lsn = setup_publisher(dbinfos.dbinfo);
 
        /* Write the required recovery parameters */
-       setup_recovery(dbinfo, subscriber_dir, consistent_lsn);
+       setup_recovery(dbinfos.dbinfo, subscriber_dir, consistent_lsn);
 
        /*
         * Start subscriber so the recovery parameters will take effect. Wait
@@ -2244,7 +2270,7 @@ main(int argc, char **argv)
        start_standby_server(&opt, true, true);
 
        /* Waiting the subscriber to be promoted */
-       wait_for_end_recovery(dbinfo[0].subconninfo, &opt);
+       wait_for_end_recovery(dbinfos.dbinfo[0].subconninfo, &opt);
 
        /*
         * Create the subscription for each database on subscriber. It does not
@@ -2252,13 +2278,13 @@ main(int argc, char **argv)
         * point to the LSN reported by setup_publisher().  It also cleans up
         * publications created by this tool and replication to the standby.
         */
-       setup_subscriber(dbinfo, consistent_lsn);
+       setup_subscriber(dbinfos.dbinfo, consistent_lsn);
 
        /* Remove primary_slot_name if it exists on primary */
-       drop_primary_replication_slot(dbinfo, primary_slot_name);
+       drop_primary_replication_slot(dbinfos.dbinfo, primary_slot_name);
 
        /* Remove failover replication slots if they exist on subscriber */
-       drop_failover_replication_slots(dbinfo);
+       drop_failover_replication_slots(dbinfos.dbinfo);
 
        /* Stop the subscriber */
        pg_log_info("stopping the subscriber");
index c8dbdb7e9b79c74a2c6708b4ef3eefb563be670f..c35fa108ce3459afd09d793f6217e390f78fae7a 100644 (file)
@@ -373,6 +373,7 @@ command_ok(
 
 # Run pg_createsubscriber on node S.  --verbose is used twice
 # to show more information.
+# In passing, also test the --enable-two-phase option
 command_ok(
        [
                'pg_createsubscriber',
@@ -388,6 +389,7 @@ command_ok(
                '--replication-slot' => 'replslot2',
                '--database' => $db1,
                '--database' => $db2,
+               '--enable-two-phase'
        ],
        'run pg_createsubscriber on node S');
 
@@ -406,6 +408,15 @@ $node_p->safe_psql($db2, "INSERT INTO tbl2 VALUES('row 1')");
 # Start subscriber
 $node_s->start;
 
+# Verify that all subtwophase states are pending or enabled,
+# e.g. there are no subscriptions where subtwophase is disabled ('d')
+is( $node_s->safe_psql(
+               'postgres',
+               "SELECT count(1) = 0 FROM pg_subscription WHERE subtwophasestate = 'd'"
+       ),
+       't',
+       'subscriptions are created with the two-phase option enabled');
+
 # Confirm the pre-existing subscription has been removed
 $result = $node_s->safe_psql(
        'postgres', qq(