]> git.ipfire.org Git - thirdparty/postgresql.git/commitdiff
pg_upgrade: Optimize logical replication slot caught-up check.
authorMasahiko Sawada <msawada@postgresql.org>
Thu, 5 Feb 2026 01:11:27 +0000 (17:11 -0800)
committerMasahiko Sawada <msawada@postgresql.org>
Thu, 5 Feb 2026 01:11:27 +0000 (17:11 -0800)
Commit 29d0a77fa6 improved pg_upgrade to allow migrating logical slots
provided that all logical slots have caught up (i.e., they have no
pending decodable WAL records). Previously, this verification was done
by checking each slot individually, which could be time-consuming if
there were many logical slots to migrate.

This commit optimizes the check to avoid reading the same WAL stream
multiple times. It performs the check only for the slot with the
minimum confirmed_flush_lsn and applies the result to all other slots
in the same database. This limits the check to at most one logical
slot per database.

During the check, we identify the last decodable WAL record's LSN to
report any slots with unconsumed records, consistent with the existing
error reporting behavior. Additionally, the maximum
confirmed_flush_lsn among all logical slots on the database is used as
an early scan cutoff; finding a decodable WAL record beyond this point
implies that no slot has caught up.

Performance testing demonstrated that the execution time remains
stable regardless of the number of slots in the database.

Note that we do not distinguish slots based on their output plugins. A
hypothetical plugin might use a replication origin filter that filters
out changes from a specific origin. In such cases, we might get a
false positive (erroneously considering a slot caught up). However,
this is safe from a data integrity standpoint, such scenarios are
rare, and the impact of a false positive is minimal.

This optimization is applied only when the old cluster is version 19
or later.

Bump catalog version.

Reviewed-by: Chao Li <li.evan.chao@gmail.com>
Reviewed-by: shveta malik <shveta.malik@gmail.com>
Reviewed-by: Amit Kapila <amit.kapila16@gmail.com>
Discussion: https://postgr.es/m/CAD21AoBZ0LAcw1OHGEKdW7S5TRJaURdhEk3CLAW69_siqfqyAg@mail.gmail.com

src/backend/replication/logical/logical.c
src/backend/utils/adt/pg_upgrade_support.c
src/bin/pg_upgrade/check.c
src/bin/pg_upgrade/info.c
src/bin/pg_upgrade/t/003_logical_slots.pl
src/include/catalog/catversion.h
src/include/catalog/pg_proc.dat
src/include/replication/logical.h

index 85060d19a49b29fc281110d6bb17ec059e1f62a7..603a2b94d054d8eb9b1a0b1549380c155fd6157b 100644 (file)
@@ -1986,16 +1986,22 @@ UpdateDecodingStats(LogicalDecodingContext *ctx)
 }
 
 /*
- * Read up to the end of WAL starting from the decoding slot's restart_lsn.
- * Return true if any meaningful/decodable WAL records are encountered,
- * otherwise false.
+ * Read up to the end of WAL starting from the decoding slot's restart_lsn
+ * to end_of_wal in order to check if any meaningful/decodable WAL records
+ * are encountered. scan_cutoff_lsn is the LSN, where we can terminate the
+ * WAL scan early if we find a decodable WAL record after this LSN.
+ *
+ * Returns the last LSN decodable WAL record's LSN if found, otherwise
+ * returns InvalidXLogRecPtr.
  */
