}
/*
- * A PG_ENSURE_ERROR_CLEANUP callback for activating logical decoding, resetting
- * the shared flags to revert the logical decoding activation process.
+ * A PG_ENSURE_ERROR_CLEANUP callback for activating logical decoding.
+ *
+ * Rather than directly reverting xlog_logical_info here, we request
+ * that the checkpointer handle it via the normal disable path. This
+ * avoids race conditions when multiple backends attempt concurrent
+ * activation: the checkpointer will reset xlog_logical_info when
+ * no valid logical slots exist.
*/
static void
abort_logical_decoding_activation(int code, Datum arg)
{
- Assert(MyReplicationSlot);
- Assert(!LogicalDecodingCtl->logical_decoding_enabled);
-
elog(DEBUG1, "aborting logical decoding activation process");
-
- /*
- * Abort the change to xlog_logical_info. We don't need to check
- * CheckLogicalSlotExists() as we're still holding a logical slot.
- */
- LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE);
- LogicalDecodingCtl->xlog_logical_info = false;
- LWLockRelease(LogicalDecodingControlLock);
-
- /*
- * Some processes might have already started logical info WAL logging, so
- * tell all running processes to update their caches. We don't need to
- * wait for all processes to disable xlog_logical_info locally as it's
- * always safe to write logical information to WAL records, even when not
- * strictly required.
- */
- EmitProcSignalBarrier(PROCSIGNAL_BARRIER_UPDATE_XLOG_LOGICAL_INFO);
+ RequestDisableLogicalDecoding();
}
/*
* long-running read queries, which is practically unacceptable.
*/
+ LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE);
+
START_CRIT_SECTION();
/*
* This sequence ensures logical decoding becomes available on the primary
* first.
*/
- LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE);
-
LogicalDecodingCtl->logical_decoding_enabled = true;
if (!in_recovery)
LogicalDecodingCtl->pending_disable = false;
- LWLockRelease(LogicalDecodingControlLock);
-
END_CRIT_SECTION();
+ LWLockRelease(LogicalDecodingControlLock);
+
+ /*
+ * We log the activation message after releasing the slot lock. This is
+ * safe because the activation is performed while holding a logical slot,
+ * meaning, a concurrent deactivation cannot interleave its log message
+ * ahead of ours.
+ */
if (!in_recovery)
ereport(LOG,
errmsg("logical decoding is enabled upon creating a new logical replication slot"));
DisableLogicalDecoding(void)
{
bool in_recovery = RecoveryInProgress();
+ bool was_enabled;
LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE);
/*
* Check if we can disable logical decoding.
*
- * Skip CheckLogicalSlotExists() check during recovery because the
- * existing slots will be invalidated after disabling logical decoding.
+ * Nothing to do if both flags are already off, or if valid slots exist
+ * (skip the slot check during recovery because the existing slots will be
+ * invalidated after disabling logical decoding.)
*/
- if (!LogicalDecodingCtl->logical_decoding_enabled ||
+ if ((!LogicalDecodingCtl->logical_decoding_enabled &&
+ !LogicalDecodingCtl->xlog_logical_info) ||
(!in_recovery && CheckLogicalSlotExists()))
{
LogicalDecodingCtl->pending_disable = false;
return;
}
+ /*
+ * Remember if logical decoding was enabled. An interrupted activation can
+ * leave xlog_logical_info=true while logical_decoding_enabled remains
+ * false.
+ */
+ was_enabled = LogicalDecodingCtl->logical_decoding_enabled;
+
START_CRIT_SECTION();
/*
LogicalDecodingCtl->logical_decoding_enabled = false;
/* Write the WAL to disable logical decoding on standbys too */
- if (!in_recovery)
+ if (!in_recovery && was_enabled)
write_logical_decoding_status_update_record(false);
/* Now disable logical information WAL logging */
END_CRIT_SECTION();
- if (!in_recovery)
+ /*
+ * Logging under the lock guarantees our "is disabled" message appears in
+ * the server log before its eventual "is enabled", making server log
+ * diagnostics easy.
+ */
+ if (!in_recovery && was_enabled)
ereport(LOG,
errmsg("logical decoding is disabled because there are no valid logical replication slots"));
# Verify that the backend aborted the activation process.
$primary->wait_for_log("aborting logical decoding activation process");
- test_wal_level($primary, "replica|replica",
- "the activation process aborted");
+ wait_for_logical_decoding_disabled($primary);
+ pass("the activation process aborted");
+
+ # Test concurrent activation processes run and one is interrupted.
+ $psql_create_slot = $primary->background_psql('postgres');
+
+ # Start a psql session and stops in the middle of the activation
+ # process.
+ $psql_create_slot->query_until(
+ qr/create_slot_canceled/,
+ q(\echo create_slot_canceled
+select injection_points_set_local();
+select injection_points_attach('logical-decoding-activation', 'wait');
+select pg_create_logical_replication_slot('slot_canceled2', 'pgoutput');
+\q
+));
+ $primary->wait_for_event('client backend', 'logical-decoding-activation');
+ note("injection_point 'logical-decoding-activation' is reached");
+
+ # Another backend concurrently enables logical decoding.
+ $primary->safe_psql('postgres',
+ qq[select pg_create_logical_replication_slot('test_slot2', 'pgoutput')]
+ );
+
+ # Concurrent slot creation should not be blocked. So wait until
+ # test_slot2 is created and logical decoding is enabled.
+ test_wal_level($primary, "replica|logical",
+ "the concurrent activation has done properly");
+
+ # Cancel the backend initiated by $psql_create_slot, aborting its activation
+ # process.
+ my $log_offset = -s $primary->logfile;
+ $primary->safe_psql(
+ 'postgres',
+ qq[
+select pg_cancel_backend(pid) from pg_stat_activity where query ~ 'slot_canceled2' and pid <> pg_backend_pid()
+]);
+ # Canceling the backend should not affect the concurrent slot creation.
+ $primary->wait_for_log("canceling statement due to user request",
+ $log_offset);
+ test_wal_level($primary, "replica|logical",
+ "effective_wal_level remains 'logical' even after the concurrent activation is interrupted"
+ );
}
$primary->stop;