]> git.ipfire.org Git - thirdparty/postgresql.git/commitdiff
Add slotsync skip statistics.
authorAmit Kapila <akapila@postgresql.org>
Tue, 25 Nov 2025 06:47:49 +0000 (06:47 +0000)
committerAmit Kapila <akapila@postgresql.org>
Tue, 25 Nov 2025 07:06:02 +0000 (07:06 +0000)
This patch adds two new columns to the pg_stat_replication_slots view:
slotsync_skip_count - the total number of times a slotsync operation was
skipped.
slotsync_skip_at - the timestamp of the most recent skip.

These additions provide better visibility into replication slot
synchronization behavior.

A future patch will introduce the slotsync_skip_reason column in
pg_replication_slots to capture the reason for skip.

Author: Shlok Kyal <shlok.kyal.oss@gmail.com>
Reviewed-by: shveta malik <shveta.malik@gmail.com>
Reviewed-by: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Reviewed-by: Ashutosh Sharma <ashu.coek88@gmail.com>
Reviewed-by: Amit Kapila <amit.kapila16@gmail.com>
Discussion: https://postgr.es/m/CAE9k0PkhfKrTEAsGz4DjOhEj1nQ+hbQVfvWUxNacD38ibW3a1g@mail.gmail.com

contrib/test_decoding/expected/stats.out
doc/src/sgml/monitoring.sgml
src/backend/catalog/system_views.sql
src/backend/replication/logical/slotsync.c
src/backend/utils/activity/pgstat_replslot.c
src/backend/utils/adt/pgstatfuncs.c
src/include/catalog/catversion.h
src/include/catalog/pg_proc.dat
src/include/pgstat.h
src/test/recovery/t/040_standby_failover_slots_sync.pl
src/test/regress/expected/rules.out

index 28da9123cc8b992ebaeca7ff1f481a14e84672bb..e5117f88a146588f32aee8d9accdc0467198b539 100644 (file)
@@ -78,17 +78,17 @@ SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count,
 
 -- verify accessing/resetting stats for non-existent slot does something reasonable
 SELECT * FROM pg_stat_get_replication_slot('do-not-exist');
-  slot_name   | spill_txns | spill_count | spill_bytes | stream_txns | stream_count | stream_bytes | mem_exceeded_count | total_txns | total_bytes | stats_reset 
---------------+------------+-------------+-------------+-------------+--------------+--------------+--------------------+------------+-------------+-------------
- do-not-exist |          0 |           0 |           0 |           0 |            0 |            0 |                  0 |          0 |           0 | 
+  slot_name   | spill_txns | spill_count | spill_bytes | stream_txns | stream_count | stream_bytes | mem_exceeded_count | total_txns | total_bytes | slotsync_skip_count | slotsync_skip_at | stats_reset 
+--------------+------------+-------------+-------------+-------------+--------------+--------------+--------------------+------------+-------------+---------------------+------------------+-------------
+ do-not-exist |          0 |           0 |           0 |           0 |            0 |            0 |                  0 |          0 |           0 |                   0 |                  | 
 (1 row)
 
 SELECT pg_stat_reset_replication_slot('do-not-exist');
 ERROR:  replication slot "do-not-exist" does not exist
 SELECT * FROM pg_stat_get_replication_slot('do-not-exist');
-  slot_name   | spill_txns | spill_count | spill_bytes | stream_txns | stream_count | stream_bytes | mem_exceeded_count | total_txns | total_bytes | stats_reset 
---------------+------------+-------------+-------------+-------------+--------------+--------------+--------------------+------------+-------------+-------------
- do-not-exist |          0 |           0 |           0 |           0 |            0 |            0 |                  0 |          0 |           0 | 
+  slot_name   | spill_txns | spill_count | spill_bytes | stream_txns | stream_count | stream_bytes | mem_exceeded_count | total_txns | total_bytes | slotsync_skip_count | slotsync_skip_at | stats_reset 
+--------------+------------+-------------+-------------+-------------+--------------+--------------+--------------------+------------+-------------+---------------------+------------------+-------------
+ do-not-exist |          0 |           0 |           0 |           0 |            0 |            0 |                  0 |          0 |           0 |                   0 |                  | 
 (1 row)
 
 -- spilling the xact
