From 227eb4eea20575d9ef0aac0bdae7b6c2ba261f34 Mon Sep 17 00:00:00 2001 From: Masahiko Sawada Date: Wed, 28 Jan 2026 11:45:26 -0800 Subject: [PATCH] Refactor replication origin state reset helpers. MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Factor out common logic for clearing replorigin_session_* variables into a dedicated helper function, replorigin_xact_clear(). This removes duplicated assignments of these variables across multiple call sites, and makes the intended scope of each reset explicit. Author: Chao Li Reviewed-by: Masahiko Sawada Reviewed-by: Ashutosh Bapat Reviewed-by: Álvaro Herrera Discussion: https://postgr.es/m/CAEoWx2=pYvfRthXHTzSrOsf5_FfyY4zJyK4zV2v4W=yjUij1cA@mail.gmail.com --- src/backend/replication/logical/origin.c | 21 ++++++++++++++++----- src/backend/replication/logical/tablesync.c | 4 +--- src/backend/replication/logical/worker.c | 14 ++++++-------- src/include/replication/origin.h | 3 +++ 4 files changed, 26 insertions(+), 16 deletions(-) diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c index d91be9ebc18..7704cc5ff1b 100644 --- a/src/backend/replication/logical/origin.c +++ b/src/backend/replication/logical/origin.c @@ -1341,6 +1341,19 @@ replorigin_session_get_progress(bool flush) return remote_lsn; } +/* + * Clear the per-transaction replication origin state. + * + * replorigin_session_origin is also cleared if clear_origin is set. + */ +void +replorigin_xact_clear(bool clear_origin) +{ + replorigin_session_origin_lsn = InvalidXLogRecPtr; + replorigin_session_origin_timestamp = 0; + if (clear_origin) + replorigin_session_origin = InvalidReplOriginId; +} /* --------------------------------------------------------------------------- @@ -1466,9 +1479,7 @@ pg_replication_origin_session_reset(PG_FUNCTION_ARGS) replorigin_session_reset(); - replorigin_session_origin = InvalidReplOriginId; - replorigin_session_origin_lsn = InvalidXLogRecPtr; - replorigin_session_origin_timestamp = 0; + replorigin_xact_clear(true); PG_RETURN_VOID(); } @@ -1536,8 +1547,8 @@ pg_replication_origin_xact_reset(PG_FUNCTION_ARGS) { replorigin_check_prerequisites(true, false); - replorigin_session_origin_lsn = InvalidXLogRecPtr; - replorigin_session_origin_timestamp = 0; + /* Do not clear the session origin */ + replorigin_xact_clear(false); PG_RETURN_VOID(); } diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index c3e24ca4305..15a59759645 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -323,9 +323,7 @@ ProcessSyncingTablesForSync(XLogRecPtr current_lsn) * This is needed to allow the origin to be dropped. */ replorigin_session_reset(); - replorigin_session_origin = InvalidReplOriginId; - replorigin_session_origin_lsn = InvalidXLogRecPtr; - replorigin_session_origin_timestamp = 0; + replorigin_xact_clear(true); /* * Drop the tablesync's origin tracking if exists. diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index c66ca54d003..23996a83353 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -627,7 +627,7 @@ static inline void reset_apply_error_context_info(void); static TransApplyAction get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo); -static void replorigin_reset(int code, Datum arg); +static void on_exit_clear_xact_state(int code, Datum arg); /* * Form the origin name for the subscription. @@ -5594,7 +5594,7 @@ start_apply(XLogRecPtr origin_startpos) * transaction loss as that transaction won't be sent again by the * server. */ - replorigin_reset(0, (Datum) 0); + replorigin_xact_clear(true); if (MySubscription->disableonerr) DisableSubscriptionAndExit(); @@ -5865,18 +5865,16 @@ InitializeLogRepWorker(void) * replication workers that set up origins and apply remote transactions * are protected. */ - before_shmem_exit(replorigin_reset, (Datum) 0); + before_shmem_exit(on_exit_clear_xact_state, (Datum) 0); } /* - * Reset the origin state. + * Callback on exit to clear transaction-level replication origin state. */ static void -replorigin_reset(int code, Datum arg) +on_exit_clear_xact_state(int code, Datum arg) { - replorigin_session_origin = InvalidReplOriginId; - replorigin_session_origin_lsn = InvalidXLogRecPtr; - replorigin_session_origin_timestamp = 0; + replorigin_xact_clear(true); } /* diff --git a/src/include/replication/origin.h b/src/include/replication/origin.h index 63e198e29a8..bc0a16ecc7e 100644 --- a/src/include/replication/origin.h +++ b/src/include/replication/origin.h @@ -67,6 +67,9 @@ extern void replorigin_session_setup(ReplOriginId node, int acquired_by); extern void replorigin_session_reset(void); extern XLogRecPtr replorigin_session_get_progress(bool flush); +/* Per-transaction replication origin state manipulation */ +extern void replorigin_xact_clear(bool clear_origin); + /* Checkpoint/Startup integration */ extern void CheckPointReplicationOrigin(void); extern void StartupReplicationOrigin(void); -- 2.47.3