* the last cycle. Refer to the comments above wait_for_slot_activity() for
* more details.
*
+ * If the SQL function pg_sync_replication_slots() is used to sync the slots,
+ * and if the slots are not ready to be synced and are marked as RS_TEMPORARY
+ * because of any of the reasons mentioned above, then the SQL function also
+ * waits and retries until the slots are marked as RS_PERSISTENT (which means
+ * sync-ready). Refer to the comments in SyncReplicationSlots() for more
+ * details.
+ *
* Any standby synchronized slots will be dropped if they no longer need
* to be synchronized. See comment atop drop_local_obsolete_slots() for more
* details.
#include "storage/procarray.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
+#include "utils/memutils.h"
#include "utils/pg_lsn.h"
#include "utils/ps_status.h"
#include "utils/timeout.h"
* local ones, then update the LSNs and persist the local synced slot for
* future synchronization; otherwise, do nothing.
*
+ * *slot_persistence_pending, if non-NULL, is set to true if the slot
+ * cannot be persisted yet, so that the caller can retry later.
+ *
* Return true if the slot is marked as RS_PERSISTENT (sync-ready), otherwise
* false.
*/
static bool
-update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
+update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid,
+ bool *slot_persistence_pending)
{
ReplicationSlot *slot = MyReplicationSlot;
bool found_consistent_snapshot = false;
* current location when recreating the slot in the next cycle. It may
* take more time to create such a slot. Therefore, we keep this slot
* and attempt the synchronization in the next cycle.
+ *
+ * We also update the slot_persistence_pending parameter, so the SQL
+ * function can retry.
*/
+ if (slot_persistence_pending)
+ *slot_persistence_pending = true;
+
return false;
}
errdetail("Synchronization could lead to data loss, because the standby could not build a consistent snapshot to decode WALs at LSN %X/%08X.",
LSN_FORMAT_ARGS(slot->data.restart_lsn)));
+ /* Set this so that the SQL function can retry */
+ if (slot_persistence_pending)
+ *slot_persistence_pending = true;
+
return false;
}
* updated. The slot is then persisted and is considered as sync-ready for
* periodic syncs.
*
+ * *slot_persistence_pending, if non-NULL, is set to true if the slot
+ * fails to persist, allowing the caller to retry.
+ *
* Returns TRUE if the local slot is updated.
*/
static bool
-synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
+synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid,
+ bool *slot_persistence_pending)
{
ReplicationSlot *slot;
XLogRecPtr latestFlushPtr = GetStandbyFlushRecPtr(NULL);
if (slot->data.persistency == RS_TEMPORARY)
{
slot_updated = update_and_persist_local_synced_slot(remote_slot,
- remote_dbid);
+ remote_dbid,
+ slot_persistence_pending);
}
/* Slot ready for sync, so sync it. */
return false;
}
- update_and_persist_local_synced_slot(remote_slot, remote_dbid);
+ update_and_persist_local_synced_slot(remote_slot, remote_dbid,
+ slot_persistence_pending);
slot_updated = true;
}
}
/*
- * Synchronize slots.
+ * Fetch remote slots.
*
- * Gets the failover logical slots info from the primary server and updates
- * the slots locally. Creates the slots if not present on the standby.
+ * If slot_names is NIL, fetches all failover logical slots from the
+ * primary server, otherwise fetches only the ones with names in slot_names.
*
- * Returns TRUE if any of the slots gets updated in this sync-cycle.
+ * Returns a list of remote slot information structures, or NIL if none
+ * are found.
*/
-static bool
-synchronize_slots(WalReceiverConn *wrconn)
+static List *
+fetch_remote_slots(WalReceiverConn *wrconn, List *slot_names)
{
#define SLOTSYNC_COLUMN_COUNT 10
Oid slotRow[SLOTSYNC_COLUMN_COUNT] = {TEXTOID, TEXTOID, LSNOID,
WalRcvExecResult *res;
TupleTableSlot *tupslot;
List *remote_slot_list = NIL;
- bool some_slot_updated = false;
- bool started_tx = false;
- const char *query = "SELECT slot_name, plugin, confirmed_flush_lsn,"
- " restart_lsn, catalog_xmin, two_phase, two_phase_at, failover,"
- " database, invalidation_reason"
- " FROM pg_catalog.pg_replication_slots"
- " WHERE failover and NOT temporary";
-
- /* The syscache access in walrcv_exec() needs a transaction env. */
- if (!IsTransactionState())
+ StringInfoData query;
+
+ initStringInfo(&query);
+ appendStringInfoString(&query,
+ "SELECT slot_name, plugin, confirmed_flush_lsn,"
+ " restart_lsn, catalog_xmin, two_phase,"
+ " two_phase_at, failover,"
+ " database, invalidation_reason"
+ " FROM pg_catalog.pg_replication_slots"
+ " WHERE failover and NOT temporary");
+
+ if (slot_names != NIL)
{
- StartTransactionCommand();
- started_tx = true;
+ bool first_slot = true;
+
+ /*
+ * Construct the query to fetch only the specified slots
+ */
+ appendStringInfoString(&query, " AND slot_name IN (");
+
+ foreach_ptr(char, slot_name, slot_names)
+ {
+ if (!first_slot)
+ appendStringInfoString(&query, ", ");
+
+ appendStringInfo(&query, "%s", quote_literal_cstr(slot_name));
+ first_slot = false;
+ }
+ appendStringInfoChar(&query, ')');
}
/* Execute the query */
- res = walrcv_exec(wrconn, query, SLOTSYNC_COLUMN_COUNT, slotRow);
+ res = walrcv_exec(wrconn, query.data, SLOTSYNC_COLUMN_COUNT, slotRow);
+ pfree(query.data);
if (res->status != WALRCV_OK_TUPLES)
ereport(ERROR,
errmsg("could not fetch failover logical slots info from the primary server: %s",
res->err));
- /* Construct the remote_slot tuple and synchronize each slot locally */
tupslot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
while (tuplestore_gettupleslot(res->tuplestore, true, false, tupslot))
{
ExecClearTuple(tupslot);
}
+ walrcv_clear_result(res);
+
+ return remote_slot_list;
+}
+
+/*
+ * Synchronize slots.
+ *
+ * This function takes a list of remote slots and synchronizes them locally. It
+ * creates the slots if not present on the standby and updates existing ones.
+ *
+ * If slot_persistence_pending is not NULL, it will be set to true if one or
+ * more slots could not be persisted. This allows callers such as
+ * SyncReplicationSlots() to retry those slots.
+ *
+ * Returns TRUE if any of the slots gets updated in this sync-cycle.
+ */
+static bool
+synchronize_slots(WalReceiverConn *wrconn, List *remote_slot_list,
+ bool *slot_persistence_pending)
+{
+ bool some_slot_updated = false;
+
/* Drop local slots that no longer need to be synced. */
drop_local_obsolete_slots(remote_slot_list);
*/
LockSharedObject(DatabaseRelationId, remote_dbid, 0, AccessShareLock);
- some_slot_updated |= synchronize_one_slot(remote_slot, remote_dbid);
+ some_slot_updated |= synchronize_one_slot(remote_slot, remote_dbid,
+ slot_persistence_pending);
UnlockSharedObject(DatabaseRelationId, remote_dbid, 0, AccessShareLock);
}
- /* We are done, free remote_slot_list elements */
- list_free_deep(remote_slot_list);
-
- walrcv_clear_result(res);
-
- if (started_tx)
- CommitTransactionCommand();
-
return some_slot_updated;
}
*
* It connects to the primary server, fetches logical failover slots
* information periodically in order to create and sync the slots.
+ *
+ * Note: If any changes are made here, check if the corresponding SQL
+ * function logic in SyncReplicationSlots() also needs to be changed.
*/
void
ReplSlotSyncWorkerMain(const void *startup_data, size_t startup_data_len)
for (;;)
{
bool some_slot_updated = false;
+ bool started_tx = false;
+ List *remote_slots;
ProcessSlotSyncInterrupts();
- some_slot_updated = synchronize_slots(wrconn);
+ /*
+ * The syscache access in fetch_remote_slots() needs a transaction
+ * env.
+ */
+ if (!IsTransactionState())
+ {
+ StartTransactionCommand();
+ started_tx = true;
+ }
+
+ remote_slots = fetch_remote_slots(wrconn, NIL);
+ some_slot_updated = synchronize_slots(wrconn, remote_slots, NULL);
+ list_free_deep(remote_slots);
+
+ if (started_tx)
+ CommitTransactionCommand();
wait_for_slot_activity(some_slot_updated);
}
walrcv_disconnect(wrconn);
}
+/*
+ * Helper function to extract slot names from a list of remote slots.
+ *
+ * Each name is copied with pstrdup(), so the returned list is independent
+ * of the input RemoteSlot list; the caller is responsible for freeing it
+ * (e.g. with list_free_deep()).
+ */
+static List *
+extract_slot_names(List *remote_slots)
+{
+ List *slot_names = NIL;
+
+ foreach_ptr(RemoteSlot, remote_slot, remote_slots)
+ {
+ char *slot_name;
+
+ /* Copy the name so the result can outlive remote_slots */
+ slot_name = pstrdup(remote_slot->name);
+ slot_names = lappend(slot_names, slot_name);
+ }
+
+ return slot_names;
+}
+
/*
* Synchronize the failover enabled replication slots using the specified
* primary server connection.
+ *
+ * Repeatedly fetches and updates replication slot information from the
+ * primary until all slots are at least "sync ready".
+ *
+ * Exits early if promotion is triggered or certain critical
+ * configuration parameters have changed.
*/
void
SyncReplicationSlots(WalReceiverConn *wrconn)
{
PG_ENSURE_ERROR_CLEANUP(slotsync_failure_callback, PointerGetDatum(wrconn));
{
+ List *remote_slots = NIL;
+ List *slot_names = NIL; /* List of slot names to track */
+
check_and_set_sync_info(MyProcPid);
/* Check for interrupts and config changes */
validate_remote_info(wrconn);
- synchronize_slots(wrconn);
+ /* Retry until all the slots are sync-ready */
+ for (;;)
+ {
+ bool slot_persistence_pending = false;
+ bool some_slot_updated = false;
+
+ /* Check for interrupts and config changes */
+ ProcessSlotSyncInterrupts();
+
+ /* We must be in a valid transaction state */
+ Assert(IsTransactionState());
+
+ /*
+ * Fetch remote slot info for the given slot_names. If slot_names
+ * is NIL, fetch all failover-enabled slots. Note that we reuse
+ * slot_names from the first iteration; re-fetching all failover
+ * slots each time could cause an endless loop. Instead of
+ * reprocessing only the pending slots in each iteration, it's
+ * better to process all the slots received in the first
+ * iteration. This ensures that by the time we're done, all slots
+ * reflect the latest values.
+ */
+ remote_slots = fetch_remote_slots(wrconn, slot_names);
+
+ /* Attempt to synchronize slots */
+ some_slot_updated = synchronize_slots(wrconn, remote_slots,
+ &slot_persistence_pending);
+
+ /*
+ * If slot_persistence_pending is true, extract slot names for
+ * future iterations (only needed if we haven't done it yet)
+ */
+ if (slot_names == NIL && slot_persistence_pending)
+ slot_names = extract_slot_names(remote_slots);
+
+ /* Free the current remote_slots list */
+ list_free_deep(remote_slots);
+
+ /* Done if all slots are persisted i.e are sync-ready */
+ if (!slot_persistence_pending)
+ break;
+
+ /* Wait before retrying */
+ wait_for_slot_activity(some_slot_updated);
+ }
+
+ if (slot_names)
+ list_free_deep(slot_names);
/* Cleanup the synced temporary slots */
ReplicationSlotCleanup(true);
));
$subscriber2->safe_psql('postgres', 'DROP SUBSCRIPTION regress_mysub2;');
+$subscriber1->safe_psql('postgres', 'DROP SUBSCRIPTION regress_mysub1;');
+$subscriber1->safe_psql('postgres', 'TRUNCATE tab_int;');
+
+# Remove the dropped sb1_slot from the synchronized_standby_slots list and reload the
+# configuration.
+$primary->adjust_conf('postgresql.conf', 'synchronized_standby_slots', "''");
+$primary->reload;
# Verify that all slots have been removed except the one necessary for standby2,
# which is needed for further testing.
$primary->wait_for_replay_catchup($standby2);
##################################################
-# Verify that slotsync skip statistics are correctly updated when the
+# Test that pg_sync_replication_slots() on the standby skips and retries
+# until the slot becomes sync-ready (when the remote slot catches up with
+# the locally reserved position).
+# Also verify that slotsync skip statistics are correctly updated when the
# slotsync operation is skipped.
##################################################
-# Create a logical replication slot and create some DDL on the primary so
-# that the slot lags behind the standby.
-$primary->safe_psql(
- 'postgres', qq(
- SELECT pg_create_logical_replication_slot('lsub1_slot', 'pgoutput', false, false, true);
- CREATE TABLE wal_push(a int);
-));
+# Recreate the slot by creating a subscription on the subscriber, keep it disabled.
+$subscriber1->safe_psql('postgres', qq[
+ CREATE TABLE push_wal (a int);
+ CREATE SUBSCRIPTION regress_mysub1 CONNECTION '$publisher_connstr' PUBLICATION regress_mypub WITH (slot_name = lsub1_slot, failover = true, enabled = false);]);
+
+# Create some DDL on the primary so that the slot lags behind the standby
+$primary->safe_psql('postgres', "CREATE TABLE push_wal (a int);");
+
+# Make sure the DDL changes are synced to the standby
$primary->wait_for_replay_catchup($standby2);
$log_offset = -s $standby2->logfile;
-# Enable slot sync worker
+# Enable standby for slot synchronization
$standby2->append_conf(
'postgresql.conf', qq(
hot_standby_feedback = on
primary_conninfo = '$connstr_1 dbname=postgres'
log_min_messages = 'debug2'
-sync_replication_slots = on
));
$standby2->reload;
-# Confirm that the slot sync worker is able to start.
-$standby2->wait_for_log(qr/slot sync worker started/, $log_offset);
+# Attempt to synchronize slots using the API. The API keeps retrying
+# synchronization until the remote slot catches up, and does not return
+# until then; run it in a background process so that further commands
+# can still be issued.
+my $h = $standby2->background_psql('postgres', on_error_stop => 0);
+
+$h->query_until(qr/start/, q(
+ \echo start
+ SELECT pg_sync_replication_slots();
+ ));
# Confirm that the slot sync is skipped due to the remote slot lagging behind
$standby2->wait_for_log(
);
is($result, 't', "check slot sync skip count increments");
+# Enable the Subscription, so that the remote slot catches up
+$subscriber1->safe_psql('postgres', "ALTER SUBSCRIPTION regress_mysub1 ENABLE");
+$subscriber1->wait_for_subscription_sync;
+
+# Create xl_running_xacts on the primary to speed up restart_lsn advancement.
+$primary->safe_psql('postgres', "SELECT pg_log_standby_snapshot();");
+
+# Confirm from the log that the slot is sync-ready now.
+$standby2->wait_for_log(
+ qr/newly created replication slot \"lsub1_slot\" is sync-ready now/,
+ $log_offset);
+
+$h->quit;
+
done_testing();