index 436ef0e8bd06801e572e9459057e1cd2ec5a0d29..dcc8474a7f791f447ed8218bd75439ba7cf1392b 100644 (file)
@@ -1659,6 +1659,30 @@ description | Waiting for a newly initialized WAL file to reach durable storage
       </entry>
      </row>
 
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+        <structfield>slotsync_skip_count</structfield><type>bigint</type>
+       </para>
+       <para>
+        Number of times the slot synchronization is skipped. Slot
+        synchronization occur only on standby servers and thus this column has
+        no meaning on the primary server.
+       </para>
+      </entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+        <structfield>slotsync_skip_at</structfield><type>timestamp with time zone</type>
+       </para>
+       <para>
+        Time at which last slot synchronization was skipped. Slot
+        synchronization occur only on standby servers and thus this column has
+        no meaning on the primary server.
+       </para>
+      </entry>
+     </row>
+
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
         <structfield>stats_reset</structfield> <type>timestamp with time zone</type>
index 95ad29a64b987bfd89707264fdc45cbbd8d1fed3..6fffdb9398e5ab542706a49491b2208d3c827981 100644 (file)
@@ -1076,6 +1076,8 @@ CREATE VIEW pg_stat_replication_slots AS
             s.mem_exceeded_count,
             s.total_txns,
             s.total_bytes,
+            s.slotsync_skip_count,
+            s.slotsync_skip_at,
             s.stats_reset
     FROM pg_replication_slots as r,
         LATERAL pg_stat_get_replication_slot(slot_name) as s
index 8b4afd87dc91cfa9c1113d7b7c639117816f0118..7e9dc7f18bd6e4f12e0584e3727d07fc70b5ef5a 100644 (file)
@@ -187,6 +187,9 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid,
                TransactionIdPrecedes(remote_slot->catalog_xmin,
                                                          slot->data.catalog_xmin))
        {
+               /* Update slot sync skip stats */
+               pgstat_report_replslotsync(slot);
+
                /*
                 * This can happen in following situations:
                 *
@@ -277,6 +280,13 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid,
                                                errdetail_internal("Remote slot has LSN %X/%08X but local slot has LSN %X/%08X.",
                                                                                   LSN_FORMAT_ARGS(remote_slot->confirmed_lsn),
                                                                                   LSN_FORMAT_ARGS(slot->data.confirmed_flush)));
+
+                       /*
+                        * If we can't reach a consistent snapshot, the slot won't be
+                        * persisted. See update_and_persist_local_synced_slot().
+                        */
+                       if (found_consistent_snapshot && !(*found_consistent_snapshot))
+                               pgstat_report_replslotsync(slot);
                }
 
                updated_xmin_or_lsn = true;
@@ -563,6 +573,7 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
        bool            found_consistent_snapshot = false;
        bool            remote_slot_precedes = false;
 
+       /* Slotsync skip stats are handled in function update_local_synced_slot() */
        (void) update_local_synced_slot(remote_slot, remote_dbid,
                                                                        &found_consistent_snapshot,
                                                                        &remote_slot_precedes);