-bool
-LogicalReplicationSlotHasPendingWal(XLogRecPtr end_of_wal)
+XLogRecPtr
+LogicalReplicationSlotCheckPendingWal(XLogRecPtr end_of_wal,
+                                                                         XLogRecPtr scan_cutoff_lsn)
 {
-       bool            has_pending_wal = false;
+       XLogRecPtr      last_pending_wal = InvalidXLogRecPtr;
 
        Assert(MyReplicationSlot);
+       Assert(end_of_wal >= scan_cutoff_lsn);
 
        PG_TRY();
        {
@@ -2023,8 +2029,7 @@ LogicalReplicationSlotHasPendingWal(XLogRecPtr end_of_wal)
                /* Invalidate non-timetravel entries */
                InvalidateSystemCaches();
 
-               /* Loop until the end of WAL or some changes are processed */
-               while (!has_pending_wal && ctx->reader->EndRecPtr < end_of_wal)
+               while (ctx->reader->EndRecPtr < end_of_wal)
                {
                        XLogRecord *record;
                        char       *errm = NULL;
@@ -2037,7 +2042,20 @@ LogicalReplicationSlotHasPendingWal(XLogRecPtr end_of_wal)
                        if (record != NULL)
                                LogicalDecodingProcessRecord(ctx, ctx->reader);
 
-                       has_pending_wal = ctx->processing_required;
+                       if (ctx->processing_required)
+                       {
+                               last_pending_wal = ctx->reader->ReadRecPtr;
+
+                               /*
+                                * If we find a decodable WAL after the scan_cutoff_lsn point,
+                                * we can terminate the scan early.
+                                */
+                               if (last_pending_wal >= scan_cutoff_lsn)
+                                       break;
+
+                               /* Reset the flag and continue checking */
+                               ctx->processing_required = false;
+                       }
 
                        CHECK_FOR_INTERRUPTS();
                }
@@ -2055,7 +2073,7 @@ LogicalReplicationSlotHasPendingWal(XLogRecPtr end_of_wal)
        }
        PG_END_TRY();
 
-       return has_pending_wal;
+       return last_pending_wal;
 }
 
 /*
index 697143aec440a2db57bb6c977f6de545f86ffffc..b505a6b4feeb838e28be1a7897753dae89ab4049 100644 (file)
@@ -282,11 +282,12 @@ binary_upgrade_set_missing_value(PG_FUNCTION_ARGS)
  * upgraded without data loss.
  */
 Datum
-binary_upgrade_logical_slot_has_caught_up(PG_FUNCTION_ARGS)
+binary_upgrade_check_logical_slot_pending_wal(PG_FUNCTION_ARGS)
 {
        Name            slot_name;
        XLogRecPtr      end_of_wal;
-       bool            found_pending_wal;
+       XLogRecPtr      scan_cutoff_lsn;
+       XLogRecPtr      last_pending_wal;
 
        CHECK_IS_BINARY_UPGRADE;
 
@@ -297,6 +298,7 @@ binary_upgrade_logical_slot_has_caught_up(PG_FUNCTION_ARGS)
        Assert(has_rolreplication(GetUserId()));
 
        slot_name = PG_GETARG_NAME(0);
+       scan_cutoff_lsn = PG_GETARG_LSN(1);
 
        /* Acquire the given slot */
        ReplicationSlotAcquire(NameStr(*slot_name), true, true);
@@ -307,12 +309,16 @@ binary_upgrade_logical_slot_has_caught_up(PG_FUNCTION_ARGS)
        Assert(MyReplicationSlot->data.invalidated == RS_INVAL_NONE);
 
        end_of_wal = GetFlushRecPtr(NULL);
-       found_pending_wal = LogicalReplicationSlotHasPendingWal(end_of_wal);
+       last_pending_wal = LogicalReplicationSlotCheckPendingWal(end_of_wal,
+                                                                                                                        scan_cutoff_lsn);
 
        /* Clean up */
        ReplicationSlotRelease();
 
-       PG_RETURN_BOOL(!found_pending_wal);
+       if (XLogRecPtrIsValid(last_pending_wal))
+               PG_RETURN_LSN(last_pending_wal);
+       else
+               PG_RETURN_NULL();
 }
 
 /*
index a8d20a92a98d58918555b5fe7a958991745d9829..5c73773bf0eeed6254c0eae95cc4e219cc178201 100644 (file)
@@ -622,7 +622,7 @@ check_and_dump_old_cluster(void)
        {
                /*
                 * Logical replication slots can be migrated since PG17. See comments
-                * atop get_old_cluster_logical_slot_infos().
+                * in get_db_rel_and_slot_infos().
                 */
                check_old_cluster_for_valid_slots();
 
index 47e8d1039a2ac1579af3ae300845bdb963a3a5a1..ad4b1530e6d1b276a6ddc1e623885e00d88f8ad8 100644 (file)
@@ -29,7 +29,7 @@ static void free_rel_infos(RelInfoArr *rel_arr);
 static void print_db_infos(DbInfoArr *db_arr);
 static void print_rel_infos(RelInfoArr *rel_arr);
 static void print_slot_infos(LogicalSlotInfoArr *slot_arr);
-static char *get_old_cluster_logical_slot_infos_query(void);
+static const char *get_old_cluster_logical_slot_infos_query(ClusterInfo *cluster);
 static void process_old_cluster_logical_slot_infos(DbInfo *dbinfo, PGresult *res, void *arg);
 
 
