*
* If no update was needed (the data of the remote slot is the same as the
* local slot) return false, otherwise true.
- *
- * *found_consistent_snapshot will be true iff the remote slot's LSN or xmin is
- * modified, and decoding from the corresponding LSN's can reach a
- * consistent snapshot.
- *
- * *remote_slot_precedes will be true if the remote slot's LSN or xmin
- * precedes locally reserved position.
*/
static bool
-update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid,
- bool *found_consistent_snapshot,
- bool *remote_slot_precedes)
+update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
{
ReplicationSlot *slot = MyReplicationSlot;
bool updated_xmin_or_lsn = false;
bool updated_config = false;
SlotSyncSkipReason skip_reason = SS_SKIP_NONE;
+ XLogRecPtr latestFlushPtr = GetStandbyFlushRecPtr(NULL);
Assert(slot->data.invalidated == RS_INVAL_NONE);
- if (found_consistent_snapshot)
- *found_consistent_snapshot = false;
+ /*
+ * Make sure that the relevant WAL has been received and flushed before
+ * syncing the slot to the target LSN received from the primary server.
+ */
+ if (remote_slot->confirmed_lsn > latestFlushPtr)
+ {
+ update_slotsync_skip_stats(SS_SKIP_WAL_NOT_FLUSHED);
- if (remote_slot_precedes)
- *remote_slot_precedes = false;
+ /*
+ * Can get here only if GUC 'synchronized_standby_slots' on the
+ * primary server was not configured correctly.
+ */
+ ereport(AmLogicalSlotSyncWorkerProcess() ? LOG : ERROR,
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("skipping slot synchronization because the received slot sync"
+ " LSN %X/%08X for slot \"%s\" is ahead of the standby position %X/%08X",
+ LSN_FORMAT_ARGS(remote_slot->confirmed_lsn),
+ remote_slot->name,
+ LSN_FORMAT_ARGS(latestFlushPtr)));
+
+ return false;
+ }
/*
* Don't overwrite if we already have a newer catalog_xmin and
LSN_FORMAT_ARGS(slot->data.restart_lsn),
slot->data.catalog_xmin));
- if (remote_slot_precedes)
- *remote_slot_precedes = true;
-
/*
* Skip updating the configuration. This is required to avoid syncing
* two_phase_at without syncing confirmed_lsn. Otherwise, the prepared
slot->data.confirmed_flush = remote_slot->confirmed_lsn;
slot->data.catalog_xmin = remote_slot->catalog_xmin;
SpinLockRelease(&slot->mutex);
-
- if (found_consistent_snapshot)
- *found_consistent_snapshot = true;
}
else
{
+ bool found_consistent_snapshot;
+
LogicalSlotAdvanceAndCheckSnapState(remote_slot->confirmed_lsn,
- found_consistent_snapshot);
+ &found_consistent_snapshot);
/* Sanity check */
if (slot->data.confirmed_flush != remote_slot->confirmed_lsn)
* If we can't reach a consistent snapshot, the slot won't be
* persisted. See update_and_persist_local_synced_slot().
*/
- if (found_consistent_snapshot && !(*found_consistent_snapshot))
+ if (!found_consistent_snapshot)
+ {
+ Assert(MyReplicationSlot->data.persistency == RS_TEMPORARY);
+
+ ereport(LOG,
+ errmsg("could not synchronize replication slot \"%s\"",
+ remote_slot->name),
+ errdetail("Synchronization could lead to data loss, because the standby could not build a consistent snapshot to decode WALs at LSN %X/%08X.",
+ LSN_FORMAT_ARGS(slot->data.restart_lsn)));
+
skip_reason = SS_SKIP_NO_CONSISTENT_SNAPSHOT;
+ }
}
updated_xmin_or_lsn = true;
bool *slot_persistence_pending)
{
ReplicationSlot *slot = MyReplicationSlot;
- bool found_consistent_snapshot = false;
- bool remote_slot_precedes = false;
/* Slotsync skip stats are handled in function update_local_synced_slot() */
- (void) update_local_synced_slot(remote_slot, remote_dbid,
- &found_consistent_snapshot,
- &remote_slot_precedes);
+ (void) update_local_synced_slot(remote_slot, remote_dbid);
/*
- * Check if the primary server has caught up. Refer to the comment atop
- * the file for details on this check.
+ * Check if the slot cannot be synchronized. Refer to the comment atop the
+ * file for details on this check.
*/
- if (remote_slot_precedes)
+ if (slot->slotsync_skip_reason != SS_SKIP_NONE)
{
/*
- * The remote slot didn't catch up to locally reserved position.
+ * We reach this point when the remote slot has not caught up to the
+ * locally reserved position, or a consistent point cannot be reached
+ * from the restart_lsn, or the WAL prior to the remote confirmed flush
+ * LSN has not been received and flushed.
*
- * We do not drop the slot because the restart_lsn can be ahead of the
- * current location when recreating the slot in the next cycle. It may
- * take more time to create such a slot. Therefore, we keep this slot
- * and attempt the synchronization in the next cycle.
+ * We do not drop the slot because the restart_lsn and confirmed_lsn
+ * can be ahead of the current location when recreating the slot in
+ * the next cycle. It may take more time to create such a slot or
+ * reach the consistent point. Therefore, we keep this slot and
+ * attempt the synchronization in the next cycle.
*
* We also update the slot_persistence_pending parameter, so the SQL
* function can retry.
return false;
}
- /*
- * Don't persist the slot if it cannot reach the consistent point from the
- * restart_lsn. See comments atop this file.
- */
- if (!found_consistent_snapshot)
- {
- ereport(LOG,
- errmsg("could not synchronize replication slot \"%s\"", remote_slot->name),
- errdetail("Synchronization could lead to data loss, because the standby could not build a consistent snapshot to decode WALs at LSN %X/%08X.",
- LSN_FORMAT_ARGS(slot->data.restart_lsn)));
-
- /* Set this, so that SQL function can retry */
- if (slot_persistence_pending)
- *slot_persistence_pending = true;
-
- return false;
- }
-
ReplicationSlotPersist();
ereport(LOG,
bool *slot_persistence_pending)
{
ReplicationSlot *slot;
- XLogRecPtr latestFlushPtr = GetStandbyFlushRecPtr(NULL);
bool slot_updated = false;
/* Search for the named slot */
return slot_updated;
}
- /*
- * Make sure that concerned WAL is received and flushed before syncing
- * slot to target lsn received from the primary server.
- *
- * Report statistics only after the slot has been acquired, ensuring
- * it cannot be dropped during the reporting process.
- */
- if (remote_slot->confirmed_lsn > latestFlushPtr)
- {
- update_slotsync_skip_stats(SS_SKIP_WAL_NOT_FLUSHED);
-
- /*
- * Can get here only if GUC 'synchronized_standby_slots' on the
- * primary server was not configured correctly.
- */
- ereport(AmLogicalSlotSyncWorkerProcess() ? LOG : ERROR,
- errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("skipping slot synchronization because the received slot sync"
- " LSN %X/%08X for slot \"%s\" is ahead of the standby position %X/%08X",
- LSN_FORMAT_ARGS(remote_slot->confirmed_lsn),
- remote_slot->name,
- LSN_FORMAT_ARGS(latestFlushPtr)));
-
- ReplicationSlotRelease();
-
- return slot_updated;
- }
-
/* Slot not ready yet, let's attempt to make it sync-ready now. */
if (slot->data.persistency == RS_TEMPORARY)
{
LSN_FORMAT_ARGS(slot->data.confirmed_flush),
LSN_FORMAT_ARGS(remote_slot->confirmed_lsn)));
- slot_updated = update_local_synced_slot(remote_slot, remote_dbid,
- NULL, NULL);
+ slot_updated = update_local_synced_slot(remote_slot, remote_dbid);
}
}
/* Otherwise create the slot first. */
LWLockRelease(ProcArrayLock);
LWLockRelease(ReplicationSlotControlLock);
- /*
- * Make sure that concerned WAL is received and flushed before syncing
- * slot to target lsn received from the primary server.
- *
- * Report statistics only after the slot has been acquired, ensuring
- * it cannot be dropped during the reporting process.
- */
- if (remote_slot->confirmed_lsn > latestFlushPtr)
- {
- update_slotsync_skip_stats(SS_SKIP_WAL_NOT_FLUSHED);
-
- /*
- * Can get here only if GUC 'synchronized_standby_slots' on the
- * primary server was not configured correctly.
- */
- ereport(AmLogicalSlotSyncWorkerProcess() ? LOG : ERROR,
- errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("skipping slot synchronization because the received slot sync"
- " LSN %X/%08X for slot \"%s\" is ahead of the standby position %X/%08X",
- LSN_FORMAT_ARGS(remote_slot->confirmed_lsn),
- remote_slot->name,
- LSN_FORMAT_ARGS(latestFlushPtr)));
-
- ReplicationSlotRelease();
-
- return false;
- }
-
update_and_persist_local_synced_slot(remote_slot, remote_dbid,
slot_persistence_pending);