-- verify accessing/resetting stats for non-existent slot does something reasonable
SELECT * FROM pg_stat_get_replication_slot('do-not-exist');
- slot_name | spill_txns | spill_count | spill_bytes | stream_txns | stream_count | stream_bytes | mem_exceeded_count | total_txns | total_bytes | stats_reset
---------------+------------+-------------+-------------+-------------+--------------+--------------+--------------------+------------+-------------+-------------
- do-not-exist | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 |
+ slot_name | spill_txns | spill_count | spill_bytes | stream_txns | stream_count | stream_bytes | mem_exceeded_count | total_txns | total_bytes | slotsync_skip_count | slotsync_skip_at | stats_reset
+--------------+------------+-------------+-------------+-------------+--------------+--------------+--------------------+------------+-------------+---------------------+------------------+-------------
+ do-not-exist | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | |
(1 row)
SELECT pg_stat_reset_replication_slot('do-not-exist');
ERROR: replication slot "do-not-exist" does not exist
SELECT * FROM pg_stat_get_replication_slot('do-not-exist');
- slot_name | spill_txns | spill_count | spill_bytes | stream_txns | stream_count | stream_bytes | mem_exceeded_count | total_txns | total_bytes | stats_reset
---------------+------------+-------------+-------------+-------------+--------------+--------------+--------------------+------------+-------------+-------------
- do-not-exist | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 |
+ slot_name | spill_txns | spill_count | spill_bytes | stream_txns | stream_count | stream_bytes | mem_exceeded_count | total_txns | total_bytes | slotsync_skip_count | slotsync_skip_at | stats_reset
+--------------+------------+-------------+-------------+-------------+--------------+--------------+--------------------+------------+-------------+---------------------+------------------+-------------
+ do-not-exist | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | |
(1 row)
-- spilling the xact
</entry>
</row>
+ <row>
+ <entry role="catalog_table_entry"><para role="column_definition">
+ <structfield>slotsync_skip_count</structfield><type>bigint</type>
+ </para>
+ <para>
+ Number of times the slot synchronization is skipped. Slot
+ synchronization occur only on standby servers and thus this column has
+ no meaning on the primary server.
+ </para>
+ </entry>
+ </row>
+
+ <row>
+ <entry role="catalog_table_entry"><para role="column_definition">
+ <structfield>slotsync_skip_at</structfield><type>timestamp with time zone</type>
+ </para>
+ <para>
+ Time at which last slot synchronization was skipped. Slot
+ synchronization occur only on standby servers and thus this column has
+ no meaning on the primary server.
+ </para>
+ </entry>
+ </row>
+
<row>
<entry role="catalog_table_entry"><para role="column_definition">
<structfield>stats_reset</structfield> <type>timestamp with time zone</type>
s.mem_exceeded_count,
s.total_txns,
s.total_bytes,
+ s.slotsync_skip_count,
+ s.slotsync_skip_at,
s.stats_reset
FROM pg_replication_slots as r,
LATERAL pg_stat_get_replication_slot(slot_name) as s
TransactionIdPrecedes(remote_slot->catalog_xmin,
slot->data.catalog_xmin))
{
+ /* Update slot sync skip stats */
+ pgstat_report_replslotsync(slot);
+
/*
* This can happen in following situations:
*
errdetail_internal("Remote slot has LSN %X/%08X but local slot has LSN %X/%08X.",
LSN_FORMAT_ARGS(remote_slot->confirmed_lsn),
LSN_FORMAT_ARGS(slot->data.confirmed_flush)));
+
+ /*
+ * If we can't reach a consistent snapshot, the slot won't be
+ * persisted. See update_and_persist_local_synced_slot().
+ */
+ if (found_consistent_snapshot && !(*found_consistent_snapshot))
+ pgstat_report_replslotsync(slot);
}
updated_xmin_or_lsn = true;
bool found_consistent_snapshot = false;
bool remote_slot_precedes = false;
+ /* Slotsync skip stats are handled in function update_local_synced_slot() */
(void) update_local_synced_slot(remote_slot, remote_dbid,
&found_consistent_snapshot,
&remote_slot_precedes);
synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
{
ReplicationSlot *slot;
- XLogRecPtr latestFlushPtr;
+ XLogRecPtr latestFlushPtr = GetStandbyFlushRecPtr(NULL);
bool slot_updated = false;
- /*
- * Make sure that concerned WAL is received and flushed before syncing
- * slot to target lsn received from the primary server.
- */
- latestFlushPtr = GetStandbyFlushRecPtr(NULL);
- if (remote_slot->confirmed_lsn > latestFlushPtr)
- {
- /*
- * Can get here only if GUC 'synchronized_standby_slots' on the
- * primary server was not configured correctly.
- */
- ereport(AmLogicalSlotSyncWorkerProcess() ? LOG : ERROR,
- errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("skipping slot synchronization because the received slot sync"
- " LSN %X/%08X for slot \"%s\" is ahead of the standby position %X/%08X",
- LSN_FORMAT_ARGS(remote_slot->confirmed_lsn),
- remote_slot->name,
- LSN_FORMAT_ARGS(latestFlushPtr)));
-
- return false;
- }
-
/* Search for the named slot */
if ((slot = SearchNamedReplicationSlot(remote_slot->name, true)))
{
/* Skip the sync of an invalidated slot */
if (slot->data.invalidated != RS_INVAL_NONE)
{
+ pgstat_report_replslotsync(slot);
+
ReplicationSlotRelease();
return slot_updated;
}
+ /*
+ * Make sure that concerned WAL is received and flushed before syncing
+ * slot to target lsn received from the primary server.
+ *
+ * Report statistics only after the slot has been acquired, ensuring
+ * it cannot be dropped during the reporting process.
+ */
+ if (remote_slot->confirmed_lsn > latestFlushPtr)
+ {
+ pgstat_report_replslotsync(slot);
+
+ /*
+ * Can get here only if GUC 'synchronized_standby_slots' on the
+ * primary server was not configured correctly.
+ */
+ ereport(AmLogicalSlotSyncWorkerProcess() ? LOG : ERROR,
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("skipping slot synchronization because the received slot sync"
+ " LSN %X/%08X for slot \"%s\" is ahead of the standby position %X/%08X",
+ LSN_FORMAT_ARGS(remote_slot->confirmed_lsn),
+ remote_slot->name,
+ LSN_FORMAT_ARGS(latestFlushPtr)));
+
+ return slot_updated;
+ }
+
/* Slot not ready yet, let's attempt to make it sync-ready now. */
if (slot->data.persistency == RS_TEMPORARY)
{
ReplicationSlotsComputeRequiredXmin(true);
LWLockRelease(ProcArrayLock);
+ /*
+ * Make sure that concerned WAL is received and flushed before syncing
+ * slot to target lsn received from the primary server.
+ *
+ * Report statistics only after the slot has been acquired, ensuring
+ * it cannot be dropped during the reporting process.
+ */
+ if (remote_slot->confirmed_lsn > latestFlushPtr)
+ {
+ pgstat_report_replslotsync(slot);
+
+ /*
+ * Can get here only if GUC 'synchronized_standby_slots' on the
+ * primary server was not configured correctly.
+ */
+ ereport(AmLogicalSlotSyncWorkerProcess() ? LOG : ERROR,
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("skipping slot synchronization because the received slot sync"
+ " LSN %X/%08X for slot \"%s\" is ahead of the standby position %X/%08X",
+ LSN_FORMAT_ARGS(remote_slot->confirmed_lsn),
+ remote_slot->name,
+ LSN_FORMAT_ARGS(latestFlushPtr)));
+
+ return false;
+ }
+
update_and_persist_local_synced_slot(remote_slot, remote_dbid);
slot_updated = true;
pgstat_unlock_entry(entry_ref);
}
+/*
+ * Report replication slot sync skip statistics.
+ *
+ * Similar to pgstat_report_replslot(), we can rely on the stats for the
+ * slot to exist and to belong to this slot.
+ */
+void
+pgstat_report_replslotsync(ReplicationSlot *slot)
+{
+ PgStat_EntryRef *entry_ref;
+ PgStatShared_ReplSlot *shstatent;
+ PgStat_StatReplSlotEntry *statent;
+
+ /* Slot sync stats are valid only for logical slots on standby. */
+ Assert(SlotIsLogical(slot));
+ Assert(RecoveryInProgress());
+
+ entry_ref = pgstat_get_entry_ref_locked(PGSTAT_KIND_REPLSLOT, InvalidOid,
+ ReplicationSlotIndex(slot), false);
+ Assert(entry_ref != NULL);
+
+ shstatent = (PgStatShared_ReplSlot *) entry_ref->shared_stats;
+ statent = &shstatent->stats;
+
+ statent->slotsync_skip_count += 1;
+ statent->slotsync_skip_at = GetCurrentTimestamp();
+
+ pgstat_unlock_entry(entry_ref);
+}
+
/*
* Report replication slot creation.
*
* Report replication slot has been acquired.
*
* This guarantees that a stats entry exists during later
- * pgstat_report_replslot() calls.
+ * pgstat_report_replslot() or pgstat_report_replslotsync() calls.
*
* If we previously crashed, no stats data exists. But if we did not crash,
* the stats do belong to this slot:
Datum
pg_stat_get_replication_slot(PG_FUNCTION_ARGS)
{
-#define PG_STAT_GET_REPLICATION_SLOT_COLS 11
+#define PG_STAT_GET_REPLICATION_SLOT_COLS 13
text *slotname_text = PG_GETARG_TEXT_P(0);
NameData slotname;
TupleDesc tupdesc;
INT8OID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 10, "total_bytes",
INT8OID, -1, 0);
- TupleDescInitEntry(tupdesc, (AttrNumber) 11, "stats_reset",
+ TupleDescInitEntry(tupdesc, (AttrNumber) 11, "slotsync_skip_count",
+ INT8OID, -1, 0);
+ TupleDescInitEntry(tupdesc, (AttrNumber) 12, "slotsync_skip_at",
+ TIMESTAMPTZOID, -1, 0);
+ TupleDescInitEntry(tupdesc, (AttrNumber) 13, "stats_reset",
TIMESTAMPTZOID, -1, 0);
BlessTupleDesc(tupdesc);
values[7] = Int64GetDatum(slotent->mem_exceeded_count);
values[8] = Int64GetDatum(slotent->total_txns);
values[9] = Int64GetDatum(slotent->total_bytes);
+ values[10] = Int64GetDatum(slotent->slotsync_skip_count);
+
+ if (slotent->slotsync_skip_at == 0)
+ nulls[11] = true;
+ else
+ values[11] = TimestampTzGetDatum(slotent->slotsync_skip_at);
if (slotent->stat_reset_timestamp == 0)
- nulls[10] = true;
+ nulls[12] = true;
else
- values[10] = TimestampTzGetDatum(slotent->stat_reset_timestamp);
+ values[12] = TimestampTzGetDatum(slotent->stat_reset_timestamp);
/* Returns the record as Datum */
PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
*/
/* yyyymmddN */
-#define CATALOG_VERSION_NO 202511221
+#define CATALOG_VERSION_NO 202511251
#endif
{ oid => '6169', descr => 'statistics: information about replication slot',
proname => 'pg_stat_get_replication_slot', provolatile => 's',
proparallel => 'r', prorettype => 'record', proargtypes => 'text',
- proallargtypes => '{text,text,int8,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}',
- proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o}',
- proargnames => '{slot_name,slot_name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes,mem_exceeded_count,total_txns,total_bytes,stats_reset}',
+ proallargtypes => '{text,text,int8,int8,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz,timestamptz}',
+ proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o,o,o}',
+ proargnames => '{slot_name,slot_name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes,mem_exceeded_count,total_txns,total_bytes,slotsync_skip_count,slotsync_skip_at,stats_reset}',
prosrc => 'pg_stat_get_replication_slot' },
{ oid => '6230', descr => 'statistics: check if a stats object exists',
* ------------------------------------------------------------
*/
-#define PGSTAT_FILE_FORMAT_ID 0x01A5BCBA
+#define PGSTAT_FILE_FORMAT_ID 0x01A5BCBB
typedef struct PgStat_ArchiverStats
{
PgStat_Counter mem_exceeded_count;
PgStat_Counter total_txns;
PgStat_Counter total_bytes;
+ PgStat_Counter slotsync_skip_count;
+ TimestampTz slotsync_skip_at;
TimestampTz stat_reset_timestamp;
} PgStat_StatReplSlotEntry;
extern void pgstat_reset_replslot(const char *name);
struct ReplicationSlot;
extern void pgstat_report_replslot(struct ReplicationSlot *slot, const PgStat_StatReplSlotEntry *repSlotStat);
+extern void pgstat_report_replslotsync(struct ReplicationSlot *slot);
extern void pgstat_create_replslot(struct ReplicationSlot *slot);
extern void pgstat_acquire_replslot(struct ReplicationSlot *slot);
extern void pgstat_drop_replslot(struct ReplicationSlot *slot);
##################################################
# Test that the synchronized slot will be dropped if the corresponding remote
# slot on the primary server has been dropped.
+#
+# Note: Both slots need to be dropped for the next test to work
##################################################
$primary->psql('postgres', "SELECT pg_drop_replication_slot('lsub2_slot');");
+$primary->psql('postgres', "SELECT pg_drop_replication_slot('lsub1_slot');");
$standby1->safe_psql('postgres', "SELECT pg_sync_replication_slots();");
is( $standby1->safe_psql(
'postgres',
- q{SELECT count(*) = 0 FROM pg_replication_slots WHERE slot_name = 'lsub2_slot';}
+ q{SELECT count(*) = 0 FROM pg_replication_slots WHERE slot_name IN ('lsub1_slot', 'lsub2_slot');}
),
"t",
'synchronized slot has been dropped');
+##################################################
+# Verify that slotsync skip statistics are correctly updated when the
+# slotsync operation is skipped.
+##################################################
+
+# Create a logical replication slot and create some DDL on the primary so
+# that the slot lags behind the standby.
+$primary->safe_psql(
+ 'postgres', qq(
+ SELECT pg_create_logical_replication_slot('lsub1_slot', 'pgoutput', false, false, true);
+ CREATE TABLE wal_push(a int);
+));
+$primary->wait_for_replay_catchup($standby1);
+
+my $log_offset = -s $standby1->logfile;
+
+# Enable slot sync worker.
+$standby1->append_conf('postgresql.conf', qq(sync_replication_slots = on));
+$standby1->reload;
+
+# Confirm that the slot sync worker is able to start.
+$standby1->wait_for_log(qr/slot sync worker started/, $log_offset);
+
+# Confirm that the slot sync is skipped due to the remote slot lagging behind
+$standby1->wait_for_log(
+ qr/could not synchronize replication slot \"lsub1_slot\"/, $log_offset);
+
+# Confirm that the slotsync skip statistics is updated
+$result = $standby1->safe_psql('postgres',
+ "SELECT slotsync_skip_count > 0 FROM pg_stat_replication_slots WHERE slot_name = 'lsub1_slot'"
+);
+is($result, 't', "check slot sync skip count increments");
+
+# Clean the table
+$primary->safe_psql(
+ 'postgres', qq(
+ DROP TABLE wal_push;
+));
+$primary->wait_for_replay_catchup($standby1);
+
+# Re-create the logical replication slot and sync it to standby for further tests
+$primary->safe_psql(
+ 'postgres', qq(
+ SELECT pg_drop_replication_slot('lsub1_slot');
+ SELECT pg_create_logical_replication_slot('lsub1_slot', 'pgoutput', false, false, true);
+));
+$standby1->wait_for_log(
+ qr/newly created replication slot \"lsub1_slot\" is sync-ready now/,
+ $log_offset);
+
+$standby1->append_conf('postgresql.conf', qq(sync_replication_slots = off));
+$standby1->reload;
+
##################################################
# Test that if the synchronized slot is invalidated while the remote slot is
# still valid, the slot will be dropped and re-created on the standby by
# the failover slots.
$primary->wait_for_replay_catchup($standby1);
-my $log_offset = -s $standby1->logfile;
+$log_offset = -s $standby1->logfile;
# Synchronize the primary server slots to the standby.
$standby1->safe_psql('postgres', "SELECT pg_sync_replication_slots();");
s.mem_exceeded_count,
s.total_txns,
s.total_bytes,
+ s.slotsync_skip_count,
+ s.slotsync_skip_at,
s.stats_reset
FROM pg_replication_slots r,
- LATERAL pg_stat_get_replication_slot((r.slot_name)::text) s(slot_name, spill_txns, spill_count, spill_bytes, stream_txns, stream_count, stream_bytes, mem_exceeded_count, total_txns, total_bytes, stats_reset)
+ LATERAL pg_stat_get_replication_slot((r.slot_name)::text) s(slot_name, spill_txns, spill_count, spill_bytes, stream_txns, stream_count, stream_bytes, mem_exceeded_count, total_txns, total_bytes, slotsync_skip_count, slotsync_skip_at, stats_reset)
WHERE (r.datoid IS NOT NULL);
pg_stat_slru| SELECT name,
blks_zeroed,