@@ -281,7 +281,6 @@ get_db_rel_and_slot_infos(ClusterInfo *cluster)
 {
        UpgradeTask *task = upgrade_task_create();
        char       *rel_infos_query = NULL;
-       char       *logical_slot_infos_query = NULL;
 
        if (cluster->dbarr.dbs != NULL)
                free_db_and_rel_infos(&cluster->dbarr);
@@ -306,20 +305,15 @@ get_db_rel_and_slot_infos(ClusterInfo *cluster)
         */
        if (cluster == &old_cluster &&
                GET_MAJOR_VERSION(cluster->major_version) > 1600)
-       {
-               logical_slot_infos_query = get_old_cluster_logical_slot_infos_query();
                upgrade_task_add_step(task,
-                                                         logical_slot_infos_query,
+                                                         get_old_cluster_logical_slot_infos_query(cluster),
                                                          process_old_cluster_logical_slot_infos,
                                                          true, NULL);
-       }
 
        upgrade_task_run(task, cluster);
        upgrade_task_free(task);
 
        pg_free(rel_infos_query);
-       if (logical_slot_infos_query)
-               pg_free(logical_slot_infos_query);
 
        if (cluster == &old_cluster)
                pg_log(PG_VERBOSE, "\nsource databases:");
@@ -681,17 +675,15 @@ process_rel_infos(DbInfo *dbinfo, PGresult *res, void *arg)
  * get_db_rel_and_slot_infos()'s UpgradeTask.  The status of each logical slot
  * is checked in check_old_cluster_for_valid_slots().
  */
-static char *
-get_old_cluster_logical_slot_infos_query(void)
+static const char *
+get_old_cluster_logical_slot_infos_query(ClusterInfo *cluster)
 {
        /*
         * Fetch the logical replication slot information. The check whether the
         * slot is considered caught up is done by an upgrade function. This
         * regards the slot as caught up if we don't find any decodable changes.
-        * See binary_upgrade_logical_slot_has_caught_up().
-        *
-        * Note that we can't ensure whether the slot is caught up during
-        * live_check as the new WAL records could be generated.
+        * The implementation of this check varies depending on the server
+        * version.
         *
         * We intentionally skip checking the WALs for invalidated slots as the
         * corresponding WALs could have been removed for such slots.
@@ -701,21 +693,80 @@ get_old_cluster_logical_slot_infos_query(void)
         * started and stopped several times causing any temporary slots to be
         * removed.
         */
-       return psprintf("SELECT slot_name, plugin, two_phase, failover, "
-                                       "%s as caught_up, invalidation_reason IS NOT NULL as invalid "
-                                       "FROM pg_catalog.pg_replication_slots "
-                                       "WHERE slot_type = 'logical' AND "
-                                       "database = current_database() AND "
-                                       "temporary IS FALSE;",
-                                       user_opts.live_check ? "FALSE" :
-                                       "(CASE WHEN invalidation_reason IS NOT NULL THEN FALSE "
-                                       "ELSE (SELECT pg_catalog.binary_upgrade_logical_slot_has_caught_up(slot_name)) "
-                                       "END)");
+
+       if (user_opts.live_check)
+       {
+               /*
+                * We skip the caught-up check during live_check. We cannot verify
+                * whether the slot is caught up in this mode, as new WAL records
+                * could be generated concurrently.
+                */
+               return "SELECT slot_name, plugin, two_phase, failover, "
+                       "FALSE as caught_up, "
+                       "invalidation_reason IS NOT NULL as invalid "
+                       "FROM pg_catalog.pg_replication_slots "
+                       "WHERE slot_type = 'logical' AND "
+                       "database = current_database() AND "
+                       "temporary IS FALSE";
+       }
+       else if (GET_MAJOR_VERSION(cluster->major_version) >= 1900)
+       {
+               /*
+                * For PG19 and later, we optimize the slot caught-up check to avoid
+                * reading the same WAL stream multiple times: execute the caught-up
+                * check only for the slot with the minimum confirmed_flush_lsn, and
+                * apply the same result to all other slots in the same database. This
+                * limits the check to at most one logical slot per database. We also
+                * use the maximum confirmed_flush_lsn among all logical slots on the
+                * database as an early scan cutoff; finding a decodable WAL record
+                * beyond this point implies that no slot has caught up.
+                *
+                * Note that we don't distinguish slots based on their output plugin.
+                * If a plugin applies replication origin filters, we might get a
+                * false positive (i.e., erroneously considering a slot caught up).
+                * However, such cases are very rare, and the impact of a false
+                * positive is minimal.
+                */
+               return "WITH check_caught_up AS ( "
+                       "  SELECT pg_catalog.binary_upgrade_check_logical_slot_pending_wal(slot_name, "
+                       "    MAX(confirmed_flush_lsn) OVER ()) as last_pending_wal "
+                       "  FROM pg_replication_slots "
+                       "  WHERE slot_type = 'logical' AND "
+                       "    database = current_database() AND "
+                       "    temporary IS FALSE AND "
+                       "    invalidation_reason IS NULL "
+                       "  ORDER BY confirmed_flush_lsn ASC "
+                       "  LIMIT 1 "
+                       ") "
+                       "SELECT slot_name, plugin, two_phase, failover, "
+                       "CASE WHEN invalidation_reason IS NOT NULL THEN FALSE "
+                       "ELSE  last_pending_wal IS NULL OR "
+                       "  confirmed_flush_lsn > last_pending_wal "
+                       "END as caught_up, "
+                       "invalidation_reason IS NOT NULL as invalid "
+                       "FROM pg_catalog.pg_replication_slots, check_caught_up "
+                       "WHERE slot_type = 'logical' AND "
+                       "database = current_database() AND "
+                       "temporary IS FALSE ";
+       }
+
+       /*
+        * For PG18 and earlier, we call
+        * binary_upgrade_logical_slot_has_caught_up() for each logical slot.
+        */
+       return "SELECT slot_name, plugin, two_phase, failover, "
+               "CASE WHEN invalidation_reason IS NOT NULL THEN FALSE "
+               "ELSE (SELECT pg_catalog.binary_upgrade_logical_slot_has_caught_up(slot_name)) "
+               "END as caught_up, "
+               "invalidation_reason IS NOT NULL as invalid "
+               "FROM pg_catalog.pg_replication_slots "
+               "WHERE slot_type = 'logical' AND "
+               "database = current_database() AND "
+               "temporary IS FALSE ";
 }
 
 /*
- * Callback function for processing results of the query returned by
- * get_old_cluster_logical_slot_infos_query(), which is used for
+ * Callback function for processing results of the query, which is used for
  * get_db_rel_and_slot_infos()'s UpgradeTask.  This function stores the logical
  * slot information for later use.
  */
@@ -768,7 +819,7 @@ process_old_cluster_logical_slot_infos(DbInfo *dbinfo, PGresult *res, void *arg)
  *
  * Note: this function always returns 0 if the old_cluster is PG16 and prior
  * because we gather slot information only for cluster versions greater than or
- * equal to PG17. See get_old_cluster_logical_slot_infos().
+ * equal to PG17. See get_db_rel_and_slot_infos().
  */
 int
 count_old_cluster_logical_slots(void)
index b9abc3a2e2148a53bb990063371862ab4c874c7d..15e6d267f2f11c9341ba189009d08c504823080d 100644 (file)
@@ -64,6 +64,7 @@ $oldpub->safe_psql(
        'postgres', qq[
        SELECT pg_create_logical_replication_slot('test_slot1', 'test_decoding');
        SELECT pg_create_logical_replication_slot('test_slot2', 'test_decoding');
+       SELECT pg_create_logical_replication_slot('test_slot3', 'test_decoding');
 ]);
 $oldpub->stop();
 
@@ -77,7 +78,7 @@ command_checks_all(
        [@pg_upgrade_cmd],
        1,
        [
-               qr/"max_replication_slots" \(1\) must be greater than or equal to the number of logical replication slots \(2\) on the old cluster/
+               qr/"max_replication_slots" \(1\) must be greater than or equal to the number of logical replication slots \(3\) on the old cluster/
        ],
        [qr//],
        'run of pg_upgrade where the new cluster has insufficient "max_replication_slots"'
@@ -85,29 +86,31 @@ command_checks_all(
 ok(-d $newpub->data_dir . "/pg_upgrade_output.d",
        "pg_upgrade_output.d/ not removed after pg_upgrade failure");
 
-# Set 'max_replication_slots' to match the number of slots (2) present on the
+# Set 'max_replication_slots' to match the number of slots (3) present on the
 # old cluster. Both slots will be used for subsequent tests.
-$newpub->append_conf('postgresql.conf', "max_replication_slots = 2");
+$newpub->append_conf('postgresql.conf', "max_replication_slots = 3");
 
 
 # ------------------------------
 # TEST: Confirm pg_upgrade fails when the slot still has unconsumed WAL records
 
 # Preparations for the subsequent test:
-# 1. Generate extra WAL records. At this point neither test_slot1 nor
-#       test_slot2 has consumed them.
+# 1. Generate extra WAL records. At this point none of the slots has consumed them.
 #
 # 2. Advance the slot test_slot2 up to the current WAL location, but test_slot1
 #       still has unconsumed WAL records.
 #
 # 3. Emit a non-transactional message. This will cause test_slot2 to detect the
 #       unconsumed WAL record.
+#
+# 4. Advance the slot test_slots3 up to the current WAL location.
 $oldpub->start;
 $oldpub->safe_psql(
        'postgres', qq[
                CREATE TABLE tbl AS SELECT generate_series(1, 10) AS a;
                SELECT pg_replication_slot_advance('test_slot2', pg_current_wal_lsn());
-               SELECT count(*) FROM pg_logical_emit_message('false', 'prefix', 'This is a non-transactional message');
+               SELECT count(*) FROM pg_logical_emit_message('false', 'prefix', 'This is a non-transactional message', true);
+               SELECT pg_replication_slot_advance('test_slot3', pg_current_wal_lsn());
 ]);
 $oldpub->stop;
 
@@ -138,8 +141,9 @@ find(
        },
        $newpub->data_dir . "/pg_upgrade_output.d");
 
-# Check the file content. Both slots should be reporting that they have
-# unconsumed WAL records.
+# Check the file content. While both test_slot1 and test_slot2 should be reporting
+# that they have unconsumed WAL records, test_slot3 should not be reported as
+# it has caught up.
 like(
        slurp_file($slots_filename),
        qr/The slot \"test_slot1\" has not consumed the WAL yet/m,
@@ -148,6 +152,10 @@ like(
        slurp_file($slots_filename),
        qr/The slot \"test_slot2\" has not consumed the WAL yet/m,
        'the previous test failed due to unconsumed WALs');
+unlike(
+       slurp_file($slots_filename),
+       qr/test_slot3/m,
+       'caught-up slot is not reported');
 
 
 # ------------------------------
@@ -162,6 +170,7 @@ $oldpub->safe_psql(
        'postgres', qq[
        SELECT * FROM pg_drop_replication_slot('test_slot1');
        SELECT * FROM pg_drop_replication_slot('test_slot2');
+       SELECT * FROM pg_drop_replication_slot('test_slot3');
        CREATE PUBLICATION regress_pub FOR ALL TABLES;
 ]);
 
index fb5770266663e64127e42030f5e1a1d4ec0863ff..a09d8a6c645bbf5892d169d9b6b67e05d55cc82e 100644 (file)
@@ -57,6 +57,6 @@
  */
 
 /*                                                     yyyymmddN */
-#define CATALOG_VERSION_NO     202601261
+#define CATALOG_VERSION_NO     202602051
 
 #endif
index 5e5e33f64fcb9ccab01133b55ad16e88c049cb4f..83f6501df38647f779b293c9bf1a489dc53ac2f4 100644 (file)
   proparallel => 'u', prorettype => 'void', proargtypes => 'oid',
   prosrc => 'binary_upgrade_set_next_pg_tablespace_oid' },
 { oid => '6312', descr => 'for use by pg_upgrade',
-  proname => 'binary_upgrade_logical_slot_has_caught_up', provolatile => 'v',
-  proparallel => 'u', prorettype => 'bool', proargtypes => 'name',
-  prosrc => 'binary_upgrade_logical_slot_has_caught_up' },
+  proname => 'binary_upgrade_check_logical_slot_pending_wal', provolatile => 'v',
+  proparallel => 'u', prorettype => 'pg_lsn', proargtypes => 'name pg_lsn',
+  prosrc => 'binary_upgrade_check_logical_slot_pending_wal' },
 { oid => '6319',
   descr => 'for use by pg_upgrade (relation for pg_subscription_rel)',
   proname => 'binary_upgrade_add_sub_rel_state', proisstrict => 'f',
index 7f03537bda716bef485f4e5bd0bcba79a8bfd8a2..bc9d4ece67261445fbf6804a7fa6e971f49f5167 100644 (file)
@@ -148,7 +148,8 @@ extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, ReplOriginI
 extern void ResetLogicalStreamingState(void);
 extern void UpdateDecodingStats(LogicalDecodingContext *ctx);
 
-extern bool LogicalReplicationSlotHasPendingWal(XLogRecPtr end_of_wal);
+extern XLogRecPtr LogicalReplicationSlotCheckPendingWal(XLogRecPtr end_of_wal,
+                                                                                                               XLogRecPtr scan_cutoff_lsn);
 extern XLogRecPtr LogicalSlotAdvanceAndCheckSnapState(XLogRecPtr moveto,
                                                                                                          bool *found_consistent_snapshot);