@@ -624,31 +635,9 @@ static bool
 synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
 {
        ReplicationSlot *slot;
-       XLogRecPtr      latestFlushPtr;
+       XLogRecPtr      latestFlushPtr = GetStandbyFlushRecPtr(NULL);
        bool            slot_updated = false;
 
-       /*
-        * Make sure that concerned WAL is received and flushed before syncing
-        * slot to target lsn received from the primary server.
-        */
-       latestFlushPtr = GetStandbyFlushRecPtr(NULL);
-       if (remote_slot->confirmed_lsn > latestFlushPtr)
-       {
-               /*
-                * Can get here only if GUC 'synchronized_standby_slots' on the
-                * primary server was not configured correctly.
-                */
-               ereport(AmLogicalSlotSyncWorkerProcess() ? LOG : ERROR,
-                               errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-                               errmsg("skipping slot synchronization because the received slot sync"
-                                          " LSN %X/%08X for slot \"%s\" is ahead of the standby position %X/%08X",
-                                          LSN_FORMAT_ARGS(remote_slot->confirmed_lsn),
-                                          remote_slot->name,
-                                          LSN_FORMAT_ARGS(latestFlushPtr)));
-
-               return false;
-       }
-
        /* Search for the named slot */
        if ((slot = SearchNamedReplicationSlot(remote_slot->name, true)))
        {
@@ -707,10 +696,38 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
                /* Skip the sync of an invalidated slot */
                if (slot->data.invalidated != RS_INVAL_NONE)
                {
+                       pgstat_report_replslotsync(slot);
+
                        ReplicationSlotRelease();
                        return slot_updated;
                }
 
+               /*
+                * Make sure that concerned WAL is received and flushed before syncing
+                * slot to target lsn received from the primary server.
+                *
+                * Report statistics only after the slot has been acquired, ensuring
+                * it cannot be dropped during the reporting process.
+                */
+               if (remote_slot->confirmed_lsn > latestFlushPtr)
+               {
+                       pgstat_report_replslotsync(slot);
+
+                       /*
+                        * Can get here only if GUC 'synchronized_standby_slots' on the
+                        * primary server was not configured correctly.
+                        */
+                       ereport(AmLogicalSlotSyncWorkerProcess() ? LOG : ERROR,
+                                       errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                                       errmsg("skipping slot synchronization because the received slot sync"
+                                                  " LSN %X/%08X for slot \"%s\" is ahead of the standby position %X/%08X",
+                                                  LSN_FORMAT_ARGS(remote_slot->confirmed_lsn),
+                                                  remote_slot->name,
+                                                  LSN_FORMAT_ARGS(latestFlushPtr)));
+
+                       return slot_updated;
+               }
+
                /* Slot not ready yet, let's attempt to make it sync-ready now. */
                if (slot->data.persistency == RS_TEMPORARY)
                {
@@ -784,6 +801,32 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
                ReplicationSlotsComputeRequiredXmin(true);
                LWLockRelease(ProcArrayLock);
 
+               /*
+                * Make sure that concerned WAL is received and flushed before syncing
+                * slot to target lsn received from the primary server.
+                *
+                * Report statistics only after the slot has been acquired, ensuring
+                * it cannot be dropped during the reporting process.
+                */
+               if (remote_slot->confirmed_lsn > latestFlushPtr)
+               {
+                       pgstat_report_replslotsync(slot);
+
+                       /*
+                        * Can get here only if GUC 'synchronized_standby_slots' on the
+                        * primary server was not configured correctly.
+                        */
+                       ereport(AmLogicalSlotSyncWorkerProcess() ? LOG : ERROR,
+                                       errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                                       errmsg("skipping slot synchronization because the received slot sync"
+                                                  " LSN %X/%08X for slot \"%s\" is ahead of the standby position %X/%08X",
+                                                  LSN_FORMAT_ARGS(remote_slot->confirmed_lsn),
+                                                  remote_slot->name,
+                                                  LSN_FORMAT_ARGS(latestFlushPtr)));
+
+                       return false;
+               }
+
                update_and_persist_local_synced_slot(remote_slot, remote_dbid);
 
                slot_updated = true;
index d210c261ac65f53fefe2fd5925c63a20a90316da..f93179146c2c7ce6299f9bf2b011e7747bc0b136 100644 (file)
@@ -102,6 +102,36 @@ pgstat_report_replslot(ReplicationSlot *slot, const PgStat_StatReplSlotEntry *re
        pgstat_unlock_entry(entry_ref);
 }
 
+/*
+ * Report replication slot sync skip statistics.
+ *
+ * Similar to pgstat_report_replslot(), we can rely on the stats for the
+ * slot to exist and to belong to this slot.
+ */
+void
+pgstat_report_replslotsync(ReplicationSlot *slot)
+{
+       PgStat_EntryRef *entry_ref;
+       PgStatShared_ReplSlot *shstatent;
+       PgStat_StatReplSlotEntry *statent;
+
+       /* Slot sync stats are valid only for logical slots on standby. */
+       Assert(SlotIsLogical(slot));
+       Assert(RecoveryInProgress());
+
+       entry_ref = pgstat_get_entry_ref_locked(PGSTAT_KIND_REPLSLOT, InvalidOid,
+                                                                                       ReplicationSlotIndex(slot), false);
+       Assert(entry_ref != NULL);
+
+       shstatent = (PgStatShared_ReplSlot *) entry_ref->shared_stats;
+       statent = &shstatent->stats;
+
+       statent->slotsync_skip_count += 1;
+       statent->slotsync_skip_at = GetCurrentTimestamp();
+
+       pgstat_unlock_entry(entry_ref);
+}
+
 /*
  * Report replication slot creation.
  *
@@ -133,7 +163,7 @@ pgstat_create_replslot(ReplicationSlot *slot)
  * Report replication slot has been acquired.
  *
  * This guarantees that a stats entry exists during later
- * pgstat_report_replslot() calls.
+ * pgstat_report_replslot() or pgstat_report_replslotsync() calls.
  *
  * If we previously crashed, no stats data exists. But if we did not crash,
  * the stats do belong to this slot:
index 3d98d064a94478edff2053aa5c2b610349747f41..7e2ed69138aa225564e32bf700c186cb1c9ecbca 100644 (file)
@@ -2129,7 +2129,7 @@ pg_stat_get_archiver(PG_FUNCTION_ARGS)
 Datum
 pg_stat_get_replication_slot(PG_FUNCTION_ARGS)
 {
-#define PG_STAT_GET_REPLICATION_SLOT_COLS 11
+#define PG_STAT_GET_REPLICATION_SLOT_COLS 13
        text       *slotname_text = PG_GETARG_TEXT_P(0);
        NameData        slotname;
        TupleDesc       tupdesc;
@@ -2160,7 +2160,11 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS)
                                           INT8OID, -1, 0);
        TupleDescInitEntry(tupdesc, (AttrNumber) 10, "total_bytes",
                                           INT8OID, -1, 0);
-       TupleDescInitEntry(tupdesc, (AttrNumber) 11, "stats_reset",
+       TupleDescInitEntry(tupdesc, (AttrNumber) 11, "slotsync_skip_count",
+                                          INT8OID, -1, 0);
+       TupleDescInitEntry(tupdesc, (AttrNumber) 12, "slotsync_skip_at",
+                                          TIMESTAMPTZOID, -1, 0);
+       TupleDescInitEntry(tupdesc, (AttrNumber) 13, "stats_reset",
                                           TIMESTAMPTZOID, -1, 0);
        BlessTupleDesc(tupdesc);
 
@@ -2186,11 +2190,17 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS)
        values[7] = Int64GetDatum(slotent->mem_exceeded_count);
        values[8] = Int64GetDatum(slotent->total_txns);
        values[9] = Int64GetDatum(slotent->total_bytes);
+       values[10] = Int64GetDatum(slotent->slotsync_skip_count);
+
+       if (slotent->slotsync_skip_at == 0)
+               nulls[11] = true;
+       else
+               values[11] = TimestampTzGetDatum(slotent->slotsync_skip_at);
 
        if (slotent->stat_reset_timestamp == 0)
-               nulls[10] = true;
+               nulls[12] = true;
        else
-               values[10] = TimestampTzGetDatum(slotent->stat_reset_timestamp);
+               values[12] = TimestampTzGetDatum(slotent->stat_reset_timestamp);
 
        /* Returns the record as Datum */
        PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
index 53c12364d5d7ee8a7a3d593eb7f450700eee66da..ee642e5510d8f9c90e760bdafe25ded852bc41b4 100644 (file)
@@ -57,6 +57,6 @@
  */
 
 /*                                                     yyyymmddN */
-#define CATALOG_VERSION_NO     202511221
+#define CATALOG_VERSION_NO     202511251
 
 #endif
index 1edb18958f758a62e6b46785f84fe9e3d5fd579b..664319407008df6deb2fa9d02cff119eef69cf56 100644 (file)
 { oid => '6169', descr => 'statistics: information about replication slot',
   proname => 'pg_stat_get_replication_slot', provolatile => 's',
   proparallel => 'r', prorettype => 'record', proargtypes => 'text',
-  proallargtypes => '{text,text,int8,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}',
-  proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o}',
-  proargnames => '{slot_name,slot_name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes,mem_exceeded_count,total_txns,total_bytes,stats_reset}',
+  proallargtypes => '{text,text,int8,int8,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz,timestamptz}',
+  proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{slot_name,slot_name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes,mem_exceeded_count,total_txns,total_bytes,slotsync_skip_count,slotsync_skip_at,stats_reset}',
   prosrc => 'pg_stat_get_replication_slot' },
 
 { oid => '6230', descr => 'statistics: check if a stats object exists',
index a68e725259a22c7e592013a7258de241674aae7b..ad85134f27a3e641e212e36bf7dd1c945ce526f3 100644 (file)
@@ -214,7 +214,7 @@ typedef struct PgStat_TableXactStatus
  * ------------------------------------------------------------
  */
 
-#define PGSTAT_FILE_FORMAT_ID  0x01A5BCBA
+#define PGSTAT_FILE_FORMAT_ID  0x01A5BCBB
 
 typedef struct PgStat_ArchiverStats
 {
@@ -400,6 +400,8 @@ typedef struct PgStat_StatReplSlotEntry
        PgStat_Counter mem_exceeded_count;
        PgStat_Counter total_txns;
        PgStat_Counter total_bytes;
+       PgStat_Counter slotsync_skip_count;
+       TimestampTz slotsync_skip_at;
        TimestampTz stat_reset_timestamp;
 } PgStat_StatReplSlotEntry;
 
@@ -745,6 +747,7 @@ extern PgStat_TableStatus *find_tabstat_entry(Oid rel_id);
 extern void pgstat_reset_replslot(const char *name);
 struct ReplicationSlot;
 extern void pgstat_report_replslot(struct ReplicationSlot *slot, const PgStat_StatReplSlotEntry *repSlotStat);
+extern void pgstat_report_replslotsync(struct ReplicationSlot *slot);
 extern void pgstat_create_replslot(struct ReplicationSlot *slot);
 extern void pgstat_acquire_replslot(struct ReplicationSlot *slot);
 extern void pgstat_drop_replslot(struct ReplicationSlot *slot);
index 1627e619b1b9059e17f95ee646c338a030ca34ff..b2bf5072bbf9b14b6bad19e0a12a0a7577b1690c 100644 (file)
@@ -213,19 +213,75 @@ is( $standby1->safe_psql(
 ##################################################
 # Test that the synchronized slot will be dropped if the corresponding remote
 # slot on the primary server has been dropped.
+#
+# Note: Both slots need to be dropped for the next test to work
 ##################################################
 
 $primary->psql('postgres', "SELECT pg_drop_replication_slot('lsub2_slot');");
+$primary->psql('postgres', "SELECT pg_drop_replication_slot('lsub1_slot');");
 
 $standby1->safe_psql('postgres', "SELECT pg_sync_replication_slots();");
 
 is( $standby1->safe_psql(
                'postgres',
-               q{SELECT count(*) = 0 FROM pg_replication_slots WHERE slot_name = 'lsub2_slot';}
+               q{SELECT count(*) = 0 FROM pg_replication_slots WHERE slot_name IN ('lsub1_slot', 'lsub2_slot');}
        ),
        "t",
        'synchronized slot has been dropped');
 
+##################################################
+# Verify that slotsync skip statistics are correctly updated when the
+# slotsync operation is skipped.
+##################################################
+
+# Create a logical replication slot and create some DDL on the primary so
+# that the slot lags behind the standby.
+$primary->safe_psql(
+       'postgres', qq(
+       SELECT pg_create_logical_replication_slot('lsub1_slot', 'pgoutput', false, false, true);
+       CREATE TABLE wal_push(a int);
+));
+$primary->wait_for_replay_catchup($standby1);
+
+my $log_offset = -s $standby1->logfile;
+
+# Enable slot sync worker.
+$standby1->append_conf('postgresql.conf', qq(sync_replication_slots = on));
+$standby1->reload;
+
+# Confirm that the slot sync worker is able to start.
+$standby1->wait_for_log(qr/slot sync worker started/, $log_offset);
+
+# Confirm that the slot sync is skipped due to the remote slot lagging behind
+$standby1->wait_for_log(
+       qr/could not synchronize replication slot \"lsub1_slot\"/, $log_offset);
+
+# Confirm that the slotsync skip statistics is updated
+$result = $standby1->safe_psql('postgres',
+       "SELECT slotsync_skip_count > 0 FROM pg_stat_replication_slots WHERE slot_name = 'lsub1_slot'"
+);
+is($result, 't', "check slot sync skip count increments");
+
+# Clean the table
+$primary->safe_psql(
+       'postgres', qq(
+    DROP TABLE wal_push;
+));
+$primary->wait_for_replay_catchup($standby1);
+
+# Re-create the logical replication slot and sync it to standby for further tests
+$primary->safe_psql(
+       'postgres', qq(
+       SELECT pg_drop_replication_slot('lsub1_slot');
+       SELECT pg_create_logical_replication_slot('lsub1_slot', 'pgoutput', false, false, true);
+));
+$standby1->wait_for_log(
+       qr/newly created replication slot \"lsub1_slot\" is sync-ready now/,
+       $log_offset);
+
+$standby1->append_conf('postgresql.conf', qq(sync_replication_slots = off));
+$standby1->reload;
+
 ##################################################
 # Test that if the synchronized slot is invalidated while the remote slot is
 # still valid, the slot will be dropped and re-created on the standby by
@@ -281,7 +337,7 @@ $inactive_since_on_primary =
 # the failover slots.
 $primary->wait_for_replay_catchup($standby1);
 
-my $log_offset = -s $standby1->logfile;
+$log_offset = -s $standby1->logfile;
 
 # Synchronize the primary server slots to the standby.
 $standby1->safe_psql('postgres', "SELECT pg_sync_replication_slots();");
index 372a2188c22a43050606df893c2f5e76c1f6dcf2..c337f0bc30dca0dd11d53f498c6d9cdd49dcf076 100644 (file)
@@ -2151,9 +2151,11 @@ pg_stat_replication_slots| SELECT s.slot_name,
     s.mem_exceeded_count,
     s.total_txns,
     s.total_bytes,
+    s.slotsync_skip_count,
+    s.slotsync_skip_at,
     s.stats_reset
    FROM pg_replication_slots r,
-    LATERAL pg_stat_get_replication_slot((r.slot_name)::text) s(slot_name, spill_txns, spill_count, spill_bytes, stream_txns, stream_count, stream_bytes, mem_exceeded_count, total_txns, total_bytes, stats_reset)
+    LATERAL pg_stat_get_replication_slot((r.slot_name)::text) s(slot_name, spill_txns, spill_count, spill_bytes, stream_txns, stream_count, stream_bytes, mem_exceeded_count, total_txns, total_bytes, slotsync_skip_count, slotsync_skip_at, stats_reset)
   WHERE (r.datoid IS NOT NULL);
 pg_stat_slru| SELECT name,
     blks_zeroed,