MySubscription->name)));
CommitTransactionCommand();
+
+ /*
+ * Register a callback to reset the origin state before aborting any
+ * pending transaction during shutdown (see ShutdownPostgres()). This will
+ * avoid origin advancement for an incomplete transaction which could
+ * otherwise lead to its loss as such a transaction won't be sent by the
+ * server again.
+ *
+ * Note that even a LOG or DEBUG statement placed after setting the origin
+ * state may process a shutdown signal before committing the current apply
+ * operation. So, it is important to register such a callback here.
+ *
+ * Register this callback here to ensure that all types of logical
+ * replication workers that set up origins and apply remote transactions
+ * are protected.
+ */
+ before_shmem_exit(replorigin_reset, (Datum) 0);
}
/* Logical Replication Apply worker entry point */
InitializeApplyWorker();
- /*
- * Register a callback to reset the origin state before aborting any
- * pending transaction during shutdown (see ShutdownPostgres()). This will
- * avoid origin advancement for an in-complete transaction which could
- * otherwise lead to its loss as such a transaction won't be sent by the
- * server again.
- *
- * Note that even a LOG or DEBUG statement placed after setting the origin
- * state may process a shutdown signal before committing the current apply
- * operation. So, it is important to register such a callback here.
- */
- before_shmem_exit(replorigin_reset, (Datum) 0);
-
InitializingApplyWorker = false;
/* Connect to the origin and start the replication. */
$node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_2");
is($result, qq(1), 'transaction is committed on subscriber');
+# Test the ability to re-apply a transaction when a parallel apply worker fails
+# to prepare the transaction due to insufficient max_prepared_transactions
+# setting.
+$node_subscriber->append_conf(
+ 'postgresql.conf', qq(
+max_prepared_transactions = 0
+debug_logical_replication_streaming = buffered
+));
+$node_subscriber->restart;
+
+$node_publisher->safe_psql(
+ 'postgres', q{
+ BEGIN;
+ INSERT INTO test_tab_2 values(2);
+ PREPARE TRANSACTION 'xact';
+ COMMIT PREPARED 'xact';
+ });
+
+$offset = -s $node_subscriber->logfile;
+
+# Confirm the ERROR is reported because max_prepared_transactions is zero
+$node_subscriber->wait_for_log(
+ qr/ERROR: ( [A-Z0-9]+:)? prepared transactions are disabled/,
+ $offset);
+
+# Confirm that the parallel apply worker has encountered an error. The check
+# focuses on the worker type as a keyword, since the error message content may
+# differ based on whether the leader initially detected the parallel apply
+# worker's failure or received a signal from it.
+$node_subscriber->wait_for_log(
+ qr/ERROR: .*logical replication parallel apply worker.*/,
+ $offset);
+
+# Set max_prepared_transactions to correct value to resume the replication
+$node_subscriber->append_conf('postgresql.conf',
+ qq(max_prepared_transactions = 10));
+$node_subscriber->restart;
+
+$node_publisher->wait_for_catchup($appname);
+
+# Check that transaction is committed on subscriber
+$result =
+ $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_2");
+is($result, qq(2), 'transaction is committed on subscriber after retrying');
+
###############################
# check all the cleanup
